diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java index cd5106275646..b1df15d8c171 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java @@ -73,6 +73,10 @@ public class BigQueryTableInserter { // The initial backoff after a failure inserting rows into BigQuery. private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L; + // Backoff time bounds for rate limit exceeded errors. + private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1); + private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2); + private final Bigquery client; private final TableReference defaultRef; private final long maxRowsPerBatch; @@ -215,7 +219,25 @@ public void insertAll(TableReference ref, List rowList, executor.submit(new Callable>() { @Override public List call() throws IOException { - return insert.execute().getInsertErrors(); + BackOff backoff = new IntervalBoundedExponentialBackOff( + MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS); + while (true) { + try { + return insert.execute().getInsertErrors(); + } catch (IOException e) { + if (new ApiErrorExtractor().rateLimited(e)) { + LOG.info("BigQuery insertAll exceeded rate limit, retrying"); + try { + Thread.sleep(backoff.nextBackOffMillis()); + } catch (InterruptedException interrupted) { + throw new IOException( + "Interrupted while waiting before retrying insertAll"); + } + } else { + throw e; + } + } + } } })); strideIndices.add(strideIndex); diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java index 4406ee5c52b4..3521584ae099 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/IntervalBoundedExponentialBackOff.java @@ -53,7 +53,7 @@ public class IntervalBoundedExponentialBackOff implements BackOff { private final long initialIntervalMillis; private int currentAttempt; - public IntervalBoundedExponentialBackOff(int maximumIntervalMillis, long initialIntervalMillis) { + public IntervalBoundedExponentialBackOff(long maximumIntervalMillis, long initialIntervalMillis) { Preconditions.checkArgument( maximumIntervalMillis > 0, "Maximum interval must be greater than zero."); Preconditions.checkArgument( diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java index d53315ba6688..86448f6ed449 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java @@ -40,7 +40,9 @@ import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableDataInsertAllResponse; import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; import com.google.cloud.hadoop.util.RetryBoundedBackOff; import com.google.common.collect.ImmutableList; @@ -58,6 +60,8 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; /** * Tests of {@link BigQueryTableInserter}. @@ -236,4 +240,64 @@ public void testCreateTableDoesNotRetry() throws IOException { throw e; } } + + /** + * Tests that {@link BigQueryTableInserter#insertAll} retries quota rate limited attempts. + */ + @Test + public void testInsertRetry() throws IOException { + TableReference ref = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + List rows = new ArrayList<>(); + rows.add(new TableRow()); + + // First response is 403 rate limited, second response has valid payload. + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(403).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) + .thenReturn(toStream(new TableDataInsertAllResponse())); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + + inserter.insertAll(ref, rows); + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); + expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying"); + } + + /** + * Tests that {@link BigQueryTableInserter#insertAll} does not retry non-rate-limited attempts. + */ + @Test + public void testInsertDoesNotRetry() throws Throwable { + TableReference ref = + new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table"); + List rows = new ArrayList<>(); + rows.add(new TableRow()); + + // First response is 403 not-rate-limited, second response has valid payload but should not + // be invoked. + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(403).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403))) + .thenReturn(toStream(new TableDataInsertAllResponse())); + + thrown.expect(GoogleJsonResponseException.class); + thrown.expectMessage("actually forbidden"); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + + try { + inserter.insertAll(ref, rows); + fail(); + } catch (RuntimeException e) { + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + throw e.getCause(); + } + } }