Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ public class BigQueryTableInserter {
// The initial backoff after a failure inserting rows into BigQuery.
private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;

// Backoff time bounds for rate limit exceeded errors.
private static final long INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.SECONDS.toMillis(1);
private static final long MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS = TimeUnit.MINUTES.toMillis(2);

private final Bigquery client;
private final TableReference defaultRef;
private final long maxRowsPerBatch;
Expand Down Expand Up @@ -215,7 +219,25 @@ public void insertAll(TableReference ref, List<TableRow> rowList,
executor.submit(new Callable<List<TableDataInsertAllResponse.InsertErrors>>() {
@Override
public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
return insert.execute().getInsertErrors();
BackOff backoff = new IntervalBoundedExponentialBackOff(
MAX_RATE_LIMIT_EXCEEDED_BACKOFF_MS, INITIAL_RATE_LIMIT_EXCEEDED_BACKOFF_MS);
while (true) {
try {
return insert.execute().getInsertErrors();
} catch (IOException e) {
if (new ApiErrorExtractor().rateLimited(e)) {
LOG.info("BigQuery insertAll exceeded rate limit, retrying");
try {
Thread.sleep(backoff.nextBackOffMillis());
} catch (InterruptedException interrupted) {
throw new IOException(
"Interrupted while waiting before retrying insertAll");
}
} else {
throw e;
}
}
}
}
}));
strideIndices.add(strideIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class IntervalBoundedExponentialBackOff implements BackOff {
private final long initialIntervalMillis;
private int currentAttempt;

public IntervalBoundedExponentialBackOff(int maximumIntervalMillis, long initialIntervalMillis) {
public IntervalBoundedExponentialBackOff(long maximumIntervalMillis, long initialIntervalMillis) {
Preconditions.checkArgument(
maximumIntervalMillis > 0, "Maximum interval must be greater than zero.");
Preconditions.checkArgument(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.hadoop.util.RetryBoundedBackOff;
import com.google.common.collect.ImmutableList;
Expand All @@ -58,6 +60,8 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/**
* Tests of {@link BigQueryTableInserter}.
Expand Down Expand Up @@ -236,4 +240,64 @@ public void testCreateTableDoesNotRetry() throws IOException {
throw e;
}
}

/**
* Tests that {@link BigQueryTableInserter#insertAll} retries quota rate limited attempts.
*/
@Test
public void testInsertRetry() throws IOException {
TableReference ref =
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<TableRow> rows = new ArrayList<>();
rows.add(new TableRow());

// First response is 403 rate limited, second response has valid payload.
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(403).thenReturn(200);
when(response.getContent())
.thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
.thenReturn(toStream(new TableDataInsertAllResponse()));

BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);

inserter.insertAll(ref, rows);
verify(response, times(2)).getStatusCode();
verify(response, times(2)).getContent();
verify(response, times(2)).getContentType();
expectedLogs.verifyInfo("BigQuery insertAll exceeded rate limit, retrying");
}

/**
* Tests that {@link BigQueryTableInserter#insertAll} does not retry non-rate-limited attempts.
*/
@Test
public void testInsertDoesNotRetry() throws Throwable {
TableReference ref =
new TableReference().setProjectId("project").setDatasetId("dataset").setTableId("table");
List<TableRow> rows = new ArrayList<>();
rows.add(new TableRow());

// First response is 403 not-rate-limited, second response has valid payload but should not
// be invoked.
when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
when(response.getStatusCode()).thenReturn(403).thenReturn(200);
when(response.getContent())
.thenReturn(toStream(errorWithReasonAndStatus("actually forbidden", 403)))
.thenReturn(toStream(new TableDataInsertAllResponse()));

thrown.expect(GoogleJsonResponseException.class);
thrown.expectMessage("actually forbidden");

BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);

try {
inserter.insertAll(ref, rows);
fail();
} catch (RuntimeException e) {
verify(response, times(1)).getStatusCode();
verify(response, times(1)).getContent();
verify(response, times(1)).getContentType();
throw e.getCause();
}
}
}