From ff02b3fd96800c806d6f1c3ee1621d8e6d9bb072 Mon Sep 17 00:00:00 2001 From: PhongChuong Date: Tue, 23 Jul 2024 09:50:55 -0400 Subject: [PATCH 1/9] feat: initial implementation of customizable BigQueryRetryConfig --- .../com/google/cloud/bigquery/BigQuery.java | 12 +- .../google/cloud/bigquery/BigQueryImpl.java | 21 ++- .../java/com/google/cloud/bigquery/Job.java | 39 ++++-- .../cloud/bigquery/spi/v2/BigQueryRpc.java | 4 +- .../cloud/bigquery/BigQueryImplTest.java | 3 + .../com/google/cloud/bigquery/JobTest.java | 127 ++++++++++++++++++ .../cloud/bigquery/it/ITBigQueryTest.java | 12 +- 7 files changed, 199 insertions(+), 19 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java index 80fd6618d..e391c054d 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java @@ -561,7 +561,7 @@ public static JobListOption fields(JobField... fields) { /** Class for specifying table get and create options. */ class JobOption extends Option { - private static final long serialVersionUID = -3111736712316353665L; + private static final long serialVersionUID = -3111736712316353664L; private JobOption(BigQueryRpc.Option option, Object value) { super(option, value); @@ -578,6 +578,16 @@ public static JobOption fields(JobField... fields) { return new JobOption( BigQueryRpc.Option.FIELDS, Helper.selector(JobField.REQUIRED_FIELDS, fields)); } + + /** Returns an option to specify the job's BigQuery retry configuration. */ + public static JobOption bigQueryRetryConfig(BigQueryRetryConfig bigQueryRetryConfig) { + return new JobOption(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, bigQueryRetryConfig); + } + + /** Returns an option to specify the job's retry options. */ + public static JobOption retryOptions(RetryOption... options) { + return new JobOption(BigQueryRpc.Option.RETRY_OPTIONS, options); + } } /** Class for specifying query results options. */ diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java index acfa1b7f1..5e86bb6d3 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java @@ -39,6 +39,7 @@ import com.google.cloud.Policy; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; +import com.google.cloud.RetryOption; import com.google.cloud.Tuple; import com.google.cloud.bigquery.InsertAllRequest.RowToInsert; import com.google.cloud.bigquery.QueryJobConfiguration.JobCreationMode; @@ -415,10 +416,15 @@ public com.google.api.services.bigquery.model.Job call() { } } }, - getOptions().getRetrySettings(), + getRetryOptions(optionsMap) != null + ? RetryOption.mergeToSettings( + getOptions().getRetrySettings(), getRetryOptions(optionsMap)) + : getOptions().getRetrySettings(), BigQueryBaseService.BIGQUERY_EXCEPTION_HANDLER, getOptions().getClock(), - DEFAULT_RETRY_CONFIG)); + getBigQueryRetryConfig(optionsMap) != null + ? getBigQueryRetryConfig(optionsMap) + : DEFAULT_RETRY_CONFIG)); } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) { throw BigQueryException.translateAndThrow(e); } @@ -1628,4 +1634,15 @@ public com.google.api.services.bigquery.model.TestIamPermissionsResponse call() } return optionMap; } + + @VisibleForTesting + static BigQueryRetryConfig getBigQueryRetryConfig(Map options) { + return (BigQueryRetryConfig) + options.getOrDefault(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, null); + } + + @VisibleForTesting + static RetryOption[] getRetryOptions(Map options) { + return (RetryOption[]) options.getOrDefault(BigQueryRpc.Option.RETRY_OPTIONS, null); + } } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java index d23e4ea52..4a1189374 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java @@ -28,12 +28,14 @@ import com.google.cloud.bigquery.BigQuery.QueryResultsOption; import com.google.cloud.bigquery.BigQuery.TableDataListOption; import com.google.cloud.bigquery.JobConfiguration.Type; +import com.google.cloud.bigquery.spi.v2.BigQueryRpc; import com.google.common.collect.ImmutableList; import java.io.IOException; import java.io.ObjectInputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -196,6 +198,7 @@ public boolean isDone() { Job job = bigquery.getJob(getJobId(), JobOption.fields(BigQuery.JobField.STATUS)); return job == null || JobStatus.State.DONE.equals(job.getStatus().getState()); } + /** * Blocks until this job completes its execution, either failing or succeeding. This method * returns current job's latest information. If the job no longer exists, this method returns @@ -238,12 +241,20 @@ public boolean isDone() { * to complete */ public Job waitFor(RetryOption... waitOptions) throws InterruptedException { + return waitForInternal(DEFAULT_RETRY_CONFIG, waitOptions); + } + public Job waitFor(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions) throws InterruptedException { + return waitForInternal(bigQueryRetryConfig, waitOptions); + } + + private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions) throws InterruptedException { checkNotDryRun("waitFor"); Object completedJobResponse; if (getConfiguration().getType() == Type.QUERY) { completedJobResponse = waitForQueryResults( RetryOption.mergeToSettings(DEFAULT_JOB_WAIT_SETTINGS, waitOptions), + bigQueryRetryConfig, DEFAULT_QUERY_WAIT_OPTIONS); } else { completedJobResponse = @@ -253,17 +264,17 @@ public Job waitFor(RetryOption... waitOptions) throws InterruptedException { return completedJobResponse == null ? null : reload(); } - /** - * Gets the query results of this job. This job must be of type {@code - * JobConfiguration.Type.QUERY}, otherwise this method will throw {@link - * UnsupportedOperationException}. - * - *

If the job hasn't finished, this method waits for the job to complete. However, the state of - * the current {@code Job} instance is not updated. To get the new state, call {@link - * #waitFor(RetryOption...)} or {@link #reload(JobOption...)}. - * - * @throws BigQueryException upon failure - */ + /** + * Gets the query results of this job. This job must be of type {@code + * JobConfiguration.Type.QUERY}, otherwise this method will throw {@link + * UnsupportedOperationException}. + * + *

If the job hasn't finished, this method waits for the job to complete. However, the state of + * the current {@code Job} instance is not updated. To get the new state, call {@link + * #waitFor(RetryOption...)} or {@link #reload(JobOption...)}. + * + * @throws BigQueryException upon failure + */ public TableResult getQueryResults(QueryResultsOption... options) throws InterruptedException, JobException { checkNotDryRun("getQueryResults"); @@ -294,7 +305,7 @@ public TableResult getQueryResults(QueryResultsOption... options) QueryResponse response = waitForQueryResults( - DEFAULT_JOB_WAIT_SETTINGS, waitOptions.toArray(new QueryResultsOption[0])); + DEFAULT_JOB_WAIT_SETTINGS, DEFAULT_RETRY_CONFIG, waitOptions.toArray(new QueryResultsOption[0])); // Get the job resource to determine if it has errored. Job job = this; @@ -334,7 +345,7 @@ public TableResult getQueryResults(QueryResultsOption... options) } private QueryResponse waitForQueryResults( - RetrySettings retrySettings, final QueryResultsOption... resultsOptions) + RetrySettings retrySettings, BigQueryRetryConfig bigQueryRetryConfig, final QueryResultsOption... resultsOptions) throws InterruptedException { if (getConfiguration().getType() != Type.QUERY) { throw new UnsupportedOperationException( @@ -360,7 +371,7 @@ public boolean shouldRetry( } }, options.getClock(), - DEFAULT_RETRY_CONFIG); + bigQueryRetryConfig); } catch (BigQueryRetryHelper.BigQueryRetryHelperException e) { throw BigQueryException.translateAndThrow(e); } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java index 341640919..57f1a05c0 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/spi/v2/BigQueryRpc.java @@ -57,7 +57,9 @@ enum Option { STATE_FILTER("stateFilter"), TIMEOUT("timeoutMs"), REQUESTED_POLICY_VERSION("requestedPolicyVersion"), - TABLE_METADATA_VIEW("view"); + TABLE_METADATA_VIEW("view"), + RETRY_OPTIONS("retryOptions"), + BIGQUERY_RETRY_CONFIG("bigQueryRetryConfig"); private final String value; diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index d3d374006..fc141828f 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -52,6 +52,7 @@ @RunWith(MockitoJUnitRunner.class) public class BigQueryImplTest { + // TODO(NOW) private static final String PROJECT = "project"; private static final String LOCATION = "US"; @@ -2298,6 +2299,8 @@ public void testGetQueryResults() { verify(bigqueryRpcMock).getQueryResults(PROJECT, JOB, null, EMPTY_RPC_OPTIONS); } + // TODO(NOW) + @Test public void testGetQueryResultsRetry() { JobId queryJob = JobId.of(JOB); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java index d10203444..f537461e5 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java @@ -37,10 +37,14 @@ import com.google.api.gax.paging.Page; import com.google.api.gax.paging.Pages; import com.google.cloud.RetryOption; +import com.google.cloud.bigquery.BigQuery.QueryResultsOption; import com.google.cloud.bigquery.JobStatistics.CopyStatistics; import com.google.cloud.bigquery.JobStatistics.QueryStatistics; import com.google.cloud.bigquery.JobStatus.State; import com.google.common.collect.ImmutableList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -85,6 +89,8 @@ public class JobTest { RetryOption.retryDelayMultiplier(1.0) }; + private static final BigQueryRetryConfig TEST_BIGQUERY_RETRY_CONFIG = BigQueryRetryConfig.newBuilder().retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG).build(); + @Rule public MockitoRule rule; private BigQuery bigquery; @@ -406,6 +412,127 @@ public void testWaitForWithTimeout() throws InterruptedException { } } + @Test + public void testWaitForBigQueryRetryConfig() throws InterruptedException { + QueryJobConfiguration jobConfig = + QueryJobConfiguration.newBuilder("SELECT 1").setDestinationTable(TABLE_ID1).build(); + QueryStatistics jobStatistics = + QueryStatistics.newBuilder() + .setCreationTimestamp(1L) + .setEndTime(3L) + .setStartTime(2L) + .build(); + JobInfo jobInfo = + JobInfo.newBuilder(jobConfig) + .setJobId(JOB_ID) + .setStatistics(jobStatistics) + .setJobId(JOB_ID) + .setEtag(ETAG) + .setGeneratedId(GENERATED_ID) + .setSelfLink(SELF_LINK) + .setUserEmail(EMAIL) + .setStatus(JOB_STATUS) + .build(); + + when(bigquery.getOptions()).thenReturn(mockOptions); + when(mockOptions.getClock()).thenReturn(CurrentMillisClock.getDefaultClock()); + Job completedJob = + expectedJob.toBuilder().setStatus(new JobStatus(JobStatus.State.RUNNING)).build(); + Page singlePage = Pages.empty(); + TableResult result = + TableResult.newBuilder() + .setSchema(Schema.of()) + .setTotalRows(1L) + .setPageNoSchema(singlePage) + .build(); + QueryResponse completedQuery = + QueryResponse.newBuilder() + .setCompleted(true) + .setTotalRows(1) // Lies to force call of listTableData(). + .setSchema(Schema.of(Field.of("_f0", LegacySQLTypeName.INTEGER))) + .setErrors(ImmutableList.of()) + .build(); + + // when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) + // .thenReturn(completedQuery); + when(bigquery.getJob(JOB_INFO.getJobId())).thenReturn(completedJob); + when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) + .thenReturn(completedQuery); + // when(bigquery.listTableData(eq(TABLE_ID1), any(Schema.class))).thenReturn(result); + job = this.job.toBuilder().setConfiguration(jobConfig).build(); + assertThat(job.waitFor(TEST_BIGQUERY_RETRY_CONFIG, TEST_RETRY_OPTIONS)).isSameInstanceAs(completedJob); + // assertThat(job.getQueryResults().iterateAll()).hasSize(0); + verify(bigquery, times(1)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); + verify(bigquery).getJob(JOB_INFO.getJobId()); + } + + @Test + public void testWaitForBigQueryRetryConfigRetryError() throws InterruptedException { + // TODO(NOW) + + QueryJobConfiguration jobConfig = + QueryJobConfiguration.newBuilder("SELECT 1").setDestinationTable(TABLE_ID1).build(); + QueryStatistics jobStatistics = + QueryStatistics.newBuilder() + .setCreationTimestamp(1L) + .setEndTime(3L) + .setStartTime(2L) + .build(); + JobInfo jobInfo = + JobInfo.newBuilder(jobConfig) + .setJobId(JOB_ID) + .setStatistics(jobStatistics) + .setJobId(JOB_ID) + .setEtag(ETAG) + .setGeneratedId(GENERATED_ID) + .setSelfLink(SELF_LINK) + .setUserEmail(EMAIL) + .setStatus(JOB_STATUS) + .build(); + + when(bigquery.getOptions()).thenReturn(mockOptions); + when(mockOptions.getClock()).thenReturn(CurrentMillisClock.getDefaultClock()); + Job completedJob = + expectedJob.toBuilder().setStatus(new JobStatus(JobStatus.State.RUNNING)).build(); + Page singlePage = Pages.empty(); + TableResult result = + TableResult.newBuilder() + .setSchema(Schema.of()) + .setTotalRows(1L) + .setPageNoSchema(singlePage) + .build(); + QueryResponse completedQuery = + QueryResponse.newBuilder() + .setCompleted(true) + .setTotalRows(1) // Lies to force call of listTableData(). + .setSchema(Schema.of(Field.of("_f0", LegacySQLTypeName.INTEGER))) + .setErrors(ImmutableList.of()) + .build(); + + + // when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) + // .thenReturn(completedQuery); + // Setup the error message. TODO + when(bigquery.getJob(JOB_INFO.getJobId())).thenReturn(completedJob); + // BigQueryError bigQueryError = new BigQueryError("invalidQuery", "US", "invalidQuery"); + BigQueryError bigQueryError = new BigQueryError("invalidQuery", "US", "Exceeded rate limits:"); + + ImmutableList bigQueryErrorList = ImmutableList.of(bigQueryError); + BigQueryException bigQueryException = new BigQueryException(bigQueryErrorList); + when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) + // .thenReturn(completedQuery) + .thenThrow(bigQueryException); + when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) + .thenReturn(completedQuery); + // .thenThrow(bigQueryException); + // when(bigquery.listTableData(eq(TABLE_ID1), any(Schema.class))).thenReturn(result); + job = this.job.toBuilder().setConfiguration(jobConfig).build(); + assertThat(job.waitFor(TEST_BIGQUERY_RETRY_CONFIG, TEST_RETRY_OPTIONS)).isSameInstanceAs(completedJob); + // assertThat(job.getQueryResults().iterateAll()).hasSize(0); + verify(bigquery, times(1)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); + verify(bigquery).getJob(JOB_INFO.getJobId()); + } + @Test public void testReload() { JobInfo updatedInfo = JOB_INFO.toBuilder().setEtag("etag").build(); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index cab716ff6..04da8a0e9 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -54,9 +54,11 @@ import com.google.cloud.bigquery.BigQuery.TableOption; import com.google.cloud.bigquery.BigQueryDryRunResult; import com.google.cloud.bigquery.BigQueryError; +import com.google.cloud.bigquery.BigQueryErrorMessages; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.BigQueryResult; +import com.google.cloud.bigquery.BigQueryRetryConfig; import com.google.cloud.bigquery.BigQuerySQLException; import com.google.cloud.bigquery.CloneDefinition; import com.google.cloud.bigquery.Clustering; @@ -3225,13 +3227,21 @@ public void testQuery() throws InterruptedException { @Test public void testQueryStatistics() throws InterruptedException { // Use CURRENT_TIMESTAMP to avoid potential caching. + // TODO(NOW) String query = "SELECT CURRENT_TIMESTAMP() AS ts"; QueryJobConfiguration config = QueryJobConfiguration.newBuilder(query) .setDefaultDataset(DatasetId.of(DATASET)) .setUseQueryCache(false) .build(); - Job job = bigquery.create(JobInfo.of(JobId.of(), config)); + + BigQueryRetryConfig retryConfig = + BigQueryRetryConfig.newBuilder() + .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) + .build(); // retry config with Error Message for RateLimitExceeded Error + Job job = + bigquery.create( + JobInfo.of(JobId.of(), config), BigQuery.JobOption.bigQueryRetryConfig(retryConfig)); job = job.waitFor(); JobStatistics.QueryStatistics statistics = job.getStatistics(); From 5b75c38488fe62a589b3183076b35ecbac037194 Mon Sep 17 00:00:00 2001 From: PhongChuong Date: Tue, 23 Jul 2024 15:12:19 -0400 Subject: [PATCH 2/9] Add unit and integration tests --- .../google/cloud/bigquery/BigQueryImpl.java | 2 - .../java/com/google/cloud/bigquery/Job.java | 39 ++--- .../cloud/bigquery/BigQueryImplTest.java | 114 +++++++++++++- .../com/google/cloud/bigquery/JobTest.java | 141 ++++++++++-------- .../cloud/bigquery/it/ITBigQueryTest.java | 20 ++- 5 files changed, 231 insertions(+), 85 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java index 5e86bb6d3..576083215 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java @@ -1635,13 +1635,11 @@ public com.google.api.services.bigquery.model.TestIamPermissionsResponse call() return optionMap; } - @VisibleForTesting static BigQueryRetryConfig getBigQueryRetryConfig(Map options) { return (BigQueryRetryConfig) options.getOrDefault(BigQueryRpc.Option.BIGQUERY_RETRY_CONFIG, null); } - @VisibleForTesting static RetryOption[] getRetryOptions(Map options) { return (RetryOption[]) options.getOrDefault(BigQueryRpc.Option.RETRY_OPTIONS, null); } diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java index 4a1189374..700c64604 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java @@ -28,14 +28,12 @@ import com.google.cloud.bigquery.BigQuery.QueryResultsOption; import com.google.cloud.bigquery.BigQuery.TableDataListOption; import com.google.cloud.bigquery.JobConfiguration.Type; -import com.google.cloud.bigquery.spi.v2.BigQueryRpc; import com.google.common.collect.ImmutableList; import java.io.IOException; import java.io.ObjectInputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -243,11 +241,14 @@ public boolean isDone() { public Job waitFor(RetryOption... waitOptions) throws InterruptedException { return waitForInternal(DEFAULT_RETRY_CONFIG, waitOptions); } - public Job waitFor(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions) throws InterruptedException { + + public Job waitFor(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions) + throws InterruptedException { return waitForInternal(bigQueryRetryConfig, waitOptions); } - private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions) throws InterruptedException { + private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions) + throws InterruptedException { checkNotDryRun("waitFor"); Object completedJobResponse; if (getConfiguration().getType() == Type.QUERY) { @@ -264,17 +265,17 @@ private Job waitForInternal(BigQueryRetryConfig bigQueryRetryConfig, RetryOption return completedJobResponse == null ? null : reload(); } - /** - * Gets the query results of this job. This job must be of type {@code - * JobConfiguration.Type.QUERY}, otherwise this method will throw {@link - * UnsupportedOperationException}. - * - *

If the job hasn't finished, this method waits for the job to complete. However, the state of - * the current {@code Job} instance is not updated. To get the new state, call {@link - * #waitFor(RetryOption...)} or {@link #reload(JobOption...)}. - * - * @throws BigQueryException upon failure - */ + /** + * Gets the query results of this job. This job must be of type {@code + * JobConfiguration.Type.QUERY}, otherwise this method will throw {@link + * UnsupportedOperationException}. + * + *

If the job hasn't finished, this method waits for the job to complete. However, the state of + * the current {@code Job} instance is not updated. To get the new state, call {@link + * #waitFor(RetryOption...)} or {@link #reload(JobOption...)}. + * + * @throws BigQueryException upon failure + */ public TableResult getQueryResults(QueryResultsOption... options) throws InterruptedException, JobException { checkNotDryRun("getQueryResults"); @@ -305,7 +306,9 @@ public TableResult getQueryResults(QueryResultsOption... options) QueryResponse response = waitForQueryResults( - DEFAULT_JOB_WAIT_SETTINGS, DEFAULT_RETRY_CONFIG, waitOptions.toArray(new QueryResultsOption[0])); + DEFAULT_JOB_WAIT_SETTINGS, + DEFAULT_RETRY_CONFIG, + waitOptions.toArray(new QueryResultsOption[0])); // Get the job resource to determine if it has errored. Job job = this; @@ -345,7 +348,9 @@ public TableResult getQueryResults(QueryResultsOption... options) } private QueryResponse waitForQueryResults( - RetrySettings retrySettings, BigQueryRetryConfig bigQueryRetryConfig, final QueryResultsOption... resultsOptions) + RetrySettings retrySettings, + BigQueryRetryConfig bigQueryRetryConfig, + final QueryResultsOption... resultsOptions) throws InterruptedException { if (getConfiguration().getType() != Type.QUERY) { throw new UnsupportedOperationException( diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index fc141828f..6e544189c 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -27,6 +27,7 @@ import com.google.api.services.bigquery.model.*; import com.google.api.services.bigquery.model.JobStatistics; import com.google.cloud.Policy; +import com.google.cloud.RetryOption; import com.google.cloud.ServiceOptions; import com.google.cloud.Tuple; import com.google.cloud.bigquery.BigQuery.JobOption; @@ -52,8 +53,6 @@ @RunWith(MockitoJUnitRunner.class) public class BigQueryImplTest { - // TODO(NOW) - private static final String PROJECT = "project"; private static final String LOCATION = "US"; private static final String OTHER_PROJECT = "otherProject"; @@ -1595,6 +1594,115 @@ public void testCreateJobFailureShouldRetry() { verify(bigqueryRpcMock, times(6)).create(jobCapture.capture(), eq(EMPTY_RPC_OPTIONS)); } + @Test + public void testCreateJobWithBigQueryRetryConfigFailureShouldRetry() { + // Validate create job with BigQueryRetryConfig that retries on rate limit error message. + JobOption bigQueryRetryConfigOption = JobOption.bigQueryRetryConfig(BigQueryRetryConfig.newBuilder() + .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) + .retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG) + .retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX) + .build()); + + Map bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption); + when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions))) + .thenThrow( + new BigQueryException( + 400, RATE_LIMIT_ERROR_MSG)) // retrial on based on RATE_LIMIT_EXCEEDED_MSG + .thenThrow(new BigQueryException(200, RATE_LIMIT_ERROR_MSG)) + .thenReturn(newJobPb()); + + bigquery = options.getService(); + bigquery = + options + .toBuilder() + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + ((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption); + verify(bigqueryRpcMock, times(3)).create(jobCapture.capture(), eq(bigQueryRpcOptions)); + } + + @Test + public void testCreateJobWithBigQueryRetryConfigFailureShouldNotRetry() { + // Validate create job with BigQueryRetryConfig that does not retry on rate limit error message. + JobOption bigQueryRetryConfigOption = JobOption.bigQueryRetryConfig(BigQueryRetryConfig.newBuilder() + .build()); + + Map bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption); + when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions))) + .thenThrow( + new BigQueryException( + 400, RATE_LIMIT_ERROR_MSG)); + + bigquery = options.getService(); + bigquery = + options + .toBuilder() + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + try { + ((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption); + fail("JobException expected"); + } catch (BigQueryException e) { + assertNotNull(e.getMessage()); + } + // Verify that getQueryResults is attempted only once and not retried since the error message + // does not match. + verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions)); + } + + @Test + public void testCreateJobWithRetryOptionsFailureShouldRetry() { + // Validate create job with RetryOptions. + JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(4)); + Map bigQueryRpcOptions = optionMap(retryOptions); + when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions))) + .thenThrow(new BigQueryException(500, "InternalError")) + .thenThrow(new BigQueryException(502, "Bad Gateway")) + .thenThrow(new BigQueryException(503, "Service Unavailable")) + .thenReturn(newJobPb()); + + bigquery = options.getService(); + bigquery = + options + .toBuilder() + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + ((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions); + verify(bigqueryRpcMock, times(4)).create(jobCapture.capture(), eq(bigQueryRpcOptions)); + } + + @Test + public void testCreateJobWithRetryOptionsFailureShouldNotRetry() { + // Validate create job with RetryOptions that only attempts once (no retry). + JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(1)); + Map bigQueryRpcOptions = optionMap(retryOptions); + when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions))) + .thenThrow(new BigQueryException(500, "InternalError")) + .thenReturn(newJobPb()); + + bigquery = options.getService(); + bigquery = + options + .toBuilder() + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .build() + .getService(); + + try { + ((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions); + fail("JobException expected"); + } catch (BigQueryException e) { + assertNotNull(e.getMessage()); + } + verify(bigqueryRpcMock, times(1)).create(jobCapture.capture(), eq(bigQueryRpcOptions)); + } + @Test public void testCreateJobWithSelectedFields() { when(bigqueryRpcMock.create( @@ -2299,8 +2407,6 @@ public void testGetQueryResults() { verify(bigqueryRpcMock).getQueryResults(PROJECT, JOB, null, EMPTY_RPC_OPTIONS); } - // TODO(NOW) - @Test public void testGetQueryResultsRetry() { JobId queryJob = JobId.of(JOB); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java index f537461e5..396bb754a 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/JobTest.java @@ -37,14 +37,10 @@ import com.google.api.gax.paging.Page; import com.google.api.gax.paging.Pages; import com.google.cloud.RetryOption; -import com.google.cloud.bigquery.BigQuery.QueryResultsOption; import com.google.cloud.bigquery.JobStatistics.CopyStatistics; import com.google.cloud.bigquery.JobStatistics.QueryStatistics; import com.google.cloud.bigquery.JobStatus.State; import com.google.common.collect.ImmutableList; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -69,6 +65,10 @@ public class JobTest { CopyStatistics.newBuilder().setCreationTimestamp(1L).setEndTime(3L).setStartTime(2L).build(); private static final CopyJobConfiguration COPY_CONFIGURATION = CopyJobConfiguration.of(TABLE_ID1, TABLE_ID2); + private static final QueryJobConfiguration DDL_QUERY_CONFIGURATION = + QueryJobConfiguration.newBuilder("CREATE VIEW").setDestinationTable(TABLE_ID1).build(); + private static final QueryJobConfiguration DRL_QUERY_CONFIGURATION = + QueryJobConfiguration.newBuilder("SELECT 1").setDestinationTable(TABLE_ID1).build(); private static final JobInfo JOB_INFO = JobInfo.newBuilder(COPY_CONFIGURATION) .setJobId(JOB_ID) @@ -89,7 +89,10 @@ public class JobTest { RetryOption.retryDelayMultiplier(1.0) }; - private static final BigQueryRetryConfig TEST_BIGQUERY_RETRY_CONFIG = BigQueryRetryConfig.newBuilder().retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG).build(); + private static final BigQueryRetryConfig TEST_BIGQUERY_RETRY_CONFIG = + BigQueryRetryConfig.newBuilder() + .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) + .build(); @Rule public MockitoRule rule; @@ -197,8 +200,6 @@ public void testWaitFor() throws InterruptedException { @Test public void testWaitForAndGetQueryResultsEmpty() throws InterruptedException { - QueryJobConfiguration jobConfig = - QueryJobConfiguration.newBuilder("CREATE VIEW").setDestinationTable(TABLE_ID1).build(); QueryStatistics jobStatistics = QueryStatistics.newBuilder() .setCreationTimestamp(1L) @@ -206,7 +207,7 @@ public void testWaitForAndGetQueryResultsEmpty() throws InterruptedException { .setStartTime(2L) .build(); JobInfo jobInfo = - JobInfo.newBuilder(jobConfig) + JobInfo.newBuilder(DDL_QUERY_CONFIGURATION) .setJobId(JOB_ID) .setStatistics(jobStatistics) .setJobId(JOB_ID) @@ -234,7 +235,7 @@ public void testWaitForAndGetQueryResultsEmpty() throws InterruptedException { when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) .thenReturn(completedQuery); when(bigquery.getJob(JOB_INFO.getJobId())).thenReturn(completedJob); - job = this.job.toBuilder().setConfiguration(jobConfig).build(); + job = this.job.toBuilder().setConfiguration(DDL_QUERY_CONFIGURATION).build(); assertThat(job.waitFor(TEST_RETRY_OPTIONS)).isSameInstanceAs(completedJob); assertThat(job.getQueryResults().iterateAll()).isEmpty(); verify(bigquery, times(2)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); @@ -243,8 +244,6 @@ public void testWaitForAndGetQueryResultsEmpty() throws InterruptedException { @Test public void testWaitForAndGetQueryResultsEmptyWithSchema() throws InterruptedException { - QueryJobConfiguration jobConfig = - QueryJobConfiguration.newBuilder("CREATE VIEW").setDestinationTable(TABLE_ID1).build(); QueryStatistics jobStatistics = QueryStatistics.newBuilder() .setCreationTimestamp(1L) @@ -252,7 +251,7 @@ public void testWaitForAndGetQueryResultsEmptyWithSchema() throws InterruptedExc .setStartTime(2L) .build(); JobInfo jobInfo = - JobInfo.newBuilder(jobConfig) + JobInfo.newBuilder(DDL_QUERY_CONFIGURATION) .setJobId(JOB_ID) .setStatistics(jobStatistics) .setJobId(JOB_ID) @@ -280,7 +279,7 @@ public void testWaitForAndGetQueryResultsEmptyWithSchema() throws InterruptedExc when(bigquery.getJob(JOB_INFO.getJobId())).thenReturn(completedJob); when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) .thenReturn(completedQuery); - job = this.job.toBuilder().setConfiguration(jobConfig).build(); + job = this.job.toBuilder().setConfiguration(DDL_QUERY_CONFIGURATION).build(); assertThat(job.waitFor(TEST_RETRY_OPTIONS)).isSameInstanceAs(completedJob); assertThat(job.getQueryResults().getSchema()) .isEqualTo(Schema.of(Field.of("field1", LegacySQLTypeName.BOOLEAN))); @@ -290,8 +289,6 @@ public void testWaitForAndGetQueryResultsEmptyWithSchema() throws InterruptedExc @Test public void testWaitForAndGetQueryResults() throws InterruptedException { - QueryJobConfiguration jobConfig = - QueryJobConfiguration.newBuilder("SELECT 1").setDestinationTable(TABLE_ID1).build(); QueryStatistics jobStatistics = QueryStatistics.newBuilder() .setCreationTimestamp(1L) @@ -299,7 +296,7 @@ public void testWaitForAndGetQueryResults() throws InterruptedException { .setStartTime(2L) .build(); JobInfo jobInfo = - JobInfo.newBuilder(jobConfig) + JobInfo.newBuilder(DRL_QUERY_CONFIGURATION) .setJobId(JOB_ID) .setStatistics(jobStatistics) .setJobId(JOB_ID) @@ -335,7 +332,7 @@ public void testWaitForAndGetQueryResults() throws InterruptedException { when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) .thenReturn(completedQuery); when(bigquery.listTableData(eq(TABLE_ID1), any(Schema.class))).thenReturn(result); - job = this.job.toBuilder().setConfiguration(jobConfig).build(); + job = this.job.toBuilder().setConfiguration(DRL_QUERY_CONFIGURATION).build(); assertThat(job.waitFor(TEST_RETRY_OPTIONS)).isSameInstanceAs(completedJob); assertThat(job.getQueryResults().iterateAll()).hasSize(0); verify(bigquery, times(2)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); @@ -413,9 +410,7 @@ public void testWaitForWithTimeout() throws InterruptedException { } @Test - public void testWaitForBigQueryRetryConfig() throws InterruptedException { - QueryJobConfiguration jobConfig = - QueryJobConfiguration.newBuilder("SELECT 1").setDestinationTable(TABLE_ID1).build(); + public void testWaitForWithBigQueryRetryConfig() throws InterruptedException { QueryStatistics jobStatistics = QueryStatistics.newBuilder() .setCreationTimestamp(1L) @@ -423,7 +418,7 @@ public void testWaitForBigQueryRetryConfig() throws InterruptedException { .setStartTime(2L) .build(); JobInfo jobInfo = - JobInfo.newBuilder(jobConfig) + JobInfo.newBuilder(DRL_QUERY_CONFIGURATION) .setJobId(JOB_ID) .setStatistics(jobStatistics) .setJobId(JOB_ID) @@ -438,13 +433,6 @@ public void testWaitForBigQueryRetryConfig() throws InterruptedException { when(mockOptions.getClock()).thenReturn(CurrentMillisClock.getDefaultClock()); Job completedJob = expectedJob.toBuilder().setStatus(new JobStatus(JobStatus.State.RUNNING)).build(); - Page singlePage = Pages.empty(); - TableResult result = - TableResult.newBuilder() - .setSchema(Schema.of()) - .setTotalRows(1L) - .setPageNoSchema(singlePage) - .build(); QueryResponse completedQuery = QueryResponse.newBuilder() .setCompleted(true) @@ -453,25 +441,18 @@ public void testWaitForBigQueryRetryConfig() throws InterruptedException { .setErrors(ImmutableList.of()) .build(); - // when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) - // .thenReturn(completedQuery); when(bigquery.getJob(JOB_INFO.getJobId())).thenReturn(completedJob); when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) .thenReturn(completedQuery); - // when(bigquery.listTableData(eq(TABLE_ID1), any(Schema.class))).thenReturn(result); - job = this.job.toBuilder().setConfiguration(jobConfig).build(); - assertThat(job.waitFor(TEST_BIGQUERY_RETRY_CONFIG, TEST_RETRY_OPTIONS)).isSameInstanceAs(completedJob); - // assertThat(job.getQueryResults().iterateAll()).hasSize(0); + job = this.job.toBuilder().setConfiguration(DRL_QUERY_CONFIGURATION).build(); + assertThat(job.waitFor(TEST_BIGQUERY_RETRY_CONFIG, TEST_RETRY_OPTIONS)) + .isSameInstanceAs(completedJob); verify(bigquery, times(1)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); verify(bigquery).getJob(JOB_INFO.getJobId()); } @Test - public void testWaitForBigQueryRetryConfigRetryError() throws InterruptedException { - // TODO(NOW) - - QueryJobConfiguration jobConfig = - QueryJobConfiguration.newBuilder("SELECT 1").setDestinationTable(TABLE_ID1).build(); + public void testWaitForWithBigQueryRetryConfigShouldRetry() throws InterruptedException { QueryStatistics jobStatistics = QueryStatistics.newBuilder() .setCreationTimestamp(1L) @@ -479,7 +460,7 @@ public void testWaitForBigQueryRetryConfigRetryError() throws InterruptedExcepti .setStartTime(2L) .build(); JobInfo jobInfo = - JobInfo.newBuilder(jobConfig) + JobInfo.newBuilder(DRL_QUERY_CONFIGURATION) .setJobId(JOB_ID) .setStatistics(jobStatistics) .setJobId(JOB_ID) @@ -494,13 +475,6 @@ public void testWaitForBigQueryRetryConfigRetryError() throws InterruptedExcepti when(mockOptions.getClock()).thenReturn(CurrentMillisClock.getDefaultClock()); Job completedJob = expectedJob.toBuilder().setStatus(new JobStatus(JobStatus.State.RUNNING)).build(); - Page singlePage = Pages.empty(); - TableResult result = - TableResult.newBuilder() - .setSchema(Schema.of()) - .setTotalRows(1L) - .setPageNoSchema(singlePage) - .build(); QueryResponse completedQuery = QueryResponse.newBuilder() .setCompleted(true) @@ -509,28 +483,73 @@ public void testWaitForBigQueryRetryConfigRetryError() throws InterruptedExcepti .setErrors(ImmutableList.of()) .build(); - - // when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) - // .thenReturn(completedQuery); - // Setup the error message. TODO when(bigquery.getJob(JOB_INFO.getJobId())).thenReturn(completedJob); - // BigQueryError bigQueryError = new BigQueryError("invalidQuery", "US", "invalidQuery"); - BigQueryError bigQueryError = new BigQueryError("invalidQuery", "US", "Exceeded rate limits:"); + BigQueryError bigQueryError = + new BigQueryError( + "testReasonRateLimitExceeded", "US", "testMessage: Exceeded rate limits:"); ImmutableList bigQueryErrorList = ImmutableList.of(bigQueryError); BigQueryException bigQueryException = new BigQueryException(bigQueryErrorList); when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) - // .thenReturn(completedQuery) - .thenThrow(bigQueryException); + .thenThrow(bigQueryException) + .thenReturn(completedQuery); + job = this.job.toBuilder().setConfiguration(DRL_QUERY_CONFIGURATION).build(); + assertThat(job.waitFor(TEST_BIGQUERY_RETRY_CONFIG, TEST_RETRY_OPTIONS)) + .isSameInstanceAs(completedJob); + // Verify that getQueryResults is attempted twice. First during bigQueryException with "Exceeded + // rate limits" error message and the second successful attempt. + verify(bigquery, times(2)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); + verify(bigquery).getJob(JOB_INFO.getJobId()); + } + + @Test + public void testWaitForWithBigQueryRetryConfigErrorShouldNotRetry() throws InterruptedException { + QueryStatistics jobStatistics = + QueryStatistics.newBuilder() + .setCreationTimestamp(1L) + .setEndTime(3L) + .setStartTime(2L) + .build(); + JobInfo jobInfo = + JobInfo.newBuilder(DRL_QUERY_CONFIGURATION) + .setJobId(JOB_ID) + .setStatistics(jobStatistics) + .setJobId(JOB_ID) + .setEtag(ETAG) + .setGeneratedId(GENERATED_ID) + .setSelfLink(SELF_LINK) + .setUserEmail(EMAIL) + .setStatus(JOB_STATUS) + .build(); + + when(bigquery.getOptions()).thenReturn(mockOptions); + when(mockOptions.getClock()).thenReturn(CurrentMillisClock.getDefaultClock()); + QueryResponse completedQuery = + QueryResponse.newBuilder() + .setCompleted(true) + .setTotalRows(1) // Lies to force call of listTableData(). + .setSchema(Schema.of(Field.of("_f0", LegacySQLTypeName.INTEGER))) + .setErrors(ImmutableList.of()) + .build(); + + BigQueryError bigQueryError = + new BigQueryError("testReasonRateLimitExceeded", "US", "testMessage: do not retry error"); + + ImmutableList bigQueryErrorList = ImmutableList.of(bigQueryError); + BigQueryException bigQueryException = new BigQueryException(bigQueryErrorList); when(bigquery.getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS)) + .thenThrow(bigQueryException) .thenReturn(completedQuery); - // .thenThrow(bigQueryException); - // when(bigquery.listTableData(eq(TABLE_ID1), any(Schema.class))).thenReturn(result); - job = this.job.toBuilder().setConfiguration(jobConfig).build(); - assertThat(job.waitFor(TEST_BIGQUERY_RETRY_CONFIG, TEST_RETRY_OPTIONS)).isSameInstanceAs(completedJob); - // assertThat(job.getQueryResults().iterateAll()).hasSize(0); + job = this.job.toBuilder().setConfiguration(DRL_QUERY_CONFIGURATION).build(); + try { + job.waitFor(TEST_BIGQUERY_RETRY_CONFIG, TEST_RETRY_OPTIONS); + fail("JobException expected"); + } catch (BigQueryException e) { + assertNotNull(e.getErrors()); + } + // Verify that getQueryResults is attempted only once and not retried since the error message + // does not match. verify(bigquery, times(1)).getQueryResults(jobInfo.getJobId(), Job.DEFAULT_QUERY_WAIT_OPTIONS); - verify(bigquery).getJob(JOB_INFO.getJobId()); } @Test diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 04da8a0e9..40d405f61 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -136,6 +136,7 @@ import com.google.cloud.bigquery.TimePartitioning.Type; import com.google.cloud.bigquery.ViewDefinition; import com.google.cloud.bigquery.WriteChannelConfiguration; +import com.google.cloud.bigquery.spi.v2.BigQueryRpc; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.cloud.datacatalog.v1.CreatePolicyTagRequest; import com.google.cloud.datacatalog.v1.CreateTaxonomyRequest; @@ -3227,7 +3228,6 @@ public void testQuery() throws InterruptedException { @Test public void testQueryStatistics() throws InterruptedException { // Use CURRENT_TIMESTAMP to avoid potential caching. - // TODO(NOW) String query = "SELECT CURRENT_TIMESTAMP() AS ts"; QueryJobConfiguration config = QueryJobConfiguration.newBuilder(query) @@ -5281,6 +5281,24 @@ public void testCreateAndGetJob() throws InterruptedException, TimeoutException assertTrue(bigquery.delete(destinationTable)); } + @Test + public void testCreateJobAndWaitForWithRetryOptions() throws InterruptedException, TimeoutException { + // Note: This only tests the non failure/retry case. For retry cases, see unit tests with mocked RPC calls. + QueryJobConfiguration config = + QueryJobConfiguration.newBuilder("SELECT CURRENT_TIMESTAMP() as ts") + .setDefaultDataset(DATASET) + .setUseLegacySql(false) + .build(); + + BigQueryRetryConfig bigQueryRetryConfig = BigQueryRetryConfig.newBuilder().build(); + JobOption bigQueryRetryConfigOption = JobOption.bigQueryRetryConfig(bigQueryRetryConfig); + JobOption retryOptions = JobOption.retryOptions(RetryOption.maxAttempts(1)); + + Job job = bigquery.create(JobInfo.of(config), bigQueryRetryConfigOption, retryOptions); + job = job.waitFor(bigQueryRetryConfig); + assertEquals(DONE, job.getStatus().getState()); + } + @Test public void testCreateAndGetJobWithSelectedFields() throws InterruptedException, TimeoutException { From a2f073b8f26001e7d305198411149fe153af37d7 Mon Sep 17 00:00:00 2001 From: PhongChuong Date: Tue, 23 Jul 2024 15:15:25 -0400 Subject: [PATCH 3/9] Fix lint issues --- .../cloud/bigquery/BigQueryImplTest.java | 33 +++++++++++-------- .../cloud/bigquery/it/ITBigQueryTest.java | 7 ++-- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java index 6e544189c..88b8f6dbf 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java @@ -53,6 +53,7 @@ @RunWith(MockitoJUnitRunner.class) public class BigQueryImplTest { + private static final String PROJECT = "project"; private static final String LOCATION = "US"; private static final String OTHER_PROJECT = "otherProject"; @@ -1597,11 +1598,13 @@ public void testCreateJobFailureShouldRetry() { @Test public void testCreateJobWithBigQueryRetryConfigFailureShouldRetry() { // Validate create job with BigQueryRetryConfig that retries on rate limit error message. - JobOption bigQueryRetryConfigOption = JobOption.bigQueryRetryConfig(BigQueryRetryConfig.newBuilder() - .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) - .retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG) - .retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX) - .build()); + JobOption bigQueryRetryConfigOption = + JobOption.bigQueryRetryConfig( + BigQueryRetryConfig.newBuilder() + .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) + .retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG) + .retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX) + .build()); Map bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption); when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions))) @@ -1619,21 +1622,20 @@ public void testCreateJobWithBigQueryRetryConfigFailureShouldRetry() { .build() .getService(); - ((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption); + ((BigQueryImpl) bigquery) + .create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption); verify(bigqueryRpcMock, times(3)).create(jobCapture.capture(), eq(bigQueryRpcOptions)); } @Test public void testCreateJobWithBigQueryRetryConfigFailureShouldNotRetry() { // Validate create job with BigQueryRetryConfig that does not retry on rate limit error message. - JobOption bigQueryRetryConfigOption = JobOption.bigQueryRetryConfig(BigQueryRetryConfig.newBuilder() - .build()); + JobOption bigQueryRetryConfigOption = + JobOption.bigQueryRetryConfig(BigQueryRetryConfig.newBuilder().build()); Map bigQueryRpcOptions = optionMap(bigQueryRetryConfigOption); when(bigqueryRpcMock.create(jobCapture.capture(), eq(bigQueryRpcOptions))) - .thenThrow( - new BigQueryException( - 400, RATE_LIMIT_ERROR_MSG)); + .thenThrow(new BigQueryException(400, RATE_LIMIT_ERROR_MSG)); bigquery = options.getService(); bigquery = @@ -1644,7 +1646,8 @@ public void testCreateJobWithBigQueryRetryConfigFailureShouldNotRetry() { .getService(); try { - ((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption); + ((BigQueryImpl) bigquery) + .create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), bigQueryRetryConfigOption); fail("JobException expected"); } catch (BigQueryException e) { assertNotNull(e.getMessage()); @@ -1673,7 +1676,8 @@ public void testCreateJobWithRetryOptionsFailureShouldRetry() { .build() .getService(); - ((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions); + ((BigQueryImpl) bigquery) + .create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions); verify(bigqueryRpcMock, times(4)).create(jobCapture.capture(), eq(bigQueryRpcOptions)); } @@ -1695,7 +1699,8 @@ public void testCreateJobWithRetryOptionsFailureShouldNotRetry() { .getService(); try { - ((BigQueryImpl) bigquery).create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions); + ((BigQueryImpl) bigquery) + .create(JobInfo.of(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY), retryOptions); fail("JobException expected"); } catch (BigQueryException e) { assertNotNull(e.getMessage()); diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 40d405f61..9d1c23308 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -136,7 +136,6 @@ import com.google.cloud.bigquery.TimePartitioning.Type; import com.google.cloud.bigquery.ViewDefinition; import com.google.cloud.bigquery.WriteChannelConfiguration; -import com.google.cloud.bigquery.spi.v2.BigQueryRpc; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.cloud.datacatalog.v1.CreatePolicyTagRequest; import com.google.cloud.datacatalog.v1.CreateTaxonomyRequest; @@ -5282,8 +5281,10 @@ public void testCreateAndGetJob() throws InterruptedException, TimeoutException } @Test - public void testCreateJobAndWaitForWithRetryOptions() throws InterruptedException, TimeoutException { - // Note: This only tests the non failure/retry case. For retry cases, see unit tests with mocked RPC calls. + public void testCreateJobAndWaitForWithRetryOptions() + throws InterruptedException, TimeoutException { + // Note: This only tests the non failure/retry case. For retry cases, see unit tests with mocked + // RPC calls. QueryJobConfiguration config = QueryJobConfiguration.newBuilder("SELECT CURRENT_TIMESTAMP() as ts") .setDefaultDataset(DATASET) From b03f3e4519afa77a0b317f82182538380266d6a2 Mon Sep 17 00:00:00 2001 From: PhongChuong Date: Tue, 23 Jul 2024 15:52:21 -0400 Subject: [PATCH 4/9] Revert unintentional changes to testQueryStatistics in ITBigQueryTest --- .../java/com/google/cloud/bigquery/it/ITBigQueryTest.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 9d1c23308..524dbab5a 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -3234,13 +3234,7 @@ public void testQueryStatistics() throws InterruptedException { .setUseQueryCache(false) .build(); - BigQueryRetryConfig retryConfig = - BigQueryRetryConfig.newBuilder() - .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG) - .build(); // retry config with Error Message for RateLimitExceeded Error - Job job = - bigquery.create( - JobInfo.of(JobId.of(), config), BigQuery.JobOption.bigQueryRetryConfig(retryConfig)); + Job job = bigquery.create(JobInfo.of(JobId.of(), config)); job = job.waitFor(); JobStatistics.QueryStatistics statistics = job.getStatistics(); From 40476293ac684402c6ef4ab38e98141dd2b48b2c Mon Sep 17 00:00:00 2001 From: PhongChuong Date: Tue, 23 Jul 2024 15:53:16 -0400 Subject: [PATCH 5/9] Revert unintentional changes to testQueryStatistics in ITBigQueryTest --- .../test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 524dbab5a..6941c2566 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -3233,7 +3233,6 @@ public void testQueryStatistics() throws InterruptedException { .setDefaultDataset(DatasetId.of(DATASET)) .setUseQueryCache(false) .build(); - Job job = bigquery.create(JobInfo.of(JobId.of(), config)); job = job.waitFor(); From a122a02d02050888ba365558f18d9aba2686b98d Mon Sep 17 00:00:00 2001 From: PhongChuong Date: Tue, 23 Jul 2024 15:54:21 -0400 Subject: [PATCH 6/9] Revert unintentional changes to testQueryStatistics in ITBigQueryTest --- .../test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 6941c2566..31bd5919c 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -54,7 +54,6 @@ import com.google.cloud.bigquery.BigQuery.TableOption; import com.google.cloud.bigquery.BigQueryDryRunResult; import com.google.cloud.bigquery.BigQueryError; -import com.google.cloud.bigquery.BigQueryErrorMessages; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.BigQueryResult; From 9fc9b34daae36402368b19a523fa7b655e795610 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 23 Jul 2024 19:54:41 +0000 Subject: [PATCH 7/9] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .../test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 524dbab5a..8ae4e5f45 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -54,7 +54,6 @@ import com.google.cloud.bigquery.BigQuery.TableOption; import com.google.cloud.bigquery.BigQueryDryRunResult; import com.google.cloud.bigquery.BigQueryError; -import com.google.cloud.bigquery.BigQueryErrorMessages; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.BigQueryResult; From fa0141531ff4a0c28f47fba9182cc7a306455551 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 23 Jul 2024 19:55:39 +0000 Subject: [PATCH 8/9] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .../test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java index 6941c2566..31bd5919c 100644 --- a/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java +++ b/google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java @@ -54,7 +54,6 @@ import com.google.cloud.bigquery.BigQuery.TableOption; import com.google.cloud.bigquery.BigQueryDryRunResult; import com.google.cloud.bigquery.BigQueryError; -import com.google.cloud.bigquery.BigQueryErrorMessages; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.BigQueryOptions; import com.google.cloud.bigquery.BigQueryResult; From ff4fc46c7580109cea6bce437df497ae35235da5 Mon Sep 17 00:00:00 2001 From: PhongChuong Date: Wed, 24 Jul 2024 11:58:14 -0400 Subject: [PATCH 9/9] Add additional comments to Job.waitFor() for BigQueryRetryConfig --- .../java/com/google/cloud/bigquery/Job.java | 35 ++++++++++++++++--- 1 file changed, 30 insertions(+), 5 deletions(-) diff --git a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java index 700c64604..793b25687 100644 --- a/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java +++ b/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/Job.java @@ -197,12 +197,20 @@ public boolean isDone() { return job == null || JobStatus.State.DONE.equals(job.getStatus().getState()); } + /** See {@link #waitFor(BigQueryRetryConfig, RetryOption...)} */ + public Job waitFor(RetryOption... waitOptions) throws InterruptedException { + return waitForInternal(DEFAULT_RETRY_CONFIG, waitOptions); + } + /** * Blocks until this job completes its execution, either failing or succeeding. This method * returns current job's latest information. If the job no longer exists, this method returns * {@code null}. By default, the job status is checked using jittered exponential backoff with 1 * second as an initial delay, 2.0 as a backoff factor, 1 minute as maximum delay between polls, - * 12 hours as a total timeout and unlimited number of attempts. + * 12 hours as a total timeout and unlimited number of attempts. For query jobs, the job status + * check can be configured to retry on specific BigQuery error messages using {@link + * BigQueryRetryConfig}. This {@link BigQueryRetryConfig} configuration is not available for + * non-query jobs. * *

Example usage of {@code waitFor()}. * @@ -233,15 +241,32 @@ public boolean isDone() { * } * } * + *

Example usage of {@code waitFor()} with BigQuery retry configuration to retry on rate limit + * exceeded error messages for query jobs. + * + *

{@code
+   * Job completedJob =
+   *     job.waitFor(
+   *             BigQueryRetryConfig.newBuilder()
+   *                 .retryOnMessage(BigQueryErrorMessages.RATE_LIMIT_EXCEEDED_MSG)
+   *                 .retryOnMessage(BigQueryErrorMessages.JOB_RATE_LIMIT_EXCEEDED_MSG)
+   *                 .retryOnRegEx(BigQueryErrorMessages.RetryRegExPatterns.RATE_LIMIT_EXCEEDED_REGEX)
+   *                 .build());
+   * if (completedJob == null) {
+   *   // job no longer exists
+   * } else if (completedJob.getStatus().getError() != null) {
+   *   // job failed, handle error
+   * } else {
+   *   // job completed successfully
+   * }
+   * }
+ * + * @param bigQueryRetryConfig configures retries for query jobs for BigQuery failures * @param waitOptions options to configure checking period and timeout * @throws BigQueryException upon failure, check {@link BigQueryException#getCause()} for details * @throws InterruptedException if the current thread gets interrupted while waiting for the job * to complete */ - public Job waitFor(RetryOption... waitOptions) throws InterruptedException { - return waitForInternal(DEFAULT_RETRY_CONFIG, waitOptions); - } - public Job waitFor(BigQueryRetryConfig bigQueryRetryConfig, RetryOption... waitOptions) throws InterruptedException { return waitForInternal(bigQueryRetryConfig, waitOptions);