From 26a4eb18e71757e228c684bfc087bff108d90b7c Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 14 Mar 2016 21:26:19 -0700 Subject: [PATCH] [BEAM-50] Implement BigQueryIO.Write as a custom sink. Create BigQueryServices interface and added BigQueryIO pipeline tests. Removed BigQueryIO.Write evaluator. --- .../cloud/dataflow/sdk/io/BigQueryIO.java | 378 +++++++++++++----- .../dataflow/sdk/util/BigQueryServices.java | 61 +++ .../sdk/util/BigQueryServicesImpl.java | 196 +++++++++ .../cloud/dataflow/sdk/io/BigQueryIOTest.java | 192 ++++++++- .../sdk/util/BigQueryServicesImplTest.java | 262 ++++++++++++ 5 files changed, 975 insertions(+), 114 deletions(-) create mode 100644 sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java create mode 100644 sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java create mode 100644 sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index ae0ce4d8b321..c37d6c796269 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -17,14 +17,19 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.api.client.json.JsonFactory; import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.QueryRequest; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.dataflow.sdk.coders.AtomicCoder; import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.Coder.Context; import com.google.cloud.dataflow.sdk.coders.CoderException; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.StandardCoder; @@ -36,6 +41,7 @@ import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition; import com.google.cloud.dataflow.sdk.options.BigQueryOptions; import com.google.cloud.dataflow.sdk.options.GcpOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.transforms.Aggregator; @@ -45,22 +51,28 @@ import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.Sum; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.BigQueryServices; +import com.google.cloud.dataflow.sdk.util.BigQueryServices.LoadService; +import com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl; import com.google.cloud.dataflow.sdk.util.BigQueryTableInserter; import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator; +import com.google.cloud.dataflow.sdk.util.MimeTypes; import com.google.cloud.dataflow.sdk.util.PropertyNames; import com.google.cloud.dataflow.sdk.util.Reshuffle; import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal; import 
com.google.cloud.dataflow.sdk.util.Transport; -import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; +import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; import com.google.cloud.dataflow.sdk.values.PDone; import com.google.cloud.dataflow.sdk.values.PInput; import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -71,6 +83,9 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -725,12 +740,12 @@ public static Bound withoutValidation() { * {@link PCollection} of {@link TableRow TableRows} to a BigQuery table. */ public static class Bound extends PTransform, PDone> { - final TableReference table; + @Nullable final String jsonTableRef; - final SerializableFunction tableRefFunction; + @Nullable final SerializableFunction tableRefFunction; // Table schema. The schema is required only if the table does not exist. - final TableSchema schema; + @Nullable final String jsonSchema; // Options for creating the table. Valid values are CREATE_IF_NEEDED and // CREATE_NEVER. @@ -743,6 +758,9 @@ public static class Bound extends PTransform, PDone> { // An option to indicate if table validation is desired. Default is true. final boolean validate; + // A fake or mock BigQueryServices for tests. + @Nullable private BigQueryServices testBigQueryServices; + private static class TranslateTableSpecFunction implements SerializableFunction { private SerializableFunction tableSpecFunction; @@ -765,20 +783,22 @@ public TableReference apply(BoundedWindow value) { @Deprecated public Bound() { this(null, null, null, null, CreateDisposition.CREATE_IF_NEEDED, - WriteDisposition.WRITE_EMPTY, true); + WriteDisposition.WRITE_EMPTY, true, null); } - private Bound(String name, TableReference ref, - SerializableFunction tableRefFunction, TableSchema schema, - CreateDisposition createDisposition, WriteDisposition writeDisposition, - boolean validate) { + private Bound(String name, @Nullable String jsonTableRef, + @Nullable SerializableFunction tableRefFunction, + @Nullable String jsonSchema, + CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate, + @Nullable BigQueryServices testBigQueryServices) { super(name); - this.table = ref; + this.jsonTableRef = jsonTableRef; this.tableRefFunction = tableRefFunction; - this.schema = schema; - this.createDisposition = createDisposition; - this.writeDisposition = writeDisposition; + this.jsonSchema = jsonSchema; + this.createDisposition = checkNotNull(createDisposition, "createDisposition"); + this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); this.validate = validate; + this.testBigQueryServices = testBigQueryServices; } /** @@ -787,8 +807,8 @@ private Bound(String name, TableReference ref, *
<p>
Does not modify this object. */ public Bound named(String name) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testBigQueryServices); } /** @@ -807,8 +827,8 @@ public Bound to(String tableSpec) { *
<p>
Does not modify this object. */ public Bound to(TableReference table) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, toJsonString(table), tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testBigQueryServices); } /** @@ -836,8 +856,8 @@ public Bound to( */ public Bound toTableReference( SerializableFunction tableRefFunction) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testBigQueryServices); } /** @@ -847,8 +867,8 @@ public Bound toTableReference( *
<p>
Does not modify this object. */ public Bound withSchema(TableSchema schema) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, jsonTableRef, tableRefFunction, toJsonString(schema), + createDisposition, writeDisposition, validate, testBigQueryServices); } /** @@ -857,8 +877,8 @@ public Bound withSchema(TableSchema schema) { *
<p>
Does not modify this object. */ public Bound withCreateDisposition(CreateDisposition createDisposition) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testBigQueryServices); } /** @@ -867,8 +887,8 @@ public Bound withCreateDisposition(CreateDisposition createDisposition) { *
<p>
Does not modify this object. */ public Bound withWriteDisposition(WriteDisposition writeDisposition) { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, validate); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testBigQueryServices); } /** @@ -877,8 +897,14 @@ public Bound withWriteDisposition(WriteDisposition writeDisposition) { *
<p>
Does not modify this object. */ public Bound withoutValidation() { - return new Bound(name, table, tableRefFunction, schema, createDisposition, - writeDisposition, false); + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, false, testBigQueryServices); + } + + @VisibleForTesting + Bound withTestServices(BigQueryServices testServices) { + return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + writeDisposition, validate, testServices); } private static void verifyTableEmpty( @@ -907,6 +933,7 @@ private static void verifyTableEmpty( public PDone apply(PCollection input) { BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); + TableReference table = getTable(); if (table == null && tableRefFunction == null) { throw new IllegalStateException( "must set the table reference of a BigQueryIO.Write transform"); @@ -917,7 +944,7 @@ public PDone apply(PCollection input) { + "transform"); } - if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && schema == null) { + if (createDisposition == CreateDisposition.CREATE_IF_NEEDED && jsonSchema == null) { throw new IllegalArgumentException("CreateDisposition is CREATE_IF_NEEDED, " + "however no schema was provided."); } @@ -959,10 +986,34 @@ public PDone apply(PCollection input) { + "supported for unbounded PCollections or when using tablespec functions."); } - return input.apply(new StreamWithDeDup(table, tableRefFunction, schema)); + return input.apply(new StreamWithDeDup(table, tableRefFunction, getSchema())); } - return PDone.in(input.getPipeline()); + String tempLocation = options.getTempLocation(); + checkArgument(!Strings.isNullOrEmpty(tempLocation), + "BigQueryIO.Write needs a GCS temp location to store temp files."); + if (testBigQueryServices == null) { + try { + GcsPath.fromUri(tempLocation); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException(String.format( + "BigQuery temp location expected a valid 'gs://' path, but was given '%s'", + tempLocation), e); + } + } + String jobIdToken = UUID.randomUUID().toString(); + String tempFilePrefix = tempLocation + "/BigQuerySinkTemp/" + jobIdToken; + BigQueryServices bqServices = getBigQueryServices(); + return input.apply("Write", com.google.cloud.dataflow.sdk.io.Write.to( + new BigQuerySink( + jobIdToken, + jsonTableRef, + jsonSchema, + getWriteDisposition(), + getCreateDisposition(), + tempFilePrefix, + input.getCoder(), + bqServices))); } @Override @@ -970,17 +1021,6 @@ protected Coder getDefaultOutputCoder() { return VoidCoder.of(); } - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bound.class, new DirectPipelineRunner.TransformEvaluator() { - @Override - public void evaluate( - Bound transform, DirectPipelineRunner.EvaluationContext context) { - evaluateWriteHelper(transform, context); - } - }); - } - /** Returns the create disposition. */ public CreateDisposition getCreateDisposition() { return createDisposition; @@ -993,24 +1033,191 @@ public WriteDisposition getWriteDisposition() { /** Returns the table schema. */ public TableSchema getSchema() { - return schema; + return fromJsonString(jsonSchema, TableSchema.class); } /** Returns the table reference, or {@code null} if a . */ public TableReference getTable() { - return table; + return fromJsonString(jsonTableRef, TableReference.class); } /** Returns {@code true} if table validation is enabled. 
*/ public boolean getValidate() { return validate; } + + private BigQueryServices getBigQueryServices() { + if (testBigQueryServices != null) { + return testBigQueryServices; + } else { + return new BigQueryServicesImpl(); + } + } } /** Disallow construction of utility class. */ private Write() {} } + /** + * {@link BigQuerySink} is implemented as a {@link FileBasedSink}. + * + *
<p>
It uses BigQuery load job to import files into BigQuery. + */ + static class BigQuerySink extends FileBasedSink { + private final String jobIdToken; + @Nullable private final String jsonTable; + @Nullable private final String jsonSchema; + private final WriteDisposition writeDisposition; + private final CreateDisposition createDisposition; + private final Coder coder; + private final BigQueryServices bqServices; + + public BigQuerySink( + String jobIdToken, + @Nullable String jsonTable, + @Nullable String jsonSchema, + WriteDisposition writeDisposition, + CreateDisposition createDisposition, + String tempFile, + Coder coder, + BigQueryServices bqServices) { + super(tempFile, ".json"); + this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken"); + this.jsonTable = jsonTable; + this.jsonSchema = jsonSchema; + this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); + this.createDisposition = checkNotNull(createDisposition, "createDisposition"); + this.coder = checkNotNull(coder, "coder"); + this.bqServices = checkNotNull(bqServices, "bqServices"); + } + + @Override + public FileBasedSink.FileBasedWriteOperation createWriteOperation( + PipelineOptions options) { + return new BigQueryWriteOperation(this); + } + + private static class BigQueryWriteOperation extends FileBasedWriteOperation { + // The maximum number of retry load jobs. + private static final int MAX_RETRY_LOAD_JOBS = 3; + + private final BigQuerySink bigQuerySink; + + private BigQueryWriteOperation(BigQuerySink sink) { + super(checkNotNull(sink, "sink")); + this.bigQuerySink = sink; + } + + @Override + public FileBasedWriter createWriter(PipelineOptions options) throws Exception { + return new TableRowWriter(this, bigQuerySink.coder); + } + + @Override + public void finalize(Iterable writerResults, PipelineOptions options) + throws IOException, InterruptedException { + try { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + List tempFiles = Lists.newArrayList(); + for (FileResult result : writerResults) { + tempFiles.add(result.getFilename()); + } + if (!tempFiles.isEmpty()) { + load( + bigQuerySink.bqServices.getLoadService(bqOptions), + bigQuerySink.jobIdToken, + fromJsonString(bigQuerySink.jsonTable, TableReference.class), + tempFiles, + fromJsonString(bigQuerySink.jsonSchema, TableSchema.class), + bigQuerySink.writeDisposition, + bigQuerySink.createDisposition); + } + } finally { + removeTemporaryFiles(options); + } + } + + /** + * Import files into BigQuery with load jobs. + * + *
<p>
Returns normally once the files are successfully loaded into BigQuery. + * Throws a RuntimeException if: + * 1. The status of one load job is UNKNOWN. This is to avoid duplicating data. + * 2. The number of attempts exceeds {@code MAX_RETRY_LOAD_JOBS}. + * + *
<p>
If a load job failed, it will try another load job with a different job id. + */ + private void load( + LoadService loadService, + String jobIdPrefix, + TableReference ref, + List gcsUris, + @Nullable TableSchema schema, + WriteDisposition writeDisposition, + CreateDisposition createDisposition) throws InterruptedException, IOException { + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setSourceUris(gcsUris); + loadConfig.setDestinationTable(ref); + loadConfig.setSchema(schema); + loadConfig.setWriteDisposition(writeDisposition.name()); + loadConfig.setCreateDisposition(createDisposition.name()); + loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON"); + + boolean retrying = false; + String projectId = ref.getProjectId(); + for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) { + String jobId = jobIdPrefix + "-" + i; + if (retrying) { + LOG.info("Previous load jobs failed, retrying."); + } + LOG.info("Starting BigQuery load job: {}", jobId); + loadService.startLoadJob(jobId, loadConfig); + BigQueryServices.Status jobStatus = loadService.pollJobStatus(projectId, jobId); + switch (jobStatus) { + case SUCCEEDED: + return; + case UNKNOWN: + throw new RuntimeException("Failed to poll the load job status."); + case FAILED: + LOG.info("BigQuery load job failed: {}", jobId); + retrying = true; + continue; + default: + throw new IllegalStateException("Unexpected job status: " + jobStatus); + } + } + throw new RuntimeException( + "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS); + } + } + + private static class TableRowWriter extends FileBasedWriter { + private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8); + private final Coder coder; + private OutputStream out; + + public TableRowWriter( + FileBasedWriteOperation writeOperation, Coder coder) { + super(writeOperation); + this.mimeType = MimeTypes.TEXT; + this.coder = coder; + } + + @Override + protected void prepareWrite(WritableByteChannel channel) throws Exception { + out = Channels.newOutputStream(channel); + } + + @Override + public void write(TableRow value) throws Exception { + // Use Context.OUTER to encode and NEWLINE as the delimeter. + coder.encode(value, out, Context.OUTER); + out.write(NEWLINE); + } + } + } + private static void verifyDatasetPresence(BigQueryOptions options, TableReference table) { try { Bigquery client = Transport.newBigQueryClient(options).build(); @@ -1080,11 +1287,7 @@ private static class StreamingWriteFn /** Constructor. */ StreamingWriteFn(TableSchema schema) { - try { - jsonTableSchema = JSON_FACTORY.toString(schema); - } catch (IOException e) { - throw new RuntimeException("Cannot initialize BigQuery streaming writer.", e); - } + jsonTableSchema = toJsonString(schema); } /** Prepares a target BigQuery table. 
*/ @@ -1190,8 +1393,7 @@ public static ShardedKeyCoder of(Coder keyCoder) { public static ShardedKeyCoder of( @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List> components) { - Preconditions.checkArgument(components.size() == 1, - "Expecting 1 component, got " + components.size()); + checkArgument(components.size() == 1, "Expecting 1 component, got %s", components.size()); return of(components.get(0)); } @@ -1296,7 +1498,7 @@ private static class TagWithUniqueIdsAndTable TagWithUniqueIdsAndTable(BigQueryOptions options, TableReference table, SerializableFunction tableRefFunction) { - Preconditions.checkArgument(table == null ^ tableRefFunction == null, + checkArgument(table == null ^ tableRefFunction == null, "Exactly one of table or tableRefFunction should be set"); if (table != null) { if (table.getProjectId() == null) { @@ -1401,6 +1603,32 @@ public PDone apply(PCollection input) { } } + private static String toJsonString(Object item) { + if (item == null) { + return null; + } + try { + return JSON_FACTORY.toString(item); + } catch (IOException e) { + throw new RuntimeException( + String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()), + e); + } + } + + private static T fromJsonString(String json, Class clazz) { + if (json == null) { + return null; + } + try { + return JSON_FACTORY.fromString(json, clazz); + } catch (IOException e) { + throw new RuntimeException( + String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json), + e); + } + } + ///////////////////////////////////////////////////////////////////////////// /** Disallow construction of utility class. */ @@ -1454,50 +1682,4 @@ private static List getOrCreateMapListValue(Map> map, K key } return value; } - - /** - * Direct mode write evaluator. - * - *
<p>
This writes the entire table in a single BigQuery request. - * The table will be created if necessary. - */ - private static void evaluateWriteHelper( - Write.Bound transform, DirectPipelineRunner.EvaluationContext context) { - BigQueryOptions options = context.getPipelineOptions(); - Bigquery client = Transport.newBigQueryClient(options).build(); - BigQueryTableInserter inserter = new BigQueryTableInserter(client); - - try { - Map> tableRows = new HashMap<>(); - for (WindowedValue windowedValue : context.getPCollectionWindowedValues( - context.getInput(transform))) { - for (BoundedWindow window : windowedValue.getWindows()) { - TableReference ref; - if (transform.tableRefFunction != null) { - ref = transform.tableRefFunction.apply(window); - } else { - ref = transform.table; - } - if (ref.getProjectId() == null) { - ref.setProjectId(options.getProject()); - } - - List rows = getOrCreateMapListValue(tableRows, ref); - rows.add(windowedValue.getValue()); - } - } - - for (TableReference ref : tableRows.keySet()) { - LOG.info("Writing to BigQuery table {}", toTableSpec(ref)); - // {@link BigQueryTableInserter#getOrCreateTable} validates {@link CreateDisposition} - // and {@link WriteDisposition}. - // For each {@link TableReference}, it can only be called before rows are written. - inserter.getOrCreateTable( - ref, transform.writeDisposition, transform.createDisposition, transform.schema); - inserter.insertAll(ref, tableRows.get(ref)); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java new file mode 100644 index 000000000000..b39b2e88caaa --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.cloud.dataflow.sdk.options.BigQueryOptions; + +import java.io.IOException; +import java.io.Serializable; + +/** + * An interface for real, mock, or fake implementations of Cloud BigQuery services. + */ +public interface BigQueryServices extends Serializable { + + /** + * Status of a BigQuery job or request. + */ + enum Status { + SUCCEEDED, + FAILED, + UNKNOWN, + } + + /** + * Returns a real, mock, or fake {@link LoadService}. + */ + public LoadService getLoadService(BigQueryOptions bqOptions); + + /** + * An interface for the Cloud BigQuery load service. + */ + public interface LoadService { + /** + * Start a BigQuery load job. 
+ */ + public void startLoadJob(String jobId, JobConfigurationLoad loadConfig) + throws InterruptedException, IOException; + + /** + * Poll the status of a BigQuery load job. + */ + public Status pollJobStatus(String projectId, String jobId) + throws InterruptedException, IOException; + } +} diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java new file mode 100644 index 000000000000..f0c886488a10 --- /dev/null +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfiguration; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.TableReference; +import com.google.cloud.dataflow.sdk.options.BigQueryOptions; +import com.google.cloud.hadoop.util.ApiErrorExtractor; +import com.google.common.annotations.VisibleForTesting; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; + +/** + * An implementation of {@link BigQueryServices} that actually communicates with the cloud BigQuery + * service. + */ +public class BigQueryServicesImpl implements BigQueryServices { + + // The maximum number of attempts to execute a load job RPC. + private static final int MAX_LOAD_JOB_RPC_ATTEMPTS = 10; + + // The initial backoff for executing a load job RPC. + private static final long INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); + // The maximum number of retries to poll the status of a load job. + // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery load job finishes. + private static final int MAX_LOAD_JOB_POLL_RETRIES = Integer.MAX_VALUE; + + // The initial backoff for polling the status of a load job. 
+ private static final long INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60); + + @Override + public LoadService getLoadService(BigQueryOptions options) { + return new LoadServiceImpl(options); + } + + @VisibleForTesting + static class LoadServiceImpl implements BigQueryServices.LoadService { + private static final Logger LOG = LoggerFactory.getLogger(LoadServiceImpl.class); + + private final ApiErrorExtractor errorExtractor; + private final Bigquery client; + + @VisibleForTesting + LoadServiceImpl(Bigquery client) { + this.errorExtractor = new ApiErrorExtractor(); + this.client = client; + } + + private LoadServiceImpl(BigQueryOptions options) { + this.errorExtractor = new ApiErrorExtractor(); + this.client = Transport.newBigQueryClient(options).build(); + } + + /** + * {@inheritDoc} + * + *
<p>
Retries the RPC for at most {@code MAX_LOAD_JOB_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC retries. + */ + @Override + public void startLoadJob( + String jobId, + JobConfigurationLoad loadConfig) throws InterruptedException, IOException { + BackOff backoff = new AttemptBoundedExponentialBackOff( + MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS); + startLoadJob(jobId, loadConfig, Sleeper.DEFAULT, backoff); + } + + /** + * {@inheritDoc} + * + *
<p>
Retries the poll request for at most {@code MAX_LOAD_JOB_POLL_RETRIES} times + * until the job is DONE. + */ + @Override + public Status pollJobStatus(String projectId, String jobId) throws InterruptedException { + BackOff backoff = new AttemptBoundedExponentialBackOff( + MAX_LOAD_JOB_POLL_RETRIES, INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS); + return pollJobStatus(projectId, jobId, Sleeper.DEFAULT, backoff); + } + + @VisibleForTesting + void startLoadJob( + String jobId, + JobConfigurationLoad loadConfig, + Sleeper sleeper, + BackOff backoff) + throws InterruptedException, IOException { + TableReference ref = loadConfig.getDestinationTable(); + String projectId = ref.getProjectId(); + + Job job = new Job(); + JobReference jobRef = new JobReference(); + jobRef.setProjectId(projectId); + jobRef.setJobId(jobId); + job.setJobReference(jobRef); + JobConfiguration config = new JobConfiguration(); + config.setLoad(loadConfig); + job.setConfiguration(config); + + Exception lastException = null; + do { + try { + client.jobs().insert(projectId, job).execute(); + return; // SUCCEEDED + } catch (GoogleJsonResponseException e) { + if (errorExtractor.itemAlreadyExists(e)) { + return; // SUCCEEDED + } + // ignore and retry + LOG.warn("Ignore the error and retry inserting the job.", e); + lastException = e; + } catch (IOException e) { + // ignore and retry + LOG.warn("Ignore the error and retry inserting the job.", e); + lastException = e; + } + } while (nextBackOff(sleeper, backoff)); + throw new IOException( + String.format( + "Unable to insert job: %s, aborting after %d retries.", + jobId, MAX_LOAD_JOB_RPC_ATTEMPTS), + lastException); + } + + @VisibleForTesting + Status pollJobStatus( + String projectId, + String jobId, + Sleeper sleeper, + BackOff backoff) throws InterruptedException { + do { + try { + JobStatus status = client.jobs().get(projectId, jobId).execute().getStatus(); + if (status != null && status.getState() != null && status.getState().equals("DONE")) { + if (status.getErrorResult() != null) { + return Status.FAILED; + } else if (status.getErrors() != null && !status.getErrors().isEmpty()) { + return Status.FAILED; + } else { + return Status.SUCCEEDED; + } + } + // The job is not DONE, wait longer and retry. + } catch (IOException e) { + // ignore and retry + LOG.warn("Ignore the error and retry polling job status.", e); + } + } while (nextBackOff(sleeper, backoff)); + LOG.warn("Unable to poll job status: {}, aborting after {} retries.", + jobId, MAX_LOAD_JOB_POLL_RETRIES); + return Status.UNKNOWN; + } + + /** + * Identical to {@link BackOffUtils#next} but without checked IOException. 
+ * @throws InterruptedException + */ + private boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException { + try { + return BackOffUtils.next(sleeper, backoff); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index 51c65563790a..0d084af6cc43 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertNull; import com.google.api.client.util.Data; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; @@ -31,10 +33,14 @@ import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition; import com.google.cloud.dataflow.sdk.options.BigQueryOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.util.BigQueryServices; +import com.google.cloud.dataflow.sdk.util.BigQueryServices.Status; import com.google.cloud.dataflow.sdk.util.CoderUtils; +import com.google.common.collect.ImmutableList; import org.hamcrest.Matchers; import org.junit.Assert; @@ -43,8 +49,15 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; /** * Tests for BigQueryIO. @@ -52,8 +65,96 @@ @RunWith(JUnit4.class) public class BigQueryIOTest { + private static class FakeBigQueryServices implements BigQueryServices { + + private Object[] startLoadJobReturns; + private Object[] pollJobStatusReturns; + + /** + * Sets the return values for the mock {@link LoadService#startLoadJob}. + * + *
<p>
Throws if the {@link Object} is an {@link Exception}; returns normally otherwise. + */ + private FakeBigQueryServices startLoadJobReturns(Object... startLoadJobReturns) { + this.startLoadJobReturns = startLoadJobReturns; + return this; + } + + /** + * Sets the return values for the mock {@link LoadService#pollJobStatus}. + * + *
<p>
Throws if the {@link Object} is a {@link Exception}, returns otherwise. + */ + private FakeBigQueryServices pollJobStatusReturns(Object... pollJobStatusReturns) { + this.pollJobStatusReturns = pollJobStatusReturns; + return this; + } + + @Override + public LoadService getLoadService(BigQueryOptions bqOptions) { + return new FakeLoadService(startLoadJobReturns, pollJobStatusReturns); + } + + private static class FakeLoadService implements BigQueryServices.LoadService { + + private Object[] startLoadJobReturns; + private Object[] pollJobStatusReturns; + private int startLoadJobCallsCount; + private int pollJobStatusCallsCount; + + public FakeLoadService(Object[] startLoadJobReturns, Object[] pollJobStatusReturns) { + this.startLoadJobReturns = startLoadJobReturns; + this.pollJobStatusReturns = pollJobStatusReturns; + this.startLoadJobCallsCount = 0; + this.pollJobStatusCallsCount = 0; + } + + @Override + public void startLoadJob(String jobId, JobConfigurationLoad loadConfig) + throws InterruptedException, IOException { + if (startLoadJobCallsCount < startLoadJobReturns.length) { + Object ret = startLoadJobReturns[startLoadJobCallsCount++]; + if (ret instanceof IOException) { + throw (IOException) ret; + } else if (ret instanceof InterruptedException) { + throw (InterruptedException) ret; + } else { + return; + } + } else { + throw new RuntimeException( + "Exceeded expected number of calls: " + startLoadJobReturns.length); + } + } + + @Override + public Status pollJobStatus(String projectId, String jobId) throws InterruptedException { + if (pollJobStatusCallsCount < pollJobStatusReturns.length) { + Object ret = pollJobStatusReturns[pollJobStatusCallsCount++]; + if (ret instanceof Status) { + return (Status) ret; + } else if (ret instanceof InterruptedException) { + throw (InterruptedException) ret; + } else { + throw new RuntimeException("Unexpected return type: " + ret.getClass()); + } + } else { + throw new RuntimeException( + "Exceeded expected number of calls: " + pollJobStatusReturns.length); + } + } + } + } + @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); + @Rule + public TemporaryFolder testFolder = new TemporaryFolder(); + @Mock + public BigQueryServices.LoadService mockBqLoadService; + + private BigQueryOptions bqOptions; private void checkReadTableObject( BigQueryIO.Read.Bound bound, String project, String dataset, String table) { @@ -93,10 +194,10 @@ private void checkWriteObjectWithValidate( BigQueryIO.Write.Bound bound, String project, String dataset, String table, TableSchema schema, CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate) { - assertEquals(project, bound.table.getProjectId()); - assertEquals(dataset, bound.table.getDatasetId()); - assertEquals(table, bound.table.getTableId()); - assertEquals(schema, bound.schema); + assertEquals(project, bound.getTable().getProjectId()); + assertEquals(dataset, bound.getTable().getDatasetId()); + assertEquals(table, bound.getTable().getTableId()); + assertEquals(schema, bound.getSchema()); assertEquals(createDisposition, bound.createDisposition); assertEquals(writeDisposition, bound.writeDisposition); assertEquals(validate, bound.validate); @@ -104,8 +205,11 @@ private void checkWriteObjectWithValidate( @Before public void setUp() { - BigQueryOptions options = PipelineOptionsFactory.as(BigQueryOptions.class); - options.setProject("defaultProject"); + bqOptions = 
PipelineOptionsFactory.as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.getRoot().getAbsolutePath() + "/BigQueryIOTest/"); + + MockitoAnnotations.initMocks(this); } @Test @@ -222,6 +326,67 @@ public void testBuildSourceWithTableAndFlatten() { p.run(); } + @Test + public void testCustomSink() throws Exception { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .startLoadJobReturns("done", "done", "done") + .pollJobStatusReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED); + + Pipeline p = TestPipeline.create(bqOptions); + p.apply(Create.of( + new TableRow().set("name", "a").set("number", 1), + new TableRow().set("name", "b").set("number", 2), + new TableRow().set("name", "c").set("number", 3))) + .setCoder(TableRowJsonCoder.of()) + .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id") + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withSchema(new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withoutValidation()); + p.run(); + + logged.verifyInfo("Starting BigQuery load job"); + logged.verifyInfo("Previous load jobs failed, retrying."); + File tempDir = new File(bqOptions.getTempLocation()); + assertEquals(0, tempDir.listFiles(new FileFilter() { + @Override + public boolean accept(File pathname) { + return pathname.isFile(); + }}).length); + } + + @Test + public void testCustomSinkUnknown() throws Exception { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .startLoadJobReturns("done", "done") + .pollJobStatusReturns(Status.FAILED, Status.UNKNOWN); + + Pipeline p = TestPipeline.create(bqOptions); + p.apply(Create.of( + new TableRow().set("name", "a").set("number", 1), + new TableRow().set("name", "b").set("number", 2), + new TableRow().set("name", "c").set("number", 3))) + .setCoder(TableRowJsonCoder.of()) + .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id") + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withTestServices(fakeBqServices) + .withoutValidation()); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Failed to poll the load job status."); + p.run(); + + File tempDir = new File(bqOptions.getTempLocation()); + assertEquals(0, tempDir.listFiles(new FileFilter() { + @Override + public boolean accept(File pathname) { + return pathname.isFile(); + }}).length); + } + @Test public void testBuildSink() { BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") @@ -334,7 +499,6 @@ public void testBuildSinkWithWriteDispositionEmpty() { null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); } - private void testWriteValidatesDataset(boolean streaming) { BigQueryOptions options = PipelineOptionsFactory.as(BigQueryOptions.class); options.setProject("someproject"); @@ -351,15 +515,11 @@ private void testWriteValidatesDataset(boolean streaming) { thrown.expectMessage( Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence")) .or(Matchers.containsString("BigQuery dataset not found for table"))); - try { - p.apply(Create.of().withCoder(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write.named("WriteMyTable") - .to(tableRef) - .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withSchema(new TableSchema())); - } finally { - Assert.assertEquals("someproject", tableRef.getProjectId()); - } + 
p.apply(Create.of().withCoder(TableRowJsonCoder.of())) + .apply(BigQueryIO.Write.named("WriteMyTable") + .to(tableRef) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withSchema(new TableSchema())); } @Test diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java new file mode 100644 index 000000000000..200e8a72908f --- /dev/null +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java @@ -0,0 +1,262 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.sdk.util; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.client.googleapis.json.GoogleJsonError; +import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; +import com.google.api.client.googleapis.json.GoogleJsonErrorContainer; +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.json.GenericJson; +import com.google.api.client.json.Json; +import com.google.api.client.json.jackson2.JacksonFactory; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.Sleeper; +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.ErrorProto; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.TableReference; +import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; +import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper; +import com.google.common.collect.ImmutableList; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Tests for {@link BigQueryServicesImpl}. 
+ */ +@RunWith(JUnit4.class) +public class BigQueryServicesImplTest { + @Rule public ExpectedException thrown = ExpectedException.none(); + @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(BigQueryServicesImpl.class); + @Mock private LowLevelHttpResponse response; + private Bigquery bigquery; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + + // A mock transport that lets us mock the API responses. + MockHttpTransport transport = + new MockHttpTransport.Builder() + .setLowLevelHttpRequest( + new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + return response; + } + }) + .build(); + + // A sample BigQuery API client that uses default JsonFactory and RetryHttpInitializer. + bigquery = + new Bigquery.Builder( + transport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()) + .build(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds. + */ + @Test + public void testStartLoadJobSucceeds() throws IOException, InterruptedException { + Job testJob = new Job(); + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + TableReference ref = new TableReference(); + ref.setProjectId("projectId"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setDestinationTable(ref); + + Sleeper sleeper = new FastNanoClockAndSleeper(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + 5 /* attempts */, 1000 /* initialIntervalMillis */); + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + loadService.startLoadJob("jobId", loadConfig, sleeper, backoff); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds + * with an already exist job. + */ + @Test + public void testStartLoadJobSucceedsAlreadyExists() throws IOException, InterruptedException { + when(response.getStatusCode()).thenReturn(409); // 409 means already exists + + TableReference ref = new TableReference(); + ref.setProjectId("projectId"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setDestinationTable(ref); + + Sleeper sleeper = new FastNanoClockAndSleeper(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + 5 /* attempts */, 1000 /* initialIntervalMillis */); + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + loadService.startLoadJob("jobId", loadConfig, sleeper, backoff); + + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#startLoadJob} succeeds with a retry. + */ + @Test + public void testStartLoadJobRetry() throws IOException, InterruptedException { + Job testJob = new Job(); + + // First response is 403 rate limited, second response has valid payload. 
+ when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(403).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) + .thenReturn(toStream(testJob)); + + TableReference ref = new TableReference(); + ref.setProjectId("projectId"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setDestinationTable(ref); + + Sleeper sleeper = new FastNanoClockAndSleeper(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + 5 /* attempts */, 1000 /* initialIntervalMillis */); + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + loadService.startLoadJob("jobId", loadConfig, sleeper, backoff); + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} succeeds. + */ + @Test + public void testPollJobStatusSucceeds() throws IOException, InterruptedException { + Job testJob = new Job(); + testJob.setStatus(new JobStatus().setState("DONE")); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + BigQueryServices.Status status = + loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + + assertEquals(BigQueryServices.Status.SUCCEEDED, status); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} fails. + */ + @Test + public void testPollJobStatusFailed() throws IOException, InterruptedException { + Job testJob = new Job(); + testJob.setStatus(new JobStatus().setState("DONE").setErrorResult(new ErrorProto())); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + BigQueryServices.Status status = + loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + + assertEquals(BigQueryServices.Status.FAILED, status); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryServicesImpl.LoadServiceImpl#pollJobStatus} returns UNKNOWN. 
+ */ + @Test + public void testPollJobStatusUnknown() throws IOException, InterruptedException { + Job testJob = new Job(); + testJob.setStatus(new JobStatus()); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + BigQueryServicesImpl.LoadServiceImpl loadService = + new BigQueryServicesImpl.LoadServiceImpl(bigquery); + BigQueryServices.Status status = + loadService.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF); + + assertEquals(BigQueryServices.Status.UNKNOWN, status); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** A helper to wrap a {@link GenericJson} object in a content stream. */ + private static InputStream toStream(GenericJson content) throws IOException { + return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content)); + } + + /** A helper that generates the error JSON payload that Google APIs produce. */ + private static GoogleJsonErrorContainer errorWithReasonAndStatus(String reason, int status) { + ErrorInfo info = new ErrorInfo(); + info.setReason(reason); + info.setDomain("global"); + // GoogleJsonError contains one or more ErrorInfo objects; our utiities read the first one. + GoogleJsonError error = new GoogleJsonError(); + error.setErrors(ImmutableList.of(info)); + error.setCode(status); + // The actual JSON response is an error container. + GoogleJsonErrorContainer container = new GoogleJsonErrorContainer(); + container.setError(error); + return container; + } +} +
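
For reference, the new write path can be exercised end to end against the fake services, mirroring the testCustomSink case added above. A minimal sketch, assuming the same test scaffolding (bqOptions, FakeBigQueryServices, Status, and the imports in BigQueryIOTest); the method name, table spec, and schema below are illustrative:

    @Test
    public void writesViaLoadJobAgainstFakeServices() throws Exception {
      // One load job attempt, reported as SUCCEEDED by the fake poll.
      FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
          .startLoadJobReturns("done")
          .pollJobStatusReturns(Status.SUCCEEDED);

      Pipeline p = TestPipeline.create(bqOptions);
      p.apply(Create.of(new TableRow().set("name", "a").set("number", 1)))
          .setCoder(TableRowJsonCoder.of())
          .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id")
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
              .withSchema(new TableSchema().setFields(ImmutableList.of(
                  new TableFieldSchema().setName("name").setType("STRING"),
                  new TableFieldSchema().setName("number").setType("INTEGER"))))
              .withTestServices(fakeBqServices)
              .withoutValidation());
      p.run();
    }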