From f11f306dc00fe211693f809354f13439aca70e5b Mon Sep 17 00:00:00 2001 From: Pei He Date: Fri, 13 May 2016 13:10:35 -0700 Subject: [PATCH] Backprot PR from Beam: Implement BigQueryIO.Read as Source. --- .../cloud/dataflow/sdk/io/BigQueryIO.java | 879 +++++++++++++++--- .../dataflow/sdk/util/BigQueryServices.java | 109 ++- .../sdk/util/BigQueryServicesImpl.java | 347 +++++-- .../cloud/dataflow/sdk/io/BigQueryIOTest.java | 548 +++++++++-- .../sdk/util/BigQueryServicesImplTest.java | 40 +- 5 files changed, 1656 insertions(+), 267 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index 6e13ad64c4..413254a40e 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -21,9 +21,16 @@ import static com.google.common.base.Preconditions.checkState; import com.google.api.client.json.JsonFactory; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.QueryRequest; import com.google.api.services.bigquery.model.TableReference; @@ -44,21 +51,26 @@ import com.google.cloud.dataflow.sdk.options.BigQueryOptions; import com.google.cloud.dataflow.sdk.options.GcpOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; -import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.transforms.Aggregator; +import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.transforms.View; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; +import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff; +import com.google.cloud.dataflow.sdk.util.AvroUtils; import com.google.cloud.dataflow.sdk.util.BigQueryServices; +import com.google.cloud.dataflow.sdk.util.BigQueryServices.DatasetService; import com.google.cloud.dataflow.sdk.util.BigQueryServices.JobService; import com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl; import com.google.cloud.dataflow.sdk.util.BigQueryTableInserter; import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator; +import com.google.cloud.dataflow.sdk.util.GcsUtil.GcsUtilFactory; import com.google.cloud.dataflow.sdk.util.IOChannelFactory; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.util.MimeTypes; @@ -66,40 +78,53 @@ import com.google.cloud.dataflow.sdk.util.Reshuffle; import 
com.google.cloud.dataflow.sdk.util.SystemDoFnInternal; import com.google.cloud.dataflow.sdk.util.Transport; -import com.google.cloud.dataflow.sdk.util.WindowingStrategy; import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; +import com.google.cloud.dataflow.sdk.values.PCollectionTuple; +import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.cloud.dataflow.sdk.values.PDone; import com.google.cloud.dataflow.sdk.values.PInput; +import com.google.cloud.dataflow.sdk.values.TupleTag; +import com.google.cloud.dataflow.sdk.values.TupleTagList; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.avro.generic.GenericRecord; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectInputStream; import java.io.OutputStream; +import java.io.Serializable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -360,27 +385,39 @@ public static Bound withoutValidation() { * {@link PCollection} of {@link TableRow TableRows}. */ public static class Bound extends PTransform> { - TableReference table; - final String query; + @Nullable final String jsonTableRef; + @Nullable final String query; final boolean validate; - @Nullable - Boolean flattenResults; + @Nullable final Boolean flattenResults; + @Nullable final BigQueryServices testBigQueryServices; private static final String QUERY_VALIDATION_FAILURE_ERROR = "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the" + " pipeline, This validation can be disabled using #withoutValidation."; + // The maximum number of retries to poll a BigQuery job in the cleanup phase. + // We expect the jobs have already DONE, and don't need a high max retires. 
+ private static final int CLEANUP_JOB_POLL_MAX_RETRIES = 10; + private Bound() { - this(null, null, null, true, null); + this( + null /* name */, + null /* query */, + null /* jsonTableRef */, + true /* validate */, + null /* flattenResults */, + null /* testBigQueryServices */); } - private Bound(String name, String query, TableReference reference, boolean validate, - Boolean flattenResults) { + private Bound( + String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate, + @Nullable Boolean flattenResults, @Nullable BigQueryServices testBigQueryServices) { super(name); - this.table = reference; + this.jsonTableRef = jsonTableRef; this.query = query; this.validate = validate; this.flattenResults = flattenResults; + this.testBigQueryServices = testBigQueryServices; } /** @@ -389,7 +426,7 @@ private Bound(String name, String query, TableReference reference, boolean valid *
<p>
Does not modify this object. */ public Bound named(String name) { - return new Bound(name, query, table, validate, flattenResults); + return new Bound(name, query, jsonTableRef, validate, flattenResults, testBigQueryServices); } /** @@ -408,7 +445,8 @@ public Bound from(String tableSpec) { *
<p>
Does not modify this object. */ public Bound from(TableReference table) { - return new Bound(name, query, table, validate, flattenResults); + return new Bound( + name, query, toJsonString(table), validate, flattenResults, testBigQueryServices); } /** @@ -422,15 +460,15 @@ public Bound from(TableReference table) { * {@link BigQueryIO.Read.Bound#withoutResultFlattening}. */ public Bound fromQuery(String query) { - return new Bound(name, query, table, validate, - MoreObjects.firstNonNull(flattenResults, Boolean.TRUE)); + return new Bound(name, query, jsonTableRef, validate, + MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), testBigQueryServices); } /** * Disable table validation. */ public Bound withoutValidation() { - return new Bound(name, query, table, false, flattenResults); + return new Bound(name, query, jsonTableRef, false, flattenResults, testBigQueryServices); } /** @@ -441,35 +479,34 @@ public Bound withoutValidation() { * from a table will cause an error during validation. */ public Bound withoutResultFlattening() { - return new Bound(name, query, table, validate, false); + return new Bound(name, query, jsonTableRef, validate, false, testBigQueryServices); + } + + @VisibleForTesting + Bound withTestServices(BigQueryServices testServices) { + return new Bound(name, query, jsonTableRef, validate, flattenResults, testServices); } @Override public void validate(PInput input) { - if (table == null && query == null) { - throw new IllegalStateException( - "Invalid BigQuery read operation, either table reference or query has to be set"); - } else if (table != null && query != null) { - throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a" - + " query and a table, only one of these should be provided"); - } else if (table != null && flattenResults != null) { - throw new IllegalStateException("Invalid BigQuery read operation. Specifies a" - + " table with a result flattening preference, which is not configurable"); - } else if (query != null && flattenResults == null) { - throw new IllegalStateException("Invalid BigQuery read operation. Specifies a" - + " query without a result flattening preference"); - } - - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); - if (table != null && table.getProjectId() == null) { - // If user does not specify a project we assume the table to be located in the project - // that owns the Dataflow job. - LOG.warn(String.format(SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(), - table.getTableId(), bqOptions.getProject())); - table.setProjectId(bqOptions.getProject()); - } - if (validate) { + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + + TableReference table = getTableWithDefaultProject(bqOptions); + if (table == null && query == null) { + throw new IllegalStateException( + "Invalid BigQuery read operation, either table reference or query has to be set"); + } else if (table != null && query != null) { + throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a" + + " query and a table, only one of these should be provided"); + } else if (table != null && flattenResults != null) { + throw new IllegalStateException("Invalid BigQuery read operation. Specifies a" + + " table with a result flattening preference, which is not configurable"); + } else if (query != null && flattenResults == null) { + throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies a" + + " query without a result flattening preference"); + } + // Check for source table/query presence for early failure notification. // Note that a presence check can fail if the table or dataset are created by earlier // stages of the pipeline or if a query depends on earlier stages of a pipeline. For these @@ -502,14 +539,83 @@ private static void dryRunQuery(BigQueryOptions options, String query) { @Override public PCollection apply(PInput input) { - return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - IsBounded.BOUNDED) - // Force the output's Coder to be what the read is using, and - // unchangeable later, to ensure that we read the input in the - // format specified by the Read transform. - .setCoder(TableRowJsonCoder.of()); + String uuid = randomUUIDString(); + final String jobIdToken = "beam_job_" + uuid; + + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + + BoundedSource source; + final BigQueryServices bqServices = getBigQueryServices(); + + final String extractDestinationDir; + String tempLocation = bqOptions.getTempLocation(); + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + extractDestinationDir = factory.resolve(tempLocation, uuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve extract destination directory in %s", tempLocation)); + } + + if (!Strings.isNullOrEmpty(query)) { + String projectId = bqOptions.getProject(); + String queryTempDatasetId = "temp_dataset_" + uuid; + String queryTempTableId = "temp_table_" + uuid; + + TableReference queryTempTableRef = new TableReference() + .setProjectId(projectId) + .setDatasetId(queryTempDatasetId) + .setTableId(queryTempTableId); + + String jsonQueryTempTable; + try { + jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef); + } catch (IOException e) { + throw new RuntimeException("Cannot initialize table to JSON strings.", e); + } + source = BigQueryQuerySource.create( + jobIdToken, query, jsonQueryTempTable, flattenResults, + extractDestinationDir, bqServices); + } else { + String jsonTable; + try { + jsonTable = JSON_FACTORY.toString(getTableWithDefaultProject(bqOptions)); + } catch (IOException e) { + throw new RuntimeException("Cannot initialize table to JSON strings.", e); + } + source = BigQueryTableSource.create( + jobIdToken, jsonTable, extractDestinationDir, bqServices); + } + PassThroughThenCleanup.CleanupOperation cleanupOperation = + new PassThroughThenCleanup.CleanupOperation() { + @Override + void cleanup(PipelineOptions options) throws Exception { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + + JobReference jobRef = new JobReference() + .setProjectId(bqOptions.getProject()) + .setJobId(getExtractJobId(jobIdToken)); + Job extractJob = bqServices.getJobService(bqOptions).pollJob( + jobRef, CLEANUP_JOB_POLL_MAX_RETRIES); + + Collection extractFiles = null; + if (extractJob != null) { + extractFiles = getExtractFilePaths(extractDestinationDir, extractJob); + } else { + IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); + Collection dirMatch = factory.match(extractDestinationDir); + if (!dirMatch.isEmpty()) { + extractFiles = factory.match(factory.resolve(extractDestinationDir, "*")); + } + } + if (extractFiles != null && !extractFiles.isEmpty()) { + new GcsUtilFactory().create(options).remove(extractFiles); + } + }}; + return input.getPipeline() + 
.apply(com.google.cloud.dataflow.sdk.io.Read.from(source)) + .setCoder(getDefaultOutputCoder()) + .apply(new PassThroughThenCleanup(cleanupOperation)); } @Override @@ -520,6 +626,7 @@ protected Coder getDefaultOutputCoder() { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + TableReference table = getTable(); if (table != null) { builder.add("table", toTableSpec(table)); @@ -531,22 +638,26 @@ public void populateDisplayData(DisplayData.Builder builder) { .addIfNotDefault("validation", validate, true); } - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bound.class, new DirectPipelineRunner.TransformEvaluator() { - @Override - public void evaluate( - Bound transform, DirectPipelineRunner.EvaluationContext context) { - evaluateReadHelper(transform, context); - } - }); + /** + * Returns the table to write, or {@code null} if reading from a query instead. + * + *
<p>
If the table's project is not specified, use the default one. + */ + @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) { + TableReference table = getTable(); + if (table != null && table.getProjectId() == null) { + // If user does not specify a project we assume the table to be located in + // the default project. + table.setProjectId(bqOptions.getProject()); + } + return table; } /** * Returns the table to read, or {@code null} if reading from a query instead. */ public TableReference getTable() { - return table; + return fromJsonString(jsonTableRef, TableReference.class); } /** @@ -569,12 +680,567 @@ public boolean getValidate() { public Boolean getFlattenResults() { return flattenResults; } + + private BigQueryServices getBigQueryServices() { + if (testBigQueryServices != null) { + return testBigQueryServices; + } else { + return new BigQueryServicesImpl(); + } + } } /** Disallow construction of utility class. */ private Read() {} } + /** + * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection} + * has been processed. + */ + @VisibleForTesting + static class PassThroughThenCleanup extends PTransform, PCollection> { + + private CleanupOperation cleanupOperation; + + PassThroughThenCleanup(CleanupOperation cleanupOperation) { + this.cleanupOperation = cleanupOperation; + } + + @Override + public PCollection apply(PCollection input) { + TupleTag mainOutput = new TupleTag<>(); + TupleTag cleanupSignal = new TupleTag<>(); + PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn()) + .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal))); + + PCollectionView cleanupSignalView = outputs.get(cleanupSignal) + .setCoder(VoidCoder.of()) + .apply(View.asSingleton().withDefaultValue(null)); + + input.getPipeline() + .apply("Create(CleanupOperation)", Create.of(cleanupOperation)) + .apply("Cleanup", ParDo.of( + new DoFn() { + @Override + public void processElement(ProcessContext c) + throws Exception { + c.element().cleanup(c.getPipelineOptions()); + } + }).withSideInputs(cleanupSignalView)); + + return outputs.get(mainOutput); + } + + private static class IdentityFn extends DoFn { + @Override + public void processElement(ProcessContext c) { + c.output(c.element()); + } + } + + abstract static class CleanupOperation implements Serializable { + abstract void cleanup(PipelineOptions options) throws Exception; + } + } + + /** + * A {@link BigQuerySourceBase} for reading BigQuery tables. 
+ */ + @VisibleForTesting + static class BigQueryTableSource extends BigQuerySourceBase { + + static BigQueryTableSource create( + String jobIdToken, + String jsonTable, + String extractDestinationDir, + BigQueryServices bqServices) { + return new BigQueryTableSource(jobIdToken, jsonTable, extractDestinationDir, bqServices); + } + + private final String jsonTable; + private final AtomicReference tableSizeBytes; + + private BigQueryTableSource( + String jobIdToken, + String jsonTable, + String extractDestinationDir, + BigQueryServices bqServices) { + super(jobIdToken, extractDestinationDir, bqServices); + this.jsonTable = checkNotNull(jsonTable, "jsonTable"); + this.tableSizeBytes = new AtomicReference<>(); + } + + @Override + protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException { + return JSON_FACTORY.fromString(jsonTable, TableReference.class); + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + TableReference tableRef = JSON_FACTORY.fromString(jsonTable, TableReference.class); + return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef)); + } + + @Override + public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + if (tableSizeBytes.get() == null) { + TableReference table = JSON_FACTORY.fromString(jsonTable, TableReference.class); + + Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class)) + .getTable(table.getProjectId(), table.getDatasetId(), table.getTableId()) + .getNumBytes(); + tableSizeBytes.compareAndSet(null, numBytes); + } + return tableSizeBytes.get(); + } + + @Override + protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { + // Do nothing. + } + } + + /** + * A {@link BigQuerySourceBase} for querying BigQuery tables. 
+ */ + @VisibleForTesting + static class BigQueryQuerySource extends BigQuerySourceBase { + + static BigQueryQuerySource create( + String jobIdToken, + String query, + String jsonQueryTempTable, + Boolean flattenResults, + String extractDestinationDir, + BigQueryServices bqServices) { + return new BigQueryQuerySource( + jobIdToken, query, jsonQueryTempTable, flattenResults, extractDestinationDir, bqServices); + } + + private final String query; + private final String jsonQueryTempTable; + private final Boolean flattenResults; + private transient AtomicReference dryRunJobStats; + + private BigQueryQuerySource( + String jobIdToken, + String query, + String jsonQueryTempTable, + Boolean flattenResults, + String extractDestinationDir, + BigQueryServices bqServices) { + super(jobIdToken, extractDestinationDir, bqServices); + this.query = checkNotNull(query, "query"); + this.jsonQueryTempTable = checkNotNull(jsonQueryTempTable, "jsonQueryTempTable"); + this.flattenResults = checkNotNull(flattenResults, "flattenResults"); + this.dryRunJobStats = new AtomicReference<>(); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed(); + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + return new BigQueryReader(this, bqServices.getReaderFromQuery( + bqOptions, query, bqOptions.getProject(), flattenResults)); + } + + @Override + protected TableReference getTableToExtract(BigQueryOptions bqOptions) + throws IOException, InterruptedException { + // 1. Find the location of the query. + TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions) + .getQuery() + .getReferencedTables() + .get(0); + DatasetService tableService = bqServices.getDatasetService(bqOptions); + String location = tableService.getTable( + dryRunTempTable.getProjectId(), + dryRunTempTable.getDatasetId(), + dryRunTempTable.getTableId()).getLocation(); + + // 2. Create the temporary dataset in the query location. + TableReference tableToExtract = + JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class); + tableService.createDataset( + tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, ""); + + // 3. Execute the query. 
+ String queryJobId = jobIdToken + "-query"; + executeQuery( + queryJobId, query, tableToExtract, flattenResults, bqServices.getJobService(bqOptions)); + return tableToExtract; + } + + @Override + protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { + TableReference tableToRemove = + JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class); + + DatasetService tableService = bqServices.getDatasetService(bqOptions); + tableService.deleteTable( + tableToRemove.getProjectId(), + tableToRemove.getDatasetId(), + tableToRemove.getTableId()); + tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId()); + } + + private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) + throws InterruptedException, IOException { + if (dryRunJobStats.get() == null) { + String projectId = bqOptions.getProject(); + JobStatistics jobStats = + bqServices.getJobService(bqOptions).dryRunQuery(projectId, query); + dryRunJobStats.compareAndSet(null, jobStats); + } + return dryRunJobStats.get(); + } + + private static void executeQuery( + String jobId, + String query, + TableReference destinationTable, + boolean flattenResults, + JobService jobService) throws IOException, InterruptedException { + JobReference jobRef = new JobReference() + .setProjectId(destinationTable.getProjectId()) + .setJobId(jobId); + JobConfigurationQuery queryConfig = new JobConfigurationQuery(); + queryConfig + .setQuery(query) + .setAllowLargeResults(true) + .setCreateDisposition("CREATE_IF_NEEDED") + .setDestinationTable(destinationTable) + .setFlattenResults(flattenResults) + .setPriority("BATCH") + .setWriteDisposition("WRITE_EMPTY"); + jobService.startQueryJob(jobRef, queryConfig); + Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); + if (parseStatus(job) != Status.SUCCEEDED) { + throw new IOException("Query job failed: " + jobId); + } + return; + } + + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { + in.defaultReadObject(); + dryRunJobStats = new AtomicReference<>(); + } + } + + /** + * An abstract {@link BoundedSource} to read a table from BigQuery. + * + *
<p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then + * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource}, + * and {@link BigQueryQuerySource}, depending on the configuration of the read. + * Specifically, + *

+ * ... + */ + private abstract static class BigQuerySourceBase extends BoundedSource { + // The maximum number of attempts to verify temp files. + private static final int MAX_FILES_VERIFY_ATTEMPTS = 10; + + // The maximum number of retries to poll a BigQuery job. + protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; + + // The initial backoff for verifying temp files. + private static final long INITIAL_FILES_VERIFY_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); + + protected final String jobIdToken; + protected final String extractDestinationDir; + protected final BigQueryServices bqServices; + + private BigQuerySourceBase( + String jobIdToken, + String extractDestinationDir, + BigQueryServices bqServices) { + this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken"); + this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir"); + this.bqServices = checkNotNull(bqServices, "bqServices"); + } + + @Override + public List> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + TableReference tableToExtract = getTableToExtract(bqOptions); + JobService jobService = bqServices.getJobService(bqOptions); + String extractJobId = getExtractJobId(jobIdToken); + List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); + + TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable( + tableToExtract.getProjectId(), + tableToExtract.getDatasetId(), + tableToExtract.getTableId()).getSchema(); + + cleanupTempResource(bqOptions); + return createSources(tempFiles, tableSchema); + } + + protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception; + + protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception; + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return false; + } + + @Override + public void validate() { + // Do nothing, validation is done in BigQuery.Read. 
+ } + + @Override + public Coder getDefaultOutputCoder() { + return TableRowJsonCoder.of(); + } + + private List executeExtract( + String jobId, TableReference table, JobService jobService) + throws InterruptedException, IOException { + JobReference jobRef = new JobReference() + .setProjectId(table.getProjectId()) + .setJobId(jobId); + + String destinationUri = getExtractDestinationUri(extractDestinationDir); + JobConfigurationExtract extract = new JobConfigurationExtract() + .setSourceTable(table) + .setDestinationFormat("AVRO") + .setDestinationUris(ImmutableList.of(destinationUri)); + + LOG.info("Starting BigQuery extract job: {}", jobId); + jobService.startExtractJob(jobRef, extract); + Job extractJob = + jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); + if (parseStatus(extractJob) != Status.SUCCEEDED) { + throw new IOException(String.format( + "Extract job %s failed, status: %s", + extractJob.getJobReference().getJobId(), extractJob.getStatus())); + } + + List tempFiles = getExtractFilePaths(extractDestinationDir, extractJob); + return ImmutableList.copyOf(tempFiles); + } + + private List> createSources( + List files, TableSchema tableSchema) throws IOException, InterruptedException { + final String jsonSchema = JSON_FACTORY.toString(tableSchema); + + SerializableFunction function = + new SerializableFunction() { + @Override + public TableRow apply(GenericRecord input) { + try { + return AvroUtils.convertGenericRecordToTableRow( + input, JSON_FACTORY.fromString(jsonSchema, TableSchema.class)); + } catch (IOException e) { + throw new RuntimeException("Failed to convert GenericRecord to TableRow", e); + } + }}; + + List> avroSources = Lists.newArrayList(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + MAX_FILES_VERIFY_ATTEMPTS, INITIAL_FILES_VERIFY_BACKOFF_MILLIS); + for (String fileName : files) { + while (BackOffUtils.next(Sleeper.DEFAULT, backoff)) { + if (IOChannelUtils.getFactory(fileName).getSizeBytes(fileName) != -1) { + break; + } + } + avroSources.add(new TransformingSource<>( + AvroSource.from(fileName), function, getDefaultOutputCoder())); + } + return ImmutableList.copyOf(avroSources); + } + + protected static class BigQueryReader extends BoundedSource.BoundedReader { + private final BigQuerySourceBase source; + private final BigQueryServices.BigQueryJsonReader reader; + + private BigQueryReader( + BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) { + this.source = source; + this.reader = reader; + } + + @Override + public BoundedSource getCurrentSource() { + return source; + } + + @Override + public boolean start() throws IOException { + return reader.start(); + } + + @Override + public boolean advance() throws IOException { + return reader.advance(); + } + + @Override + public TableRow getCurrent() throws NoSuchElementException { + return reader.getCurrent(); + } + + @Override + public void close() throws IOException { + reader.close(); + } + } + } + + /** + * A {@link BoundedSource} that reads from {@code BoundedSource} + * and transforms elements to type {@code V}. 
+ */ + @VisibleForTesting + static class TransformingSource extends BoundedSource { + private final BoundedSource boundedSource; + private final SerializableFunction function; + private final Coder outputCoder; + + TransformingSource( + BoundedSource boundedSource, + SerializableFunction function, + Coder outputCoder) { + this.boundedSource = boundedSource; + this.function = function; + this.outputCoder = outputCoder; + } + + @Override + public List> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + return Lists.transform( + boundedSource.splitIntoBundles(desiredBundleSizeBytes, options), + new Function, BoundedSource>() { + @Override + public BoundedSource apply(BoundedSource input) { + return new TransformingSource<>(input, function, outputCoder); + } + }); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return boundedSource.getEstimatedSizeBytes(options); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return boundedSource.producesSortedKeys(options); + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + return new TransformingReader(boundedSource.createReader(options)); + } + + @Override + public void validate() { + boundedSource.validate(); + } + + @Override + public Coder getDefaultOutputCoder() { + return outputCoder; + } + + private class TransformingReader extends BoundedReader { + private final BoundedReader boundedReader; + + private TransformingReader(BoundedReader boundedReader) { + this.boundedReader = boundedReader; + } + + @Override + public synchronized BoundedSource getCurrentSource() { + return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder); + } + + @Override + public boolean start() throws IOException { + return boundedReader.start(); + } + + @Override + public boolean advance() throws IOException { + return boundedReader.advance(); + } + + @Override + public V getCurrent() throws NoSuchElementException { + T current = boundedReader.getCurrent(); + return function.apply(current); + } + + @Override + public void close() throws IOException { + boundedReader.close(); + } + + @Override + public synchronized BoundedSource splitAtFraction(double fraction) { + return new TransformingSource<>( + boundedReader.splitAtFraction(fraction), function, outputCoder); + } + + @Override + public Double getFractionConsumed() { + return boundedReader.getFractionConsumed(); + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return boundedReader.getCurrentTimestamp(); + } + } + } + + private static String getExtractJobId(String jobIdToken) { + return jobIdToken + "-extract"; + } + + private static String getExtractDestinationUri(String extractDestinationDir) { + return String.format("%s/%s", extractDestinationDir, "*.avro"); + } + + private static List getExtractFilePaths(String extractDestinationDir, Job extractJob) + throws IOException { + JobStatistics jobStats = extractJob.getStatistics(); + List counts = jobStats.getExtract().getDestinationUriFileCounts(); + if (counts.size() != 1) { + String errorMessage = (counts.size() == 0 ? + "No destination uri file count received." : + String.format("More than one destination uri file count received. 
First two are %s, %s", + counts.get(0), counts.get(1))); + throw new RuntimeException(errorMessage); + } + long filesCount = counts.get(0); + + ImmutableList.Builder paths = ImmutableList.builder(); + IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); + for (long i = 0; i < filesCount; ++i) { + String filePath = + factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro")); + paths.add(filePath); + } + return paths.build(); + } + ///////////////////////////////////////////////////////////////////////////// /** @@ -797,8 +1463,15 @@ public TableReference apply(BoundedWindow value) { */ @Deprecated public Bound() { - this(null, null, null, null, CreateDisposition.CREATE_IF_NEEDED, - WriteDisposition.WRITE_EMPTY, true, null); + this( + null /* name */, + null /* jsonTableRef */, + null /* tableRefFunction */, + null /* jsonSchema */, + CreateDisposition.CREATE_IF_NEEDED, + WriteDisposition.WRITE_EMPTY, + true /* validate */, + null /* testBigQueryServices */); } private Bound(String name, @Nullable String jsonTableRef, @@ -1030,14 +1703,14 @@ public PDone apply(PCollection input) { if (Strings.isNullOrEmpty(table.getProjectId())) { table.setProjectId(options.getProject()); } - String jobIdToken = UUID.randomUUID().toString(); + String jobIdToken = randomUUIDString(); String tempLocation = options.getTempLocation(); String tempFilePrefix; try { IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); tempFilePrefix = factory.resolve( - factory.resolve(tempLocation, "BigQuerySinkTemp"), - jobIdToken); + factory.resolve(tempLocation, "BigQuerySinkTemp"), + jobIdToken); } catch (IOException e) { throw new RuntimeException( String.format("Failed to resolve BigQuery temp location in %s", tempLocation), @@ -1170,7 +1843,7 @@ private static class BigQueryWriteOperation extends FileBasedWriteOperation T fromJsonString(String json, Class clazz) { + @VisibleForTesting + static T fromJsonString(String json, Class clazz) { if (json == null) { return null; } @@ -1723,51 +2401,20 @@ private static T fromJsonString(String json, Class clazz) { } } - ///////////////////////////////////////////////////////////////////////////// - - /** Disallow construction of utility class. */ - private BigQueryIO() {} - /** - * Direct mode read evaluator. + * Returns a randomUUID string. * - *
<p>This loads the entire table into an in-memory PCollection. + *
<p>
{@code '-'} is removed because BigQuery doesn't allow it in dataset id. */ - private static void evaluateReadHelper( - Read.Bound transform, DirectPipelineRunner.EvaluationContext context) { - BigQueryOptions options = context.getPipelineOptions(); - Bigquery client = Transport.newBigQueryClient(options).build(); - if (transform.table != null && transform.table.getProjectId() == null) { - transform.table.setProjectId(options.getProject()); - } - - BigQueryTableRowIterator iterator; - if (transform.query != null) { - LOG.info("Reading from BigQuery query {}", transform.query); - iterator = - BigQueryTableRowIterator.fromQuery( - transform.query, options.getProject(), client, transform.getFlattenResults()); - } else { - LOG.info("Reading from BigQuery table {}", toTableSpec(transform.table)); - iterator = BigQueryTableRowIterator.fromTable(transform.table, client); - } - - try (BigQueryTableRowIterator ignored = iterator) { - List elems = new ArrayList<>(); - iterator.open(); - while (iterator.advance()) { - elems.add(iterator.getCurrent()); - } - LOG.info("Number of records read from BigQuery: {}", elems.size()); - context.setPCollection(context.getOutput(transform), elems); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new RuntimeException(e); - } + private static String randomUUIDString() { + return UUID.randomUUID().toString().replaceAll("-", ""); } + ///////////////////////////////////////////////////////////////////////////// + + /** Disallow construction of utility class. */ + private BigQueryIO() {} + private static List getOrCreateMapListValue(Map> map, K key) { List value = map.get(key); if (value == null) { diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java index a78c60d4b4..2fc93fe355 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServices.java @@ -15,14 +15,23 @@ */ package com.google.cloud.dataflow.sdk.util; +import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.dataflow.sdk.options.BigQueryOptions; import java.io.IOException; import java.io.Serializable; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; /** * An interface for real, mock, or fake implementations of Cloud BigQuery services. @@ -34,26 +43,41 @@ public interface BigQueryServices extends Serializable { */ public JobService getJobService(BigQueryOptions bqOptions); + /** + * Returns a real, mock, or fake {@link DatasetService}. + */ + public DatasetService getDatasetService(BigQueryOptions bqOptions); + + /** + * Returns a real, mock, or fake {@link BigQueryJsonReader} to read tables. 
+ */ + public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef); + + /** + * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables. + */ + public BigQueryJsonReader getReaderFromQuery( + BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten); + /** * An interface for the Cloud BigQuery load service. */ public interface JobService { /** - * Starts a BigQuery load job. + * Start a BigQuery load job. */ - void startLoadJob(String jobId, JobConfigurationLoad loadConfig) + void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException; - /** * Start a BigQuery extract job. */ - void startExtractJob(String jobId, JobConfigurationExtract extractConfig) + void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) throws InterruptedException, IOException; /** - * Start a BigQuery extract job. + * Start a BigQuery query job. */ - void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun) + void startQueryJob(JobReference jobRef, JobConfigurationQuery query) throws IOException, InterruptedException; /** @@ -61,7 +85,78 @@ void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun) * *
<p>
Returns null if the {@code maxAttempts} retries reached. */ - Job pollJob(String projectId, String jobId, int maxAttempts) + Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException, IOException; + + /** + * Dry runs the query in the given project. + */ + JobStatistics dryRunQuery(String projectId, String query) + throws InterruptedException, IOException; + } + + /** + * An interface to get, create and delete Cloud BigQuery datasets and tables. + */ + public interface DatasetService { + /** + * Gets the specified {@link Table} resource by table ID. + */ + Table getTable(String projectId, String datasetId, String tableId) + throws InterruptedException, IOException; + + /** + * Deletes the table specified by tableId from the dataset. + * If the table contains data, all the data will be deleted. + */ + void deleteTable(String projectId, String datasetId, String tableId) + throws IOException, InterruptedException; + + /** + * Create a {@link Dataset} with the given {@code location} and {@code description}. + */ + void createDataset(String projectId, String datasetId, String location, String description) + throws IOException, InterruptedException; + + /** + * Deletes the dataset specified by the datasetId value. + * + *
<p>
Before you can delete a dataset, you must delete all its tables. + */ + void deleteDataset(String projectId, String datasetId) + throws IOException, InterruptedException; + } + + /** + * An interface to read the Cloud BigQuery directly. + */ + public interface BigQueryJsonReader { + /** + * Initializes the reader and advances the reader to the first record. + */ + boolean start() throws IOException; + + /** + * Advances the reader to the next valid record. + */ + boolean advance() throws IOException; + + /** + * Returns the value of the data item that was read by the last {@link #start} or + * {@link #advance} call. The returned value must be effectively immutable and remain valid + * indefinitely. + * + *
<p>
Multiple calls to this method without an intervening call to {@link #advance} should + * return the same result. + * + * @throws java.util.NoSuchElementException if {@link #start} was never called, or if + * the last {@link #start} or {@link #advance} returned {@code false}. + */ + TableRow getCurrent() throws NoSuchElementException; + + /** + * Closes the reader. The reader cannot be used after this method is called. + */ + void close() throws IOException; } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java index 1bc62cac7b..e438747785 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImpl.java @@ -16,17 +16,24 @@ package com.google.cloud.dataflow.sdk.util; import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetReference; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.dataflow.sdk.options.BigQueryOptions; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; @@ -35,8 +42,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; /** * An implementation of {@link BigQueryServices} that actually communicates with the cloud BigQuery @@ -44,6 +53,8 @@ */ public class BigQueryServicesImpl implements BigQueryServices { + private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class); + // The maximum number of attempts to execute a BigQuery RPC. private static final int MAX_RPC_ATTEMPTS = 10; @@ -51,17 +62,31 @@ public class BigQueryServicesImpl implements BigQueryServices { private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); // The initial backoff for polling the status of a BigQuery job. 
- private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60); + private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); @Override public JobService getJobService(BigQueryOptions options) { return new JobServiceImpl(options); } + @Override + public DatasetService getDatasetService(BigQueryOptions options) { + return new DatasetServiceImpl(options); + } + + @Override + public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) { + return BigQueryJsonReaderImpl.fromTable(bqOptions, tableRef); + } + + @Override + public BigQueryJsonReader getReaderFromQuery( + BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) { + return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten); + } + @VisibleForTesting static class JobServiceImpl implements BigQueryServices.JobService { - private static final Logger LOG = LoggerFactory.getLogger(JobServiceImpl.class); - private final ApiErrorExtractor errorExtractor; private final Bigquery client; @@ -79,22 +104,17 @@ private JobServiceImpl(BigQueryOptions options) { /** * {@inheritDoc} * - *
<p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *
<p>
the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC retries. + * @throws IOException if it exceeds max RPC . */ @Override public void startLoadJob( - String jobId, + JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException { - Job job = new Job(); - JobReference jobRef = new JobReference(); - jobRef.setProjectId(loadConfig.getDestinationTable().getProjectId()); - jobRef.setJobId(jobId); - job.setJobReference(jobRef); - JobConfiguration jobConfig = new JobConfiguration(); - jobConfig.setLoad(loadConfig); - job.setConfiguration(jobConfig); + Job job = new Job() + .setJobReference(jobRef) + .setConfiguration(new JobConfiguration().setLoad(loadConfig)); startJob(job, errorExtractor, client); } @@ -102,21 +122,17 @@ public void startLoadJob( /** * {@inheritDoc} * - *
<p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *
<p>
the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC retries. + * @throws IOException if it exceeds max RPC . */ @Override - public void startExtractJob(String jobId, JobConfigurationExtract extractConfig) + public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) throws InterruptedException, IOException { - Job job = new Job(); - JobReference jobRef = new JobReference(); - jobRef.setProjectId(extractConfig.getSourceTable().getProjectId()); - jobRef.setJobId(jobId); - job.setJobReference(jobRef); - JobConfiguration jobConfig = new JobConfiguration(); - jobConfig.setExtract(extractConfig); - job.setConfiguration(jobConfig); + Job job = new Job() + .setJobReference(jobRef) + .setConfiguration( + new JobConfiguration().setExtract(extractConfig)); startJob(job, errorExtractor, client); } @@ -124,22 +140,17 @@ public void startExtractJob(String jobId, JobConfigurationExtract extractConfig) /** * {@inheritDoc} * - *
<p>Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *
<p>
the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC retries. + * @throws IOException if it exceeds max RPC . */ @Override - public void startQueryJob(String jobId, JobConfigurationQuery queryConfig, boolean dryRun) + public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig) throws IOException, InterruptedException { - Job job = new Job(); - JobReference jobRef = new JobReference(); - jobRef.setProjectId(queryConfig.getDestinationTable().getProjectId()); - jobRef.setJobId(jobId); - job.setJobReference(jobRef); - JobConfiguration jobConfig = new JobConfiguration(); - jobConfig.setQuery(queryConfig); - jobConfig.setDryRun(dryRun); - job.setConfiguration(jobConfig); + Job job = new Job() + .setJobReference(jobRef) + .setConfiguration( + new JobConfiguration().setQuery(queryConfig)); startJob(job, errorExtractor, client); } @@ -158,8 +169,7 @@ static void startJob( ApiErrorExtractor errorExtractor, Bigquery client, Sleeper sleeper, - BackOff backoff) - throws InterruptedException, IOException { + BackOff backoff) throws IOException, InterruptedException { JobReference jobRef = job.getJobReference(); Exception lastException = null; do { @@ -181,28 +191,27 @@ static void startJob( } while (nextBackOff(sleeper, backoff)); throw new IOException( String.format( - "Unable to insert job: %s, aborting after %d retries.", + "Unable to insert job: %s, aborting after %d .", jobRef.getJobId(), MAX_RPC_ATTEMPTS), lastException); } @Override - public Job pollJob(String projectId, String jobId, int maxAttempts) + public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { BackOff backoff = new AttemptBoundedExponentialBackOff( maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS); - return pollJob(projectId, jobId, Sleeper.DEFAULT, backoff); + return pollJob(jobRef, Sleeper.DEFAULT, backoff); } @VisibleForTesting Job pollJob( - String projectId, - String jobId, + JobReference jobRef, Sleeper sleeper, BackOff backoff) throws InterruptedException { do { try { - Job job = client.jobs().get(projectId, jobId).execute(); + Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute(); JobStatus status = job.getStatus(); if (status != null && status.getState() != null && status.getState().equals("DONE")) { return job; @@ -213,22 +222,254 @@ Job pollJob( LOG.warn("Ignore the error and retry polling job status.", e); } } while (nextBackOff(sleeper, backoff)); - LOG.warn("Unable to poll job status: {}, aborting after reached max retries.", jobId); + LOG.warn("Unable to poll job status: {}, aborting after reached max .", jobRef.getJobId()); return null; } + @Override + public JobStatistics dryRunQuery(String projectId, String query) + throws InterruptedException, IOException { + Job job = new Job() + .setConfiguration(new JobConfiguration() + .setQuery(new JobConfigurationQuery() + .setQuery(query)) + .setDryRun(true)); + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + return executeWithRetries( + client.jobs().insert(projectId, job), + String.format( + "Unable to dry run query: %s, aborting after %d retries.", + query, MAX_RPC_ATTEMPTS), + Sleeper.DEFAULT, + backoff).getStatistics(); + } + } + + @VisibleForTesting + static class DatasetServiceImpl implements DatasetService { + private final ApiErrorExtractor errorExtractor; + private final Bigquery client; + + @VisibleForTesting + DatasetServiceImpl(Bigquery client) { + 
this.errorExtractor = new ApiErrorExtractor(); + this.client = client; + } + + private DatasetServiceImpl(BigQueryOptions bqOptions) { + this.errorExtractor = new ApiErrorExtractor(); + this.client = Transport.newBigQueryClient(bqOptions).build(); + } + /** - * Identical to {@link BackOffUtils#next} but without checked IOException. - * @throws InterruptedException + * {@inheritDoc} + * + *
<p>
the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC . */ - private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) - throws InterruptedException { + @Override + public Table getTable(String projectId, String datasetId, String tableId) + throws IOException, InterruptedException { + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + return executeWithRetries( + client.tables().get(projectId, datasetId, tableId), + String.format( + "Unable to get table: %s, aborting after %d retries.", + tableId, MAX_RPC_ATTEMPTS), + Sleeper.DEFAULT, + backoff); + } + + /** + * {@inheritDoc} + * + *
<p>
the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC . + */ + @Override + public void deleteTable(String projectId, String datasetId, String tableId) + throws IOException, InterruptedException { + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + executeWithRetries( + client.tables().delete(projectId, datasetId, tableId), + String.format( + "Unable to delete table: %s, aborting after %d retries.", + tableId, MAX_RPC_ATTEMPTS), + Sleeper.DEFAULT, + backoff); + } + + /** + * {@inheritDoc} + * + *
<p>
the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC . + */ + @Override + public void createDataset( + String projectId, String datasetId, String location, String description) + throws IOException, InterruptedException { + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff); + } + + @VisibleForTesting + void createDataset( + String projectId, + String datasetId, + String location, + String description, + Sleeper sleeper, + BackOff backoff) throws IOException, InterruptedException { + DatasetReference datasetRef = new DatasetReference() + .setProjectId(projectId) + .setDatasetId(datasetId); + + Dataset dataset = new Dataset() + .setDatasetReference(datasetRef) + .setLocation(location) + .setFriendlyName(location) + .setDescription(description); + + Exception lastException; + do { + try { + client.datasets().insert(projectId, dataset).execute(); + return; // SUCCEEDED + } catch (GoogleJsonResponseException e) { + if (errorExtractor.itemAlreadyExists(e)) { + return; // SUCCEEDED + } + // ignore and retry + LOG.warn("Ignore the error and retry creating the dataset.", e); + lastException = e; + } catch (IOException e) { + LOG.warn("Ignore the error and retry creating the dataset.", e); + lastException = e; + } + } while (nextBackOff(sleeper, backoff)); + throw new IOException( + String.format( + "Unable to create dataset: %s, aborting after %d .", + datasetId, MAX_RPC_ATTEMPTS), + lastException); + } + + /** + * {@inheritDoc} + * + *
<p>
the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC . + */ + @Override + public void deleteDataset(String projectId, String datasetId) + throws IOException, InterruptedException { + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + executeWithRetries( + client.datasets().delete(projectId, datasetId), + String.format( + "Unable to delete table: %s, aborting after %d retries.", + datasetId, MAX_RPC_ATTEMPTS), + Sleeper.DEFAULT, + backoff); + } + } + + private static class BigQueryJsonReaderImpl implements BigQueryJsonReader { + BigQueryTableRowIterator iterator; + + private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) { + this.iterator = iterator; + } + + private static BigQueryJsonReader fromQuery( + BigQueryOptions bqOptions, + String query, + String projectId, + @Nullable Boolean flattenResults) { + return new BigQueryJsonReaderImpl( + BigQueryTableRowIterator.fromQuery( + query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults)); + } + + private static BigQueryJsonReader fromTable( + BigQueryOptions bqOptions, + TableReference tableRef) { + return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable( + tableRef, Transport.newBigQueryClient(bqOptions).build())); + } + + @Override + public boolean start() throws IOException { try { - return BackOffUtils.next(sleeper, backoff); + iterator.open(); + return iterator.advance(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during start() operation", e); + } + } + + @Override + public boolean advance() throws IOException { + try { + return iterator.advance(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during advance() operation", e); + } + } + + @Override + public TableRow getCurrent() throws NoSuchElementException { + return iterator.getCurrent(); + } + + @Override + public void close() throws IOException { + iterator.close(); + } + } + + @VisibleForTesting + static T executeWithRetries( + AbstractGoogleClientRequest request, + String errorMessage, + Sleeper sleeper, + BackOff backoff) + throws IOException, InterruptedException { + Exception lastException = null; + do { + try { + return request.execute(); } catch (IOException e) { - throw new RuntimeException(e); + LOG.warn("Ignore the error and retry the request.", e); + lastException = e; } + } while (nextBackOff(sleeper, backoff)); + throw new IOException( + errorMessage, + lastException); + } + + /** + * Identical to {@link BackOffUtils#next} but without checked IOException. 
+ * @throws InterruptedException + */ + private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException { + try { + return BackOffUtils.next(sleeper, backoff); + } catch (IOException e) { + throw new RuntimeException(e); } } } - diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index 4c754d50d7..54066ff7da 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -16,12 +16,17 @@ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.io.BigQueryIO.fromJsonString; +import static com.google.cloud.dataflow.sdk.io.BigQueryIO.toJsonString; import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasKey; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; import com.google.api.client.util.Data; import com.google.api.services.bigquery.model.ErrorProto; @@ -29,31 +34,55 @@ import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; +import com.google.api.services.bigquery.model.JobStatistics2; +import com.google.api.services.bigquery.model.JobStatistics4; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.CoderException; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder; +import com.google.cloud.dataflow.sdk.io.BigQueryIO.BigQueryQuerySource; +import com.google.cloud.dataflow.sdk.io.BigQueryIO.BigQueryTableSource; +import com.google.cloud.dataflow.sdk.io.BigQueryIO.PassThroughThenCleanup; +import com.google.cloud.dataflow.sdk.io.BigQueryIO.PassThroughThenCleanup.CleanupOperation; import com.google.cloud.dataflow.sdk.io.BigQueryIO.Status; +import com.google.cloud.dataflow.sdk.io.BigQueryIO.TransformingSource; import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition; import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition; import com.google.cloud.dataflow.sdk.options.BigQueryOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; +import com.google.cloud.dataflow.sdk.testing.SourceTestUtils; +import com.google.cloud.dataflow.sdk.testing.SourceTestUtils.ExpectedSplitOutcome; 
import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.util.BigQueryServices; +import com.google.cloud.dataflow.sdk.util.BigQueryServices.DatasetService; +import com.google.cloud.dataflow.sdk.util.BigQueryServices.JobService; import com.google.cloud.dataflow.sdk.util.CoderUtils; +import com.google.cloud.dataflow.sdk.util.IOChannelFactory; +import com.google.cloud.dataflow.sdk.util.IOChannelUtils; +import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Before; @@ -65,18 +94,24 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import java.io.File; import java.io.FileFilter; import java.io.IOException; +import java.io.Serializable; +import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; /** * Tests for BigQueryIO. */ @RunWith(JUnit4.class) -public class BigQueryIOTest { +public class BigQueryIOTest implements Serializable { // Status.UNKNOWN maps to null private static final Map JOB_STATUS_MAP = ImmutableMap.of( @@ -85,111 +120,192 @@ Status.SUCCEEDED, new Job().setStatus(new JobStatus()), private static class FakeBigQueryServices implements BigQueryServices { - private Object[] startJobReturns; - private Object[] pollJobStatusReturns; + private String[] jsonTableRowReturns = new String[0]; + private JobService jobService; + private DatasetService datasetService; - /** - * Sets the return values for the mock {@link JobService#startLoadJob}. - * - *

Throws if the {@link Object} is a {@link Exception}, returns otherwise. - */ - private FakeBigQueryServices startLoadJobReturns(Object... startLoadJobReturns) { - this.startJobReturns = startLoadJobReturns; + public FakeBigQueryServices withJobService(JobService jobService) { + this.jobService = jobService; return this; } - /** - * Sets the return values for the mock {@link JobService#pollJobStatus}. - * - *

Throws if the {@link Object} is a {@link Exception}, returns otherwise. - */ - private FakeBigQueryServices pollJobStatusReturns(Object... pollJobStatusReturns) { - this.pollJobStatusReturns = pollJobStatusReturns; + public FakeBigQueryServices withDatasetService(DatasetService datasetService) { + this.datasetService = datasetService; + return this; + } + + public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) { + this.jsonTableRowReturns = jsonTableRowReturns; return this; } @Override public JobService getJobService(BigQueryOptions bqOptions) { - return new FakeLoadService(startJobReturns, pollJobStatusReturns); + return jobService; } - private static class FakeLoadService implements BigQueryServices.JobService { + @Override + public DatasetService getDatasetService(BigQueryOptions bqOptions) { + return datasetService; + } - private Object[] startJobReturns; - private Object[] pollJobStatusReturns; - private int startLoadJobCallsCount; - private int pollJobStatusCallsCount; + @Override + public BigQueryJsonReader getReaderFromTable( + BigQueryOptions bqOptions, TableReference tableRef) { + return new FakeBigQueryReader(jsonTableRowReturns); + } + + @Override + public BigQueryJsonReader getReaderFromQuery( + BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) { + return new FakeBigQueryReader(jsonTableRowReturns); + } + + private static class FakeBigQueryReader implements BigQueryJsonReader { + private static final int UNSTARTED = -1; + private static final int CLOSED = Integer.MAX_VALUE; + + private String[] jsonTableRowReturns; + private int currIndex; - public FakeLoadService(Object[] startLoadJobReturns, Object[] pollJobStatusReturns) { - this.startJobReturns = startLoadJobReturns; - this.pollJobStatusReturns = pollJobStatusReturns; - this.startLoadJobCallsCount = 0; - this.pollJobStatusCallsCount = 0; + FakeBigQueryReader(String[] jsonTableRowReturns) { + this.jsonTableRowReturns = jsonTableRowReturns; + this.currIndex = UNSTARTED; } @Override - public void startLoadJob(String jobId, JobConfigurationLoad loadConfig) - throws InterruptedException, IOException { - startJob(); + public boolean start() throws IOException { + assertEquals(UNSTARTED, currIndex); + currIndex = 0; + return currIndex < jsonTableRowReturns.length; } @Override - public void startExtractJob(String jobId, JobConfigurationExtract extractConfig) - throws InterruptedException, IOException { - startJob(); + public boolean advance() throws IOException { + return ++currIndex < jsonTableRowReturns.length; } @Override - public void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun) - throws IOException, InterruptedException { - startJob(); + public TableRow getCurrent() throws NoSuchElementException { + if (currIndex >= jsonTableRowReturns.length) { + throw new NoSuchElementException(); + } + return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class); } @Override - public Job pollJob(String projectId, String jobId, int maxAttemps) - throws InterruptedException { - if (pollJobStatusCallsCount < pollJobStatusReturns.length) { - Object ret = pollJobStatusReturns[pollJobStatusCallsCount++]; - if (ret instanceof Status) { - return JOB_STATUS_MAP.get(ret); - } else if (ret instanceof InterruptedException) { - throw (InterruptedException) ret; - } else { - throw new RuntimeException("Unexpected return type: " + ret.getClass()); - } + public void close() throws IOException { + currIndex = CLOSED; + } + } + } + + private static class 
FakeJobService implements JobService, Serializable { + + private Object[] startJobReturns; + private Object[] pollJobReturns; + // Both counts will be reset to zero after serialization. + // This is a workaround for DoFn's verifyUnmodified check. + private transient int startJobCallsCount; + private transient int pollJobStatusCallsCount; + + public FakeJobService() { + this.startJobReturns = new Object[0]; + this.pollJobReturns = new Object[0]; + this.startJobCallsCount = 0; + this.pollJobStatusCallsCount = 0; + } + + /** + * Sets the return values to mock {@link JobService#startLoadJob}, + * {@link JobService#startExtractJob} and {@link JobService#startQueryJob}. + * + *

Throws if the {@link Object} is an {@link Exception}, returns otherwise. + */ + public FakeJobService startJobReturns(Object... startJobReturns) { + this.startJobReturns = startJobReturns; + return this; + } + + /** + * Sets the return values to mock {@link JobService#pollJob}. + * + *

Throws if the {@link Object} is a {@link Exception}, returns otherwise. + */ + public FakeJobService pollJobReturns(Object... pollJobReturns) { + this.pollJobReturns = pollJobReturns; + return this; + } + + @Override + public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) + throws InterruptedException, IOException { + startJob(); + } + + @Override + public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) + throws InterruptedException, IOException { + startJob(); + } + + @Override + public void startQueryJob(JobReference jobRef, JobConfigurationQuery query) + throws IOException, InterruptedException { + startJob(); + } + + @Override + public Job pollJob(JobReference jobRef, int maxAttempts) + throws InterruptedException { + if (pollJobStatusCallsCount < pollJobReturns.length) { + Object ret = pollJobReturns[pollJobStatusCallsCount++]; + if (ret instanceof Job) { + return (Job) ret; + } else if (ret instanceof Status) { + return JOB_STATUS_MAP.get(ret); + } else if (ret instanceof InterruptedException) { + throw (InterruptedException) ret; } else { - throw new RuntimeException( - "Exceeded expected number of calls: " + pollJobStatusReturns.length); + throw new RuntimeException("Unexpected return type: " + ret.getClass()); } + } else { + throw new RuntimeException( + "Exceeded expected number of calls: " + pollJobReturns.length); } + } - private void startJob() throws IOException, InterruptedException { - if (startLoadJobCallsCount < startJobReturns.length) { - Object ret = startJobReturns[startLoadJobCallsCount++]; - if (ret instanceof IOException) { - throw (IOException) ret; - } else if (ret instanceof InterruptedException) { - throw (InterruptedException) ret; - } else { - return; - } + private void startJob() throws IOException, InterruptedException { + if (startJobCallsCount < startJobReturns.length) { + Object ret = startJobReturns[startJobCallsCount++]; + if (ret instanceof IOException) { + throw (IOException) ret; + } else if (ret instanceof InterruptedException) { + throw (InterruptedException) ret; } else { - throw new RuntimeException( - "Exceeded expected number of calls: " + startJobReturns.length); + return; } + } else { + throw new RuntimeException( + "Exceeded expected number of calls: " + startJobReturns.length); } } + + @Override + public JobStatistics dryRunQuery(String projectId, String query) + throws InterruptedException, IOException { + throw new UnsupportedOperationException(); + } } - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Rule public ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); - @Rule - public TemporaryFolder testFolder = new TemporaryFolder(); - @Mock - public BigQueryServices.JobService mockBqLoadService; + @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); + @Rule public transient TemporaryFolder testFolder = new TemporaryFolder(); + @Mock public transient BigQueryServices.JobService mockJobService; + @Mock private transient IOChannelFactory mockIOChannelFactory; + @Mock private transient DatasetService mockDatasetService; - private BigQueryOptions bqOptions; + private transient BigQueryOptions bqOptions; private void checkReadTableObject( BigQueryIO.Read.Bound bound, String project, String dataset, String table) { @@ -203,16 +319,16 @@ private void checkReadQueryObject( private void checkReadTableObjectWithValidate( BigQueryIO.Read.Bound bound, String 
project, String dataset, String table, boolean validate) { - assertEquals(project, bound.table.getProjectId()); - assertEquals(dataset, bound.table.getDatasetId()); - assertEquals(table, bound.table.getTableId()); + assertEquals(project, bound.getTable().getProjectId()); + assertEquals(dataset, bound.getTable().getDatasetId()); + assertEquals(table, bound.getTable().getTableId()); assertNull(bound.query); assertEquals(validate, bound.getValidate()); } private void checkReadQueryObjectWithValidate( BigQueryIO.Read.Bound bound, String query, boolean validate) { - assertNull(bound.table); + assertNull(bound.getTable()); assertEquals(query, bound.query); assertEquals(validate, bound.getValidate()); } @@ -239,10 +355,10 @@ private void checkWriteObjectWithValidate( } @Before - public void setUp() { + public void setUp() throws IOException { bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); bqOptions.setProject("defaultProject"); - bqOptions.setTempLocation(testFolder.getRoot().getAbsolutePath() + "/BigQueryIOTest/"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); MockitoAnnotations.initMocks(this); } @@ -313,11 +429,7 @@ public void testValidateReadSetsDefaultProject() { thrown.expectMessage( Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence")) .or(Matchers.containsString("BigQuery dataset not found for table"))); - try { - p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef)); - } finally { - Assert.assertEquals("someproject", tableRef.getProjectId()); - } + p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef)); } @Test @@ -361,11 +473,41 @@ public void testBuildSourceWithTableAndFlatten() { p.run(); } + @Test + public void testReadFromTable() { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService() + .startJobReturns("done", "done") + .pollJobReturns(Status.UNKNOWN)) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", 1)), + toJsonString(new TableRow().set("name", "b").set("number", 2)), + toJsonString(new TableRow().set("name", "c").set("number", 3))); + + Pipeline p = TestPipeline.create(bqOptions); + PCollection output = p + .apply(BigQueryIO.Read.from("foo.com:project:somedataset.sometable") + .withTestServices(fakeBqServices) + .withoutValidation()) + .apply(ParDo.of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output((String) c.element().get("name")); + } + })); + + DataflowAssert.that(output) + .containsInAnyOrder(ImmutableList.of("a", "b", "c")); + + p.run(); + } + @Test public void testCustomSink() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .startLoadJobReturns("done", "done", "done") - .pollJobStatusReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED); + .withJobService(new FakeJobService() + .startJobReturns("done", "done", "done") + .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -396,8 +538,9 @@ public boolean accept(File pathname) { @Test public void testCustomSinkUnknown() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .startLoadJobReturns("done", "done") - .pollJobStatusReturns(Status.FAILED, Status.UNKNOWN); + .withJobService(new FakeJobService() + .startJobReturns("done", "done") + .pollJobReturns(Status.FAILED, Status.UNKNOWN)); Pipeline p = TestPipeline.create(bqOptions); 
p.apply(Create.of( @@ -715,4 +858,239 @@ public void testWriteValidateFailsNoTableAndNoTableSpec() { .apply(Create.of()) .apply(BigQueryIO.Write.named("name")); } + + @Test + public void testBigQueryTableSourceThroughJsonAPI() throws Exception { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", "1")), + toJsonString(new TableRow().set("name", "b").set("number", "2")), + toJsonString(new TableRow().set("name", "c").set("number", "3"))); + + String jobIdToken = "testJobIdToken"; + String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name")); + String extractDestinationDir = "mock://tempLocation"; + BoundedSource bqSource = + BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices); + + List expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3")); + + PipelineOptions options = PipelineOptionsFactory.create(); + Assert.assertThat( + SourceTestUtils.readFromSource(bqSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + } + + @Test + public void testBigQueryTableSourceInitSplit() throws Exception { + Job extractJob = new Job(); + JobStatistics jobStats = new JobStatistics(); + JobStatistics4 extractStats = new JobStatistics4(); + extractStats.setDestinationUriFileCounts(ImmutableList.of(1L)); + jobStats.setExtract(extractStats); + extractJob.setStatus(new JobStatus()) + .setStatistics(jobStats); + + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .withDatasetService(mockDatasetService) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", "1")), + toJsonString(new TableRow().set("name", "b").set("number", "2")), + toJsonString(new TableRow().set("name", "c").set("number", "3"))); + + String jobIdToken = "testJobIdToken"; + String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name")); + String extractDestinationDir = "mock://tempLocation"; + BoundedSource bqSource = + BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices); + + List expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3")); + + when(mockJobService.pollJob(Mockito.any(), Mockito.anyInt())) + .thenReturn(extractJob); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setTempLocation("mock://tempLocation"); + + IOChannelUtils.setIOFactory("mock", mockIOChannelFactory); + when(mockIOChannelFactory.resolve(anyString(), anyString())) + .thenReturn("mock://tempLocation/output"); + when(mockDatasetService.getTable(anyString(), anyString(), anyString())) + .thenReturn(new Table().setSchema(new TableSchema())); + + Assert.assertThat( + SourceTestUtils.readFromSource(bqSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + + List> sources = bqSource.splitIntoBundles(100, options); + assertEquals(1, sources.size()); + BoundedSource actual = sources.get(0); + assertThat(actual, 
CoreMatchers.instanceOf(TransformingSource.class)); + + Mockito.verify(mockJobService) + .startExtractJob(Mockito.any(), Mockito.any()); + } + + @Test + public void testBigQueryQuerySourceInitSplit() throws Exception { + TableReference dryRunTable = new TableReference(); + + Job queryJob = new Job(); + JobStatistics queryJobStats = new JobStatistics(); + JobStatistics2 queryStats = new JobStatistics2(); + queryStats.setReferencedTables(ImmutableList.of(dryRunTable)); + queryJobStats.setQuery(queryStats); + queryJob.setStatus(new JobStatus()) + .setStatistics(queryJobStats); + + Job extractJob = new Job(); + JobStatistics extractJobStats = new JobStatistics(); + JobStatistics4 extractStats = new JobStatistics4(); + extractStats.setDestinationUriFileCounts(ImmutableList.of(1L)); + extractJobStats.setExtract(extractStats); + extractJob.setStatus(new JobStatus()) + .setStatistics(extractJobStats); + + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .withDatasetService(mockDatasetService) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", "1")), + toJsonString(new TableRow().set("name", "b").set("number", "2")), + toJsonString(new TableRow().set("name", "c").set("number", "3"))); + + String jobIdToken = "testJobIdToken"; + String extractDestinationDir = "mock://tempLocation"; + TableReference destinationTable = BigQueryIO.parseTableSpec("project.data_set.table_name"); + String jsonDestinationTable = toJsonString(destinationTable); + BoundedSource bqSource = BigQueryQuerySource.create( + jobIdToken, "query", jsonDestinationTable, true /* flattenResults */, + extractDestinationDir, fakeBqServices); + + List expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3")); + + PipelineOptions options = PipelineOptionsFactory.create(); + options.setTempLocation(extractDestinationDir); + + TableReference queryTable = new TableReference() + .setProjectId("testProejct") + .setDatasetId("testDataset") + .setTableId("testTable"); + when(mockJobService.dryRunQuery(anyString(), anyString())) + .thenReturn(new JobStatistics().setQuery( + new JobStatistics2() + .setTotalBytesProcessed(100L) + .setReferencedTables(ImmutableList.of(queryTable)))); + when(mockDatasetService.getTable( + eq(queryTable.getProjectId()), eq(queryTable.getDatasetId()), eq(queryTable.getTableId()))) + .thenReturn(new Table().setSchema(new TableSchema())); + when(mockDatasetService.getTable( + eq(destinationTable.getProjectId()), + eq(destinationTable.getDatasetId()), + eq(destinationTable.getTableId()))) + .thenReturn(new Table().setSchema(new TableSchema())); + IOChannelUtils.setIOFactory("mock", mockIOChannelFactory); + when(mockIOChannelFactory.resolve(anyString(), anyString())) + .thenReturn("mock://tempLocation/output"); + when(mockJobService.pollJob(Mockito.any(), Mockito.anyInt())) + .thenReturn(extractJob); + + Assert.assertThat( + SourceTestUtils.readFromSource(bqSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + + List> sources = bqSource.splitIntoBundles(100, options); + assertEquals(1, sources.size()); + BoundedSource actual = sources.get(0); + assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); + + Mockito.verify(mockJobService) + .startQueryJob( + Mockito.any(), 
Mockito.any()); + Mockito.verify(mockJobService) + .startExtractJob(Mockito.any(), Mockito.any()); + Mockito.verify(mockDatasetService) + .createDataset(anyString(), anyString(), anyString(), anyString()); + } + + @Test + public void testTransformingSource() throws Exception { + int numElements = 10000; + @SuppressWarnings("deprecation") + BoundedSource longSource = CountingSource.upTo(numElements); + SerializableFunction toStringFn = + new SerializableFunction() { + @Override + public String apply(Long input) { + return input.toString(); + }}; + BoundedSource stringSource = new TransformingSource<>( + longSource, toStringFn, StringUtf8Coder.of()); + + List expected = Lists.newArrayList(); + for (int i = 0; i < numElements; i++) { + expected.add(String.valueOf(i)); + } + + PipelineOptions options = PipelineOptionsFactory.create(); + Assert.assertThat( + SourceTestUtils.readFromSource(stringSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options); + + SourceTestUtils.assertSourcesEqualReferenceSource( + stringSource, stringSource.splitIntoBundles(100, options), options); + } + + @Test + @Category(RunnableOnService.class) + public void testPassThroughThenCleanup() throws Exception { + Pipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of(1, 2, 3)) + .apply(new PassThroughThenCleanup(new CleanupOperation() { + @Override + void cleanup(PipelineOptions options) throws Exception { + // no-op + }})); + + DataflowAssert.that(output).containsInAnyOrder(1, 2, 3); + + p.run(); + } + + @Test + public void testPassThroughThenCleanupExecuted() throws Exception { + Pipeline p = TestPipeline.create(); + + p.apply(Create.of()) + .apply(new PassThroughThenCleanup(new CleanupOperation() { + @Override + void cleanup(PipelineOptions options) throws Exception { + throw new RuntimeException("cleanup executed"); + }})); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("cleanup executed"); + + p.run(); + } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java index e9ca0ebadd..d9de2ab5b0 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryServicesImplTest.java @@ -37,6 +37,7 @@ import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; import com.google.cloud.dataflow.sdk.testing.FastNanoClockAndSleeper; @@ -114,6 +115,7 @@ public void testStartLoadJobSucceeds() throws IOException, InterruptedException BackOff backoff = new AttemptBoundedExponentialBackOff( 5 /* attempts */, 1000 /* initialIntervalMillis */); JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff); + verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); verify(response, times(1)).getContentType(); @@ -195,8 +197,10 @@ public void testPollJobSucceeds() throws IOException, InterruptedException { BigQueryServicesImpl.JobServiceImpl jobService = new 
BigQueryServicesImpl.JobServiceImpl(bigquery); - Job job = - jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); assertEquals(testJob, job); verify(response, times(1)).getStatusCode(); @@ -218,8 +222,10 @@ public void testPollJobFailed() throws IOException, InterruptedException { BigQueryServicesImpl.JobServiceImpl jobService = new BigQueryServicesImpl.JobServiceImpl(bigquery); - Job job = - jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); assertEquals(testJob, job); verify(response, times(1)).getStatusCode(); @@ -241,8 +247,10 @@ public void testPollJobUnknown() throws IOException, InterruptedException { BigQueryServicesImpl.JobServiceImpl jobService = new BigQueryServicesImpl.JobServiceImpl(bigquery); - Job job = - jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.STOP_BACKOFF); assertEquals(null, job); verify(response, times(1)).getStatusCode(); @@ -250,6 +258,26 @@ public void testPollJobUnknown() throws IOException, InterruptedException { verify(response, times(1)).getContentType(); } + @Test + public void testExecuteWithRetries() throws IOException, InterruptedException { + Table testTable = new Table(); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testTable)); + + Table table = BigQueryServicesImpl.executeWithRetries( + bigquery.tables().get("projectId", "datasetId", "tableId"), + "Failed to get table.", + Sleeper.DEFAULT, + BackOff.STOP_BACKOFF); + + assertEquals(testTable, table); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + /** A helper to wrap a {@link GenericJson} object in a content stream. */ private static InputStream toStream(GenericJson content) throws IOException { return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content));
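
Both createDataset and executeWithRetries above follow the same attempt-bounded retry loop: issue the RPC, log and remember any IOException, back off, and give up with the last exception once the attempt budget is spent. The standalone sketch below illustrates that control flow outside the SDK; the class name RetrySketch, the Callable-based signature, and the plain Thread.sleep doubling are illustrative assumptions, not the SDK's Sleeper/AttemptBoundedExponentialBackOff implementation.

import java.io.IOException;
import java.util.concurrent.Callable;

/** Minimal sketch of an attempt-bounded retry loop (illustrative only, not SDK code). */
class RetrySketch {
  static <T> T callWithRetries(Callable<T> call, int maxAttempts, long initialBackoffMillis)
      throws IOException, InterruptedException {
    IOException lastException = null;
    long backoffMillis = initialBackoffMillis;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return call.call();          // Success: stop retrying.
      } catch (IOException e) {
        lastException = e;           // Remember the failure; fall through to back off and retry.
      } catch (Exception e) {
        throw new IOException("Unexpected failure", e);
      }
      if (attempt < maxAttempts) {
        Thread.sleep(backoffMillis); // Crude stand-in for Sleeper + BackOff.
        backoffMillis *= 2;          // Grow the interval, as an exponential backoff would.
      }
    }
    throw new IOException("Aborting after " + maxAttempts + " attempts.", lastException);
  }
}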
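
On the test side, the patch replaces the earlier single-purpose fake with a small builder: a test composes FakeBigQueryServices from a FakeJobService (scripted job outcomes) and canned JSON rows for the reader, stubbing only what it exercises. A wiring sketch, usable inside BigQueryIOTest as written (the row values and statuses here are invented for illustration; the builder methods are the ones defined in this patch):

// Inside BigQueryIOTest: job control is scripted, and the reader yields two canned rows.
FakeBigQueryServices fakeBqServices = new FakeBigQueryServices()
    .withJobService(new FakeJobService()
        .startJobReturns("done")             // First start* call succeeds.
        .pollJobReturns(Status.SUCCEEDED))   // First pollJob reports success.
    .readerReturns(
        toJsonString(new TableRow().set("name", "a").set("number", 1)),
        toJsonString(new TableRow().set("name", "b").set("number", 2)));
// The fake is then handed to a read via .withTestServices(fakeBqServices).withoutValidation(),
// as testReadFromTable does above.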