Skip to content

Commit

Permalink
Add config to set the BigQuery Job timeout (GoogleCloudDataproc#1163)
Browse files Browse the repository at this point in the history
  • Loading branch information
isha97 authored Jan 17, 2024
1 parent 05a8240 commit d1dec79
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 7 deletions.
9 changes: 9 additions & 0 deletions README-template.md
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,15 @@ word-break:break-word
</td>
<td>Read</td>
</tr>
<tr>
<td><code>bigQueryJobTimeoutInMinutes</code>
</td>
<td>The timeout, in minutes, applied to BigQuery jobs started by the connector.
Defaults to <code>360</code> minutes (6 hours).
<br/> (Optional)
</td>
<td>Read/Write</td>
</tr>

</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class BigQueryClient {
private final Optional<String> materializationDataset;
private final JobConfigurationFactory jobConfigurationFactory;
private final Optional<BigQueryJobCompletionListener> jobCompletionListener;
private final long bigQueryJobTimeoutInMinutes;

public BigQueryClient(
BigQuery bigQuery,
Expand All @@ -93,13 +94,15 @@ public BigQueryClient(
Cache<String, TableInfo> destinationTableCache,
Map<String, String> labels,
Priority queryJobPriority,
Optional<BigQueryJobCompletionListener> jobCompletionListener) {
Optional<BigQueryJobCompletionListener> jobCompletionListener,
long bigQueryJobTimeoutInMinutes) {
this.bigQuery = bigQuery;
this.materializationProject = materializationProject;
this.materializationDataset = materializationDataset;
this.destinationTableCache = destinationTableCache;
this.jobConfigurationFactory = new JobConfigurationFactory(labels, queryJobPriority);
this.jobCompletionListener = jobCompletionListener;
this.bigQueryJobTimeoutInMinutes = bigQueryJobTimeoutInMinutes;
}

public static synchronized void runCleanupJobs() {
Expand Down Expand Up @@ -127,10 +130,16 @@ public JobInfo waitForJob(Job job) {
Job completedJob =
job.waitFor(
RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
RetryOption.totalTimeout(Duration.ofMinutes(3)));
if (completedJob == null && completedJob.getStatus().getError() != null) {
RetryOption.totalTimeout(Duration.ofMinutes(bigQueryJobTimeoutInMinutes)));
if (completedJob == null || completedJob.getStatus().getError() != null) {
throw new UncheckedIOException(
new IOException(completedJob.getStatus().getError().toString()));
new IOException(
completedJob != null ? completedJob.getStatus().getError().toString() : null));
}
if (!completedJob.isDone()) {
completedJob.cancel();
throw new IllegalStateException(
String.format("Job aborted due to timeout : %s minutes", bigQueryJobTimeoutInMinutes));
}
jobCompletionListener.ifPresent(jcl -> jcl.accept(completedJob));
return completedJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ public class BigQueryClientFactoryConfig implements BigQueryConfig {
private final int channelPoolSize;
private final Optional<Integer> flowControlWindowBytes;
private final QueryJobConfiguration.Priority queryJobPriority;
private final long bigQueryJobTimeoutInMinutes;

BigQueryClientFactoryConfig(BigQueryConfig bigQueryConfig) {
BigQueryClientFactoryConfig(BigQueryConfig bigQueryConfig, long bigQueryJobTimeoutInMinutes) {
this.accessTokenProviderFQCN = bigQueryConfig.getAccessTokenProviderFQCN();
this.accessTokenProviderConfig = bigQueryConfig.getAccessTokenProviderConfig();
this.credentialsKey = bigQueryConfig.getCredentialsKey();
Expand Down Expand Up @@ -85,6 +86,7 @@ public class BigQueryClientFactoryConfig implements BigQueryConfig {
this.channelPoolSize = bigQueryConfig.getChannelPoolSize();
this.flowControlWindowBytes = bigQueryConfig.getFlowControlWindowBytes();
this.queryJobPriority = bigQueryConfig.getQueryJobPriority();
this.bigQueryJobTimeoutInMinutes = bigQueryJobTimeoutInMinutes;
}

@Override
Expand Down Expand Up @@ -222,6 +224,10 @@ public Priority getQueryJobPriority() {
return queryJobPriority;
}

/**
 * Returns the configured BigQuery job timeout, in minutes. This value is used as the total
 * wait budget when polling a BigQuery job for completion.
 */
@Override
public long getBigQueryJobTimeoutInMinutes() {
  return bigQueryJobTimeoutInMinutes;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public BigQueryClient provideBigQueryClient(
destinationTableCache,
bigQueryJobLabels,
config.getQueryJobPriority(),
Optional.of(jobCompletionListener));
Optional.of(jobCompletionListener),
config.getBigQueryJobTimeoutInMinutes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public interface BigQueryConfig {

Priority getQueryJobPriority();

long getBigQueryJobTimeoutInMinutes();

default int getClientCreationHashCode() {
return Objects.hashCode(
getAccessTokenProviderFQCN(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,11 @@ public Priority getQueryJobPriority() {
return Priority.INTERACTIVE;
}

@Override
public long getBigQueryJobTimeoutInMinutes() {
  // Default timeout of 6 hours, expressed in minutes (6 * 60 = 360).
  return 360;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ public static WriteMethod from(@Nullable String writeMethod) {
"spark.sql.sources.partitionOverwriteMode";

public PartitionOverwriteMode partitionOverwriteModeValue = PartitionOverwriteMode.STATIC;
public static final String BIGQUERY_JOB_TIMEOUT_IN_MINUTES = "bigQueryJobTimeoutInMinutes";
static final long BIGQUERY_JOB_TIMEOUT_IN_MINUTES_DEFAULT = 6 * 60; // 6 hrs

TableId tableId;
// as the config needs to be Serializable, internally it uses
Expand Down Expand Up @@ -227,6 +229,7 @@ public static WriteMethod from(@Nullable String writeMethod) {
private com.google.common.base.Optional<String> destinationTableKmsKeyName = empty();

private boolean allowMapTypeConversion = ALLOW_MAP_TYPE_CONVERSION_DEFAULT;
private long bigQueryJobTimeoutInMinutes = BIGQUERY_JOB_TIMEOUT_IN_MINUTES_DEFAULT;

@VisibleForTesting
SparkBigQueryConfig() {
Expand Down Expand Up @@ -569,6 +572,11 @@ public static SparkBigQueryConfig from(
.transform(PartitionOverwriteMode::valueOf)
.or(PartitionOverwriteMode.STATIC);

config.bigQueryJobTimeoutInMinutes =
getAnyOption(globalOptions, options, BIGQUERY_JOB_TIMEOUT_IN_MINUTES)
.transform(Long::valueOf)
.or(BIGQUERY_JOB_TIMEOUT_IN_MINUTES_DEFAULT);

return config;
}

Expand Down Expand Up @@ -1009,6 +1017,10 @@ public boolean getAllowMapTypeConversion() {
return allowMapTypeConversion;
}

/**
 * Returns the BigQuery job timeout in minutes configured via the
 * {@code bigQueryJobTimeoutInMinutes} option (or its default).
 */
public long getBigQueryJobTimeoutInMinutes() {
  return this.bigQueryJobTimeoutInMinutes;
}

public ImmutableMap<String, String> getBigQueryTableLabels() {
return bigQueryTableLabels;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ public void testDefaults() {
assertThat(config.getQueryJobPriority()).isEqualTo(SparkBigQueryConfig.DEFAULT_JOB_PRIORITY);
assertThat(config.getKmsKeyName()).isEqualTo(Optional.empty());
assertThat(config.getAllowMapTypeConversion()).isTrue();
assertThat(config.getBigQueryJobTimeoutInMinutes()).isEqualTo(6 * 60);
}

@Test
Expand Down Expand Up @@ -176,6 +177,7 @@ public void testConfigFromOptions() {
.put("queryJobPriority", "batch")
.put("destinationTableKmsKeyName", "some/key/name")
.put("allowMapTypeConversion", "false")
.put("bigQueryJobTimeoutInMinutes", "30")
.build());
SparkBigQueryConfig config =
SparkBigQueryConfig.from(
Expand Down Expand Up @@ -229,6 +231,7 @@ public void testConfigFromOptions() {
assertThat(config.getQueryJobPriority()).isEqualTo(Priority.valueOf("BATCH"));
assertThat(config.getKmsKeyName()).isEqualTo(Optional.of("some/key/name"));
assertThat(config.getAllowMapTypeConversion()).isFalse();
assertThat(config.getBigQueryJobTimeoutInMinutes()).isEqualTo(30);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ private static BigQueryClient getBigQueryClient() {
destinationTableCache,
ImmutableMap.of(),
SparkBigQueryConfig.DEFAULT_JOB_PRIORITY,
Optional.empty());
Optional.empty(),
6 * 60);
}

public static void createDataset(String dataset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,27 @@ public void testQueryJobPriority() {

validateResult(df);
}

@Test
public void testReadFromLongQueryWithBigQueryJobTimeout() {
  // Query a large public table so the materialization job cannot finish within the
  // 1-minute timeout configured below, forcing the timeout path in waitForJob.
  String query = "SELECT * FROM `largesamples.wikipedia_pageviews_201001`";
  // The connector surfaces the aborted job as a RuntimeException; no need to catch and
  // rethrow inside the lambda — assertThrows inspects the thrown exception directly.
  assertThrows(
      RuntimeException.class,
      () ->
          spark
              .read()
              .format("bigquery")
              .option("viewsEnabled", true)
              .option("materializationDataset", testDataset.toString())
              .option("bigQueryJobTimeoutInMinutes", "1")
              .load(query)
              .show());
}
}

class TestBigQueryJobCompletionListener extends SparkListener {
Expand Down

0 comments on commit d1dec79

Please sign in to comment.