Commit 1c484eb: Preventing early materialization of query on load (GoogleCloudDatapro…

isha97 authored Jan 8, 2024
1 parent 919c3c7 commit 1c484eb
Showing 14 changed files with 139 additions and 39 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
@@ -2,6 +2,8 @@

## Next

* PR #1155: allow lazy materialization of query on load

## 0.35.1 - 2023-12-28

* PR #1153: allow writing spark string to BQ datetime
BigQueryClient.java

@@ -31,6 +31,7 @@
import com.google.cloud.bigquery.JobConfiguration;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryJobConfiguration.Priority;
@@ -416,6 +417,25 @@ public TableInfo getReadTable(ReadTableOptions options) {
tableType, table.getTableId().getDataset(), table.getTableId().getTable()));
}

/**
* Returns the schema of the table/query/view. Uses dryRun to get the query schema instead of
* materializing it.
*
* @param options The {@code ReadTableOptions} options for reading the data source.
* @return The schema.
*/
public Schema getReadTableSchema(ReadTableOptions options) {
Optional<String> query = options.query();
// for a query, resolve the schema via dry run instead of materializing the results
if (query.isPresent()) {
validateViewsEnabled(options);
String sql = query.get();
return getQueryResultSchema(sql, Collections.emptyMap());
}
TableInfo table = getReadTable(options);
return table != null ? table.getDefinition().getSchema() : null;
}

private void validateViewsEnabled(ReadTableOptions options) {
if (!options.viewsEnabled()) {
throw new BigQueryConnectorException(
@@ -439,7 +459,7 @@ Iterable<Dataset> listDatasets(String projectId) {
return bigQuery.listDatasets(projectId).iterateAll();
}

-  Iterable<Table> listTables(DatasetId datasetId, TableDefinition.Type... types) {
+  public Iterable<Table> listTables(DatasetId datasetId, TableDefinition.Type... types) {
Set<TableDefinition.Type> allowedTypes = ImmutableSet.copyOf(types);
Iterable<Table> allTables = bigQuery.listTables(datasetId).iterateAll();
return StreamSupport.stream(allTables.spliterator(), false)
@@ -630,6 +650,24 @@ public TableInfo materializeViewToTable(
return materializeTable(querySql, tableId, expirationTimeInMinutes);
}

public Schema getQueryResultSchema(
String querySql, Map<String, String> additionalQueryJobLabels) {
JobInfo jobInfo =
JobInfo.of(
jobConfigurationFactory
.createQueryJobConfigurationBuilder(querySql, additionalQueryJobLabels)
.setDryRun(true)
.build());

log.info("running query dryRun {}", querySql);
JobInfo completedJobInfo = create(jobInfo);
if (completedJobInfo.getStatus().getError() != null) {
throw BigQueryUtil.convertToBigQueryException(completedJobInfo.getStatus().getError());
}
JobStatistics.QueryStatistics queryStatistics = completedJobInfo.getStatistics();
return queryStatistics.getSchema();
}

private TableInfo materializeTable(
String querySql, TableId destinationTableId, int expirationTimeInMinutes) {
try {
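The core of the change is the dry-run job above: BigQuery validates and plans the query and reports its result schema without executing anything, so no temporary table is created just to answer a schema probe. For context, a minimal standalone sketch of the same pattern against the BigQuery Java client library (the sample query and class name are illustrative, not part of the commit):

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.JobInfo;
    import com.google.cloud.bigquery.JobStatistics;
    import com.google.cloud.bigquery.QueryJobConfiguration;
    import com.google.cloud.bigquery.Schema;

    public class DryRunSchemaExample {
      public static void main(String[] args) {
        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        QueryJobConfiguration configuration =
            QueryJobConfiguration.newBuilder(
                    "SELECT corpus, SUM(word_count) AS word_count "
                        + "FROM `bigquery-public-data.samples.shakespeare` GROUP BY corpus")
                .setDryRun(true) // validate and plan the query, but do not run it
                .build();
        // A dry-run job completes immediately; its statistics carry the result schema.
        Job job = bigquery.create(JobInfo.of(configuration));
        JobStatistics.QueryStatistics statistics = job.getStatistics();
        Schema schema = statistics.getSchema();
        System.out.println(schema.getFields());
      }
    }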
IntegrationTestUtils.java

@@ -24,6 +24,8 @@
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.ExternalTableDefinition;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.ViewDefinition;
@@ -56,6 +58,17 @@ public static BigQuery getBigquery() {
return BigQueryOptions.getDefaultInstance().getService();
}

private static BigQueryClient getBigQueryClient() {
return new BigQueryClient(
getBigquery(),
Optional.empty(),
Optional.empty(),
destinationTableCache,
ImmutableMap.of(),
SparkBigQueryConfig.DEFAULT_JOB_PRIORITY,
Optional.empty());
}

public static void createDataset(String dataset) {
BigQuery bq = getBigquery();
DatasetId datasetId = DatasetId.of(dataset);
@@ -64,16 +77,11 @@ public static void createDataset(String dataset) {
}

public static void runQuery(String query) {
-    BigQueryClient bigQueryClient =
-        new BigQueryClient(
-            getBigquery(),
-            Optional.empty(),
-            Optional.empty(),
-            destinationTableCache,
-            ImmutableMap.of(),
-            SparkBigQueryConfig.DEFAULT_JOB_PRIORITY,
-            Optional.empty());
-    bigQueryClient.query(query);
+    getBigQueryClient().query(query);
}

public static Iterable<Table> listTables(DatasetId datasetId, TableDefinition.Type... types) {
return getBigQueryClient().listTables(datasetId, types);
}

public static void createBigLakeTable(
ReadFromQueryIntegrationTestBase.java

@@ -20,18 +20,24 @@

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.spark.bigquery.events.BigQueryJobCompletedEvent;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -40,6 +46,8 @@ class ReadFromQueryIntegrationTestBase extends SparkBigQueryIntegrationTestBase

private BigQuery bq;

private final boolean isDsv2OnSpark3AndAbove;

private TestBigQueryJobCompletionListener listener = new TestBigQueryJobCompletionListener();

@Before
@@ -54,8 +62,13 @@ public void removeListener() {
}

protected ReadFromQueryIntegrationTestBase() {
this(false);
}

protected ReadFromQueryIntegrationTestBase(boolean isDsv2OnSpark3AndAbove) {
super();
this.bq = BigQueryOptions.getDefaultInstance().getService();
this.isDsv2OnSpark3AndAbove = isDsv2OnSpark3AndAbove;
}

private void testReadFromQueryInternal(String query) {
@@ -118,6 +131,26 @@ public void testQueryOption() {
.option("query", query)
.load();

StructType expectedSchema =
DataTypes.createStructType(
ImmutableList.of(
DataTypes.createStructField("corpus", DataTypes.StringType, true),
DataTypes.createStructField("word_count", DataTypes.LongType, true)));

assertThat(df.schema()).isEqualTo(expectedSchema);

if (isDsv2OnSpark3AndAbove) {
Iterable<Table> tablesInDataset =
IntegrationTestUtils.listTables(
DatasetId.of(testDataset.toString()),
TableDefinition.Type.TABLE,
TableDefinition.Type.MATERIALIZED_VIEW);
assertThat(
StreamSupport.stream(tablesInDataset.spliterator(), false)
.noneMatch(table -> table.getTableId().getTable().startsWith("_bqc_")))
.isTrue();
}

validateResult(df);
}

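The new assertions above pin down the user-visible behavior: defining a DataFrame over a query and inspecting its schema must no longer leave a materialized table with the _bqc_ prefix behind. For reference, a sketch of that usage from the Spark side, assuming an active SparkSession named spark (the dataset name and query are illustrative):

    Dataset<Row> df =
        spark.read()
            .format("bigquery")
            .option("viewsEnabled", "true")
            .option("materializationDataset", "my_dataset") // illustrative dataset
            .option("query",
                "SELECT corpus, word_count FROM `bigquery-public-data.samples.shakespeare`")
            .load();
    df.printSchema(); // resolved via dry run; the query is not materialized yet
    long rows = df.count(); // the first action on the data triggers materialization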
BigQueryTableCreator.java

@@ -15,13 +15,13 @@
*/
package com.google.cloud.spark.bigquery.v2;

-import com.google.cloud.bigquery.TableId;
import com.google.inject.Injector;
+import java.util.function.Supplier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.types.StructType;

@FunctionalInterface
public interface BigQueryTableCreator {

-  Table create(Injector injector, TableId tableId, StructType schema);
+  Table create(Injector injector, Supplier<StructType> schema);
}
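Because the interface remains a @FunctionalInterface, call sites can keep passing constructor references; only the argument list changes. A sketch against the new signature, assuming an injector and a Spark schema are in scope (both names here are illustrative):

    // Spark31BigQueryTable(Injector, Supplier<StructType>), shown later in this
    // commit, matches the new create(Injector, Supplier<StructType>) shape.
    BigQueryTableCreator creator = Spark31BigQueryTable::new;
    Table lazyTable = creator.create(injector, () -> someSparkSchema);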
Spark31BigQueryTable.java

@@ -15,7 +15,6 @@
*/
package com.google.cloud.spark.bigquery.v2;

-import com.google.cloud.bigquery.TableId;
import com.google.cloud.spark.bigquery.DataSourceVersion;
import com.google.cloud.spark.bigquery.SparkBigQueryConfig;
import com.google.cloud.spark.bigquery.v2.context.BigQueryDataSourceReaderContext;
@@ -25,6 +24,7 @@
import com.google.inject.Injector;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Supplier;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.SupportsRead;
@@ -44,13 +44,11 @@ public class Spark31BigQueryTable implements Table, SupportsRead, SupportsWrite
TableCapability.BATCH_READ, TableCapability.V1_BATCH_WRITE, TableCapability.TRUNCATE);

protected Injector injector;
-  protected TableId tableId;
-  protected StructType schema;
+  protected Supplier<StructType> schemaSupplier;

-  public Spark31BigQueryTable(Injector injector, TableId tableId, StructType schema) {
+  public Spark31BigQueryTable(Injector injector, Supplier<StructType> schemaSupplier) {
     this.injector = injector;
-    this.tableId = tableId;
-    this.schema = schema;
+    this.schemaSupplier = schemaSupplier;
}

@Override
@@ -67,7 +65,7 @@ protected BigQueryDataSourceReaderContext createBigQueryDataSourceReaderContext(
ImmutableMap.of(),
injector.getInstance(DataSourceVersion.class),
injector.getInstance(SparkSession.class),
-        Optional.of(schema), /*tableIsMandatory*/
+        Optional.ofNullable(schemaSupplier.get()), /*tableIsMandatory*/
true);
Injector readerInjector =
injector.createChildInjector(
Expand All @@ -84,7 +82,7 @@ public String name() {

@Override
public StructType schema() {
-    return this.schema;
+    return this.schemaSupplier.get();
}

@Override
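Note that schema() now delegates to the supplier on every call. If repeated schema lookups ever become a concern, the supplier handed to the constructor can be a caching one; a possible sketch with Guava, not part of this commit (fetchSchemaViaDryRun is a hypothetical helper, and with recent Guava versions com.google.common.base.Supplier implements java.util.function.Supplier):

    import com.google.common.base.Suppliers;
    import java.util.function.Supplier;

    // Runs the expensive lookup once and caches the result for later calls.
    Supplier<StructType> memoizedSchema = Suppliers.memoize(() -> fetchSchemaViaDryRun());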
@@ -15,7 +15,7 @@
*/
package com.google.cloud.spark.bigquery.v2;

-import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.connector.common.BigQueryClient;
import com.google.cloud.bigquery.connector.common.UserAgentProvider;
import com.google.cloud.spark.bigquery.DataSourceVersion;
@@ -26,6 +26,7 @@
import com.google.cloud.spark.bigquery.metrics.SparkBigQueryConnectorMetricsUtils;
import com.google.inject.Injector;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.Table;
@@ -46,20 +47,22 @@ static Table createBigQueryTableInstance(
.build();
BigQueryClient bigQueryClient = injector.getInstance(BigQueryClient.class);
SparkBigQueryConfig config = injector.getInstance(SparkBigQueryConfig.class);
-    TableInfo tableInfo = bigQueryClient.getReadTable(config.toReadTableOptions());
SparkContext sparkContext = injector.getInstance(SparkSession.class).sparkContext();
SparkBigQueryConnectorMetricsUtils.postInputFormatEvent(sparkContext);
UserAgentProvider userAgentProvider = injector.getInstance(UserAgentProvider.class);
SparkBigQueryConnectorMetricsUtils.postConnectorVersion(
sparkContext, userAgentProvider.getConnectorInfo());
-    if (tableInfo == null) {
-      return bigQueryTableCreator.create(injector, config.getTableId(), sparkProvidedSchema);
-    }
-    StructType schema =
-        sparkProvidedSchema != null
-            ? sparkProvidedSchema
-            : SchemaConverters.from(SchemaConvertersConfiguration.from(config))
-                .toSpark(tableInfo.getDefinition().getSchema());
-    return bigQueryTableCreator.create(injector, tableInfo.getTableId(), schema);
+    Supplier<StructType> schemaSupplier =
+        () -> {
+          if (sparkProvidedSchema != null) {
+            return sparkProvidedSchema;
+          }
+          Schema schemaFromTable = bigQueryClient.getReadTableSchema(config.toReadTableOptions());
+          return schemaFromTable != null
+              ? SchemaConverters.from(SchemaConvertersConfiguration.from(config))
+                  .toSpark(schemaFromTable)
+              : null;
+        };
+    return bigQueryTableCreator.create(injector, schemaSupplier);
}
}
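With this rewrite, constructing the Table performs no BigQuery round trip at all: the dry run (or table lookup) runs only when Spark first asks for the schema, and the query itself is materialized only once rows are actually read. A toy illustration of the new ordering (names are illustrative):

    Supplier<StructType> schemaSupplier =
        () -> {
          System.out.println("schema lookup runs now, not at table creation");
          return fetchSparkSchema(); // hypothetical dry-run-backed lookup
        };
    Table table = bigQueryTableCreator.create(injector, schemaSupplier); // no BigQuery call here
    StructType schema = table.schema(); // the lookup happens at this point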
Spark31ReadFromQueryIntegrationTest.java

@@ -16,7 +16,9 @@
package com.google.cloud.spark.bigquery.integration;

public class Spark31ReadFromQueryIntegrationTest extends ReadFromQueryIntegrationTestBase {

public Spark31ReadFromQueryIntegrationTest() {
super(true);
}
// tests are from the super-class

}
Spark32BigQueryTable.java

@@ -15,17 +15,17 @@
*/
package com.google.cloud.spark.bigquery.v2;

-import com.google.cloud.bigquery.TableId;
import com.google.cloud.spark.bigquery.v2.context.BigQueryDataSourceReaderContext;
import com.google.inject.Injector;
+import java.util.function.Supplier;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class Spark32BigQueryTable extends Spark31BigQueryTable {

-  protected Spark32BigQueryTable(Injector injector, TableId tableId, StructType schema) {
-    super(injector, tableId, schema);
+  protected Spark32BigQueryTable(Injector injector, Supplier<StructType> schemaSupplier) {
+    super(injector, schemaSupplier);
}

@Override
Spark32ReadFromQueryIntegrationTest.java

@@ -17,6 +17,10 @@

public class Spark32ReadFromQueryIntegrationTest extends ReadFromQueryIntegrationTestBase {

public Spark32ReadFromQueryIntegrationTest() {
super(true);
}

// tests are from the super-class

}
Spark33BigQueryTable.java

@@ -15,17 +15,17 @@
*/
package com.google.cloud.spark.bigquery.v2;

-import com.google.cloud.bigquery.TableId;
import com.google.cloud.spark.bigquery.v2.context.BigQueryDataSourceReaderContext;
import com.google.inject.Injector;
+import java.util.function.Supplier;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class Spark33BigQueryTable extends Spark32BigQueryTable {

-  protected Spark33BigQueryTable(Injector injector, TableId tableId, StructType schema) {
-    super(injector, tableId, schema);
+  protected Spark33BigQueryTable(Injector injector, Supplier<StructType> schemaSupplier) {
+    super(injector, schemaSupplier);
}

@Override
Spark33ReadFromQueryIntegrationTest.java

@@ -17,6 +17,10 @@

public class Spark33ReadFromQueryIntegrationTest extends ReadFromQueryIntegrationTestBase {

public Spark33ReadFromQueryIntegrationTest() {
super(true);
}

// tests are from the super-class

}