From ca41f6b9091ed5c2bce50d98513249b100d8e88b Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 13 Apr 2016 12:03:15 -0700 Subject: [PATCH 1/5] Implement BigQueryIO.Read as Source --- .../org/apache/beam/sdk/io/BigQueryIO.java | 779 +++++++++++++++--- .../beam/sdk/util/BigQueryServices.java | 69 +- .../beam/sdk/util/BigQueryServicesImpl.java | 193 ++++- .../apache/beam/sdk/io/BigQueryIOTest.java | 524 ++++++++++-- .../sdk/util/BigQueryServicesImplTest.java | 20 + 5 files changed, 1379 insertions(+), 206 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index 77852989dd97..9d1fec57ff94 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -31,26 +31,32 @@ import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.BigQueryIO.PassThroughThenCleanup.CleanupOperation; import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.BigQueryOptions; import org.apache.beam.sdk.options.GcpOptions; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.runners.DirectPipelineRunner; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; +import org.apache.beam.sdk.util.AvroUtils; import org.apache.beam.sdk.util.BigQueryServices; import org.apache.beam.sdk.util.BigQueryServices.JobService; +import org.apache.beam.sdk.util.BigQueryServices.TableService; import org.apache.beam.sdk.util.BigQueryServicesImpl; import org.apache.beam.sdk.util.BigQueryTableInserter; import org.apache.beam.sdk.util.BigQueryTableRowIterator; +import org.apache.beam.sdk.util.GcsUtil.GcsUtilFactory; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.MimeTypes; @@ -58,18 +64,26 @@ import org.apache.beam.sdk.util.Reshuffle; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.Transport; -import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollection.IsBounded; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; import com.google.api.client.json.JsonFactory; +import com.google.api.client.util.BackOff; +import com.google.api.client.util.BackOffUtils; +import com.google.api.client.util.Sleeper; import 
com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.QueryRequest; import com.google.api.services.bigquery.model.TableReference; @@ -77,31 +91,42 @@ import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; + +import org.apache.avro.generic.GenericRecord; +import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; +import java.io.ObjectInputStream; import java.io.OutputStream; +import java.io.Serializable; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -362,27 +387,29 @@ public static Bound withoutValidation() { * {@link PCollection} of {@link TableRow TableRows}. */ public static class Bound extends PTransform> { - TableReference table; - final String query; + @Nullable final String jsonTableRef; + @Nullable final String query; final boolean validate; - @Nullable - Boolean flattenResults; + @Nullable final Boolean flattenResults; + @Nullable final BigQueryServices testBigQueryServices; private static final String QUERY_VALIDATION_FAILURE_ERROR = "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the" + " pipeline, This validation can be disabled using #withoutValidation."; private Bound() { - this(null, null, null, true, null); + this(null, null, null, true, null, null); } - private Bound(String name, String query, TableReference reference, boolean validate, - Boolean flattenResults) { + private Bound( + String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate, + @Nullable Boolean flattenResults, @Nullable BigQueryServices testBigQueryServices) { super(name); - this.table = reference; + this.jsonTableRef = jsonTableRef; this.query = query; this.validate = validate; this.flattenResults = flattenResults; + this.testBigQueryServices = testBigQueryServices; } /** @@ -391,7 +418,7 @@ private Bound(String name, String query, TableReference reference, boolean valid *
<p>
Does not modify this object. */ public Bound named(String name) { - return new Bound(name, query, table, validate, flattenResults); + return new Bound(name, query, jsonTableRef, validate, flattenResults, testBigQueryServices); } /** @@ -410,7 +437,8 @@ public Bound from(String tableSpec) { *
<p>
Does not modify this object. */ public Bound from(TableReference table) { - return new Bound(name, query, table, validate, flattenResults); + return new Bound( + name, query, toJsonString(table), validate, flattenResults, testBigQueryServices); } /** @@ -424,15 +452,15 @@ public Bound from(TableReference table) { * {@link BigQueryIO.Read.Bound#withoutResultFlattening}. */ public Bound fromQuery(String query) { - return new Bound(name, query, table, validate, - MoreObjects.firstNonNull(flattenResults, Boolean.TRUE)); + return new Bound(name, query, jsonTableRef, validate, + MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), testBigQueryServices); } /** * Disable table validation. */ public Bound withoutValidation() { - return new Bound(name, query, table, false, flattenResults); + return new Bound(name, query, jsonTableRef, false, flattenResults, testBigQueryServices); } /** @@ -443,35 +471,34 @@ public Bound withoutValidation() { * from a table will cause an error during validation. */ public Bound withoutResultFlattening() { - return new Bound(name, query, table, validate, false); + return new Bound(name, query, jsonTableRef, validate, false, testBigQueryServices); + } + + @VisibleForTesting + Bound withTestServices(BigQueryServices testServices) { + return new Bound(name, query, jsonTableRef, validate, flattenResults, testServices); } @Override public void validate(PInput input) { - if (table == null && query == null) { - throw new IllegalStateException( - "Invalid BigQuery read operation, either table reference or query has to be set"); - } else if (table != null && query != null) { - throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a" - + " query and a table, only one of these should be provided"); - } else if (table != null && flattenResults != null) { - throw new IllegalStateException("Invalid BigQuery read operation. Specifies a" - + " table with a result flattening preference, which is not configurable"); - } else if (query != null && flattenResults == null) { - throw new IllegalStateException("Invalid BigQuery read operation. Specifies a" - + " query without a result flattening preference"); - } - - BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); - if (table != null && table.getProjectId() == null) { - // If user does not specify a project we assume the table to be located in the project - // that owns the Dataflow job. - LOG.warn(String.format(SET_PROJECT_FROM_OPTIONS_WARNING, table.getDatasetId(), - table.getTableId(), bqOptions.getProject())); - table.setProjectId(bqOptions.getProject()); - } - if (validate) { + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + + TableReference table = getTableWithDefaultProject(bqOptions); + if (table == null && query == null) { + throw new IllegalStateException( + "Invalid BigQuery read operation, either table reference or query has to be set"); + } else if (table != null && query != null) { + throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a" + + " query and a table, only one of these should be provided"); + } else if (table != null && flattenResults != null) { + throw new IllegalStateException("Invalid BigQuery read operation. Specifies a" + + " table with a result flattening preference, which is not configurable"); + } else if (query != null && flattenResults == null) { + throw new IllegalStateException("Invalid BigQuery read operation. 
Specifies a" + + " query without a result flattening preference"); + } + // Check for source table/query presence for early failure notification. // Note that a presence check can fail if the table or dataset are created by earlier // stages of the pipeline or if a query depends on earlier stages of a pipeline. For these @@ -504,14 +531,68 @@ private static void dryRunQuery(BigQueryOptions options, String query) { @Override public PCollection apply(PInput input) { - return PCollection.createPrimitiveOutputInternal( - input.getPipeline(), - WindowingStrategy.globalDefault(), - IsBounded.BOUNDED) - // Force the output's Coder to be what the read is using, and - // unchangeable later, to ensure that we read the input in the - // format specified by the Read transform. - .setCoder(TableRowJsonCoder.of()); + String uuid = UUID.randomUUID().toString(); + String jobIdToken = "beam_job_" + uuid; + String queryTempDatasetId = "temp_dataset_" + uuid; + String queryTempTableId = "temp_table_" + uuid; + + BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); + + BoundedSource source; + BigQueryServices bqServices = getBigQueryServices(); + + final String extractDestinationDir; + String tempLocation = bqOptions.getTempLocation(); + try { + IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation); + extractDestinationDir = factory.resolve(tempLocation, uuid); + } catch (IOException e) { + throw new RuntimeException( + String.format("Failed to resolve extract destination directory in %s", tempLocation)); + } + + if (!Strings.isNullOrEmpty(query)) { + String projectId = bqOptions.getProject(); + TableReference queryTempTableRef = new TableReference() + .setProjectId(projectId) + .setDatasetId(queryTempDatasetId) + .setTableId(queryTempTableId); + + String jsonQueryTempTable; + try { + jsonQueryTempTable = JSON_FACTORY.toString(queryTempTableRef); + } catch (IOException e) { + throw new RuntimeException("Cannot initialize table to JSON strings.", e); + } + source = BigQueryQuerySource.create( + jobIdToken, query, jsonQueryTempTable, flattenResults, + extractDestinationDir, bqServices); + } else { + String jsonTable; + try { + jsonTable = JSON_FACTORY.toString(getTableWithDefaultProject(bqOptions)); + } catch (IOException e) { + throw new RuntimeException("Cannot initialize table to JSON strings.", e); + } + source = BigQueryTableSource.create( + jobIdToken, jsonTable, extractDestinationDir, bqServices); + } + CleanupOperation cleanupOperation = new CleanupOperation() { + @Override + void cleanup(PipelineOptions options) throws Exception { + IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); + Collection dirMatch = factory.match(extractDestinationDir); + if (!dirMatch.isEmpty()) { + Collection extractFiles = factory.match( + factory.resolve(extractDestinationDir, "*")); + new GcsUtilFactory().create(options).remove(extractFiles); + } + } + }; + return input.getPipeline() + .apply(org.apache.beam.sdk.io.Read.from(source)) + .setCoder(getDefaultOutputCoder()) + .apply(new PassThroughThenCleanup(cleanupOperation)); } @Override @@ -522,6 +603,7 @@ protected Coder getDefaultOutputCoder() { @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); + TableReference table = getTable(); if (table != null) { builder.add(DisplayData.item("table", toTableSpec(table))); @@ -533,22 +615,26 @@ public void populateDisplayData(DisplayData.Builder builder) { .addIfNotDefault(DisplayData.item("validation", 
validate), true); } - static { - DirectPipelineRunner.registerDefaultTransformEvaluator( - Bound.class, new DirectPipelineRunner.TransformEvaluator() { - @Override - public void evaluate( - Bound transform, DirectPipelineRunner.EvaluationContext context) { - evaluateReadHelper(transform, context); - } - }); + /** + * Returns the table to read, or {@code null} if reading from a query instead. + * + *
<p>
If the table's project is not specified, use the default one. + */ + @Nullable private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) { + TableReference table = getTable(); + if (table != null && table.getProjectId() == null) { + // If user does not specify a project we assume the table to be located in + // the default project. + table.setProjectId(bqOptions.getProject()); + } + return table; } /** * Returns the table to read, or {@code null} if reading from a query instead. */ public TableReference getTable() { - return table; + return fromJsonString(jsonTableRef, TableReference.class); } /** @@ -571,12 +657,535 @@ public boolean getValidate() { public Boolean getFlattenResults() { return flattenResults; } + + private BigQueryServices getBigQueryServices() { + if (testBigQueryServices != null) { + return testBigQueryServices; + } else { + return new BigQueryServicesImpl(); + } + } } /** Disallow construction of utility class. */ private Read() {} } + /** + * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection} + * has been processed. + */ + static class PassThroughThenCleanup extends PTransform, PCollection> { + + private CleanupOperation cleanupOperation; + + PassThroughThenCleanup(CleanupOperation cleanupOperation) { + this.cleanupOperation = cleanupOperation; + } + + @Override + public PCollection apply(PCollection input) { + TupleTag mainOutput = new TupleTag<>(); + TupleTag cleanupSignal = new TupleTag<>(); + PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn()) + .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal))); + + PCollectionView cleanupSignalView = outputs.get(cleanupSignal) + .setCoder(VoidCoder.of()) + .apply(View.asSingleton().withDefaultValue(null)); + + input.getPipeline() + .apply("Create(CleanupOperation)", Create.of(cleanupOperation)) + .apply("Cleanup", ParDo.of( + new DoFn() { + @Override + public void processElement(ProcessContext c) + throws Exception { + c.element().cleanup(c.getPipelineOptions()); + } + }).withSideInputs(cleanupSignalView)); + + return outputs.get(mainOutput); + } + + private class IdentityFn extends DoFn { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output(c.element()); + } + } + + abstract static class CleanupOperation implements Serializable { + abstract void cleanup(PipelineOptions options) throws Exception; + } + } + + @VisibleForTesting + static class BigQueryTableSource extends BigQuerySourceBase { + + static BigQueryTableSource create( + String jobIdToken, + String jsonTable, + String extractDestinationDir, + BigQueryServices bqServices) { + return new BigQueryTableSource(jobIdToken, jsonTable, extractDestinationDir, bqServices); + } + + private final String jsonTable; + private final AtomicReference tableSizeBytes; + + private BigQueryTableSource( + String jobIdToken, + String jsonTable, + String extractDestinationDir, + BigQueryServices bqServices) { + super(jobIdToken, extractDestinationDir, bqServices); + this.jsonTable = checkNotNull(jsonTable, "jsonTable"); + this.tableSizeBytes = new AtomicReference<>(); + } + + @Override + protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException { + return JSON_FACTORY.fromString(jsonTable, TableReference.class); + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + TableReference tableRef = JSON_FACTORY.fromString(jsonTable, 
TableReference.class); + return new BigQueryReader(this, bqServices.getReaderFromTable(bqOptions, tableRef)); + } + + @Override + public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + if (tableSizeBytes.get() == null) { + TableReference table = JSON_FACTORY.fromString(jsonTable, TableReference.class); + + Long numBytes = bqServices.getTableService(options.as(BigQueryOptions.class)) + .getTable(table.getProjectId(), table.getDatasetId(), table.getTableId()) + .getNumBytes(); + tableSizeBytes.compareAndSet(null, numBytes); + } + return tableSizeBytes.get(); + } + } + + @VisibleForTesting + static class BigQueryQuerySource extends BigQuerySourceBase { + + static BigQueryQuerySource create( + String jobIdToken, + String query, + String jsonQueryTempTable, + Boolean flattenResults, + String extractDestinationDir, + BigQueryServices bqServices) { + return new BigQueryQuerySource( + jobIdToken, query, jsonQueryTempTable, flattenResults, extractDestinationDir, bqServices); + } + + private final String query; + private final String jsonQueryTempTable; + private final Boolean flattenResults; + private transient AtomicReference dryRunJobRef; + + private BigQueryQuerySource( + String jobIdToken, + String query, + String jsonQueryTempTable, + Boolean flattenResults, + String extractDestinationDir, + BigQueryServices bqServices) { + super(jobIdToken, extractDestinationDir, bqServices); + this.query = checkNotNull(query, "query"); + this.jsonQueryTempTable = checkNotNull(jsonQueryTempTable, "jsonQueryTempTable"); + this.flattenResults = checkNotNull(flattenResults, "flattenResults"); + this.dryRunJobRef = new AtomicReference<>(); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + return dryRunQueryIfNeeded(bqOptions) + .getStatistics().getTotalBytesProcessed(); + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + return new BigQueryReader(this, bqServices.getReaderFromQuery( + bqOptions, query, bqOptions.getProject(), flattenResults)); + } + + @Override + protected TableReference getTableToExtract(BigQueryOptions bqOptions) + throws IOException, InterruptedException { + // 1. Find the location of the query. + TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions) + .getStatistics() + .getQuery() + .getReferencedTables() + .get(0); + TableService tableService = bqServices.getTableService(bqOptions); + String location = tableService.getTable( + dryRunTempTable.getProjectId(), + dryRunTempTable.getDatasetId(), + dryRunTempTable.getTableId()).getLocation(); + + // 2. Create the temporary dataset in the query location. + TableReference tableToExtract = + JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class); + tableService.createDataset( + tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, ""); + + // 3. Execute the query. 
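+ // The query writes its results into the temporary table created in step 2;
+ // BigQuerySourceBase then exports that table to Avro files for reading.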
+ String queryJobId = jobIdToken + "-query"; + executeQuery( + queryJobId, query, tableToExtract, flattenResults, bqServices.getJobService(bqOptions)); + return tableToExtract; + } + + private synchronized Job dryRunQueryIfNeeded(BigQueryOptions bqOptions) { + if (dryRunJobRef.get() == null) { + String projectId = bqOptions.getProject(); + try { + String dryRunJobId = jobIdToken + "-dryRunQuery"; + Job dryRunJob = dryRunQuery( + projectId, dryRunJobId, query, flattenResults, bqServices.getJobService(bqOptions)); + dryRunJobRef.compareAndSet(null, dryRunJob); + } catch (IOException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw new RuntimeException("Failed to dry run query.", e); + } + } + return dryRunJobRef.get(); + } + + private static Job dryRunQuery( + String projectId, + String jobId, + String query, + boolean flattenResults, + JobService jobService) throws InterruptedException, IOException { + JobConfigurationQuery queryConfig = new JobConfigurationQuery(); + queryConfig + .setQuery(query) + .setCreateDisposition("CREATE_IF_NEEDED") + .setFlattenResults(flattenResults) + .setPriority("BATCH") + .setWriteDisposition("WRITE_EMPTY"); + jobService.startQueryJob(jobId, queryConfig, true /* dryRun */); + Job job = jobService.pollJob(projectId, jobId, QUERY_DRY_RUN_POLL_MAX_RETRIES); + if (parseStatus(job) != Status.SUCCEEDED) { + throw new IOException("Query job failed: " + jobId); + } + return job; + } + + private static void executeQuery( + String jobId, + String query, + TableReference destinationTable, + boolean flattenResults, + JobService jobService) throws IOException, InterruptedException { + JobConfigurationQuery queryConfig = new JobConfigurationQuery(); + queryConfig + .setQuery(query) + .setAllowLargeResults(true) + .setCreateDisposition("CREATE_IF_NEEDED") + .setDestinationTable(destinationTable) + .setFlattenResults(flattenResults) + .setPriority("BATCH") + .setWriteDisposition("WRITE_EMPTY"); + jobService.startQueryJob(jobId, queryConfig, false /* dryRun */); + Job job = jobService.pollJob( + destinationTable.getProjectId(), jobId, JOB_POLL_MAX_RETRIES); + if (parseStatus(job) != Status.SUCCEEDED) { + throw new IOException("Query job failed: " + jobId); + } + return; + } + + private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { + in.defaultReadObject(); + dryRunJobRef = new AtomicReference<>(); + } + } + + abstract static class BigQuerySourceBase extends BoundedSource { + // The maximum number of attempts to verify temp files. + private static final int MAX_FILES_VERIFY_ATTEMPTS = 10; + + // The maximum number of retries to poll a BigQuery job. + protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; + + // The maximum number of retries to poll a dry run query job. + protected static final int QUERY_DRY_RUN_POLL_MAX_RETRIES = 10; + + // The initial backoff for verifying temp files. 
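+ // Extract output files may not be visible immediately after the extract job
+ // completes, so createSources() polls each file's size with a bounded
+ // exponential backoff before reading it.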
+ private static final long INITIAL_FILES_VERIFY_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); + + protected final String jobIdToken; + protected final String extractDestinationDir; + protected final BigQueryServices bqServices; + + private BigQuerySourceBase( + String jobIdToken, + String extractDestinationDir, + BigQueryServices bqServices) { + this.jobIdToken = checkNotNull(jobIdToken, "jobIdToken"); + this.extractDestinationDir = checkNotNull(extractDestinationDir, "extractDestinationDir"); + this.bqServices = checkNotNull(bqServices, "bqServices"); + } + + @Override + public List> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + TableReference tableToExtract = getTableToExtract(bqOptions); + JobService jobService = bqServices.getJobService(bqOptions); + String extractJobId = jobIdToken + "-extract"; + List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); + + TableSchema tableSchema = bqServices.getTableService(bqOptions).getTable( + tableToExtract.getProjectId(), + tableToExtract.getDatasetId(), + tableToExtract.getTableId()).getSchema(); + + return createSources(tempFiles, tableSchema); + } + + protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) + throws Exception; + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return false; + } + + @Override + public void validate() { + // Do nothing, validation is done in BigQuery.Read. + } + + @Override + public Coder getDefaultOutputCoder() { + return TableRowJsonCoder.of(); + } + + private List executeExtract( + String jobId, TableReference table, JobService jobService) + throws InterruptedException, IOException { + String destinationUri = String.format("%s/%s", extractDestinationDir, "*.avro"); + JobConfigurationExtract extract = new JobConfigurationExtract(); + extract.setSourceTable(table); + extract.setDestinationFormat("AVRO"); + extract.setDestinationUris(ImmutableList.of(destinationUri)); + + LOG.info("Starting BigQuery extract job: {}", jobId); + jobService.startExtractJob(jobId, extract); + Job job = + jobService.pollJob(table.getProjectId(), jobId, JOB_POLL_MAX_RETRIES); + if (parseStatus(job) != Status.SUCCEEDED) { + throw new IOException(String.format( + "Extract job %s failed, status: %s", + job.getJobReference().getJobId(), job.getStatus())); + } + + JobStatistics jobStats = job.getStatistics(); + List counts = jobStats.getExtract().getDestinationUriFileCounts(); + if (counts.size() != 1) { + String errorMessage = (counts.size() == 0 ? + "None destination uri file count received." : + String.format("More than one destination uri file count received. 
First two are %s, %s", + counts.get(0), counts.get(1))); + throw new RuntimeException(errorMessage); + } + + long filesCount = counts.get(0); + List tempFiles = Lists.newArrayList(); + IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); + for (int i = 0; i < filesCount; ++i) { + String filePath = + factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro")); + tempFiles.add(filePath); + } + return ImmutableList.copyOf(tempFiles); + } + + private List> createSources( + List files, TableSchema tableSchema) throws IOException, InterruptedException { + final String jsonSchema = JSON_FACTORY.toString(tableSchema); + + SerializableFunction function = + new SerializableFunction() { + @Override + public TableRow apply(GenericRecord input) { + try { + return AvroUtils.convertGenericRecordToTableRow( + input, JSON_FACTORY.fromString(jsonSchema, TableSchema.class)); + } catch (IOException e) { + throw new RuntimeException("Failed to convert GenericRecord to TableRow", e); + } + }}; + + List> avroSources = Lists.newArrayList(); + BackOff backoff = new AttemptBoundedExponentialBackOff( + MAX_FILES_VERIFY_ATTEMPTS, INITIAL_FILES_VERIFY_BACKOFF_MILLIS); + for (String fileName : files) { + while (BackOffUtils.next(Sleeper.DEFAULT, backoff)) { + if (IOChannelUtils.getFactory(fileName).getSizeBytes(fileName) != -1) { + break; + } + } + avroSources.add(new TransformingSource<>( + AvroSource.from(fileName), function, getDefaultOutputCoder())); + } + return ImmutableList.copyOf(avroSources); + } + } + + private static class BigQueryReader extends BoundedSource.BoundedReader { + private final BigQuerySourceBase source; + private final BigQueryServices.BigQueryJsonReader reader; + + private BigQueryReader(BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) { + this.source = source; + this.reader = reader; + } + + @Override + public BoundedSource getCurrentSource() { + return source; + } + + @Override + public boolean start() throws IOException { + return reader.start(); + } + + @Override + public boolean advance() throws IOException { + return reader.advance(); + } + + @Override + public TableRow getCurrent() throws NoSuchElementException { + return reader.getCurrent(); + } + + @Override + public void close() throws IOException { + reader.close(); + } + } + + @VisibleForTesting + static class TransformingSource extends BoundedSource { + private final BoundedSource boundedSource; + private final SerializableFunction function; + private final Coder outputCoder; + + TransformingSource( + BoundedSource boundedSource, + SerializableFunction function, + Coder outputCoder) { + this.boundedSource = boundedSource; + this.function = function; + this.outputCoder = outputCoder; + } + + @Override + public List> splitIntoBundles( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + return Lists.transform( + boundedSource.splitIntoBundles(desiredBundleSizeBytes, options), + new Function, BoundedSource>() { + @Override + public BoundedSource apply(BoundedSource input) { + return new TransformingSource<>(input, function, outputCoder); + } + }); + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return boundedSource.getEstimatedSizeBytes(options); + } + + @Override + public boolean producesSortedKeys(PipelineOptions options) throws Exception { + return boundedSource.producesSortedKeys(options); + } + + @Override + public BoundedReader createReader(PipelineOptions options) throws IOException { + 
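// Wrap the delegate reader; TransformingReader applies the conversion function to each record in getCurrent().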
return new TransformingReader(boundedSource.createReader(options)); + } + + @Override + public void validate() { + boundedSource.validate(); + } + + @Override + public Coder getDefaultOutputCoder() { + return outputCoder; + } + + private class TransformingReader extends BoundedReader { + private final BoundedReader boundedReader; + + private TransformingReader(BoundedReader boundedReader) { + this.boundedReader = boundedReader; + } + + @Override + public synchronized BoundedSource getCurrentSource() { + return new TransformingSource<>(boundedReader.getCurrentSource(), function, outputCoder); + } + + @Override + public boolean start() throws IOException { + return boundedReader.start(); + } + + @Override + public boolean advance() throws IOException { + return boundedReader.advance(); + } + + @Override + public V getCurrent() throws NoSuchElementException { + T current = boundedReader.getCurrent(); + return function.apply(current); + } + + @Override + public void close() throws IOException { + boundedReader.close(); + } + + @Override + public synchronized BoundedSource splitAtFraction(double fraction) { + return new TransformingSource<>( + boundedReader.splitAtFraction(fraction), function, outputCoder); + } + + @Override + public Double getFractionConsumed() { + return boundedReader.getFractionConsumed(); + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return boundedReader.getCurrentTimestamp(); + } + } + } + ///////////////////////////////////////////////////////////////////////////// /** @@ -1172,7 +1781,7 @@ private static class BigQueryWriteOperation extends FileBasedWriteOperation T fromJsonString(String json, Class clazz) { + @VisibleForTesting + static T fromJsonString(String json, Class clazz) { if (json == null) { return null; } @@ -1730,46 +2341,6 @@ private static T fromJsonString(String json, Class clazz) { /** Disallow construction of utility class. */ private BigQueryIO() {} - /** - * Direct mode read evaluator. - * - *
<p>
This loads the entire table into an in-memory PCollection. - */ - private static void evaluateReadHelper( - Read.Bound transform, DirectPipelineRunner.EvaluationContext context) { - BigQueryOptions options = context.getPipelineOptions(); - Bigquery client = Transport.newBigQueryClient(options).build(); - if (transform.table != null && transform.table.getProjectId() == null) { - transform.table.setProjectId(options.getProject()); - } - - BigQueryTableRowIterator iterator; - if (transform.query != null) { - LOG.info("Reading from BigQuery query {}", transform.query); - iterator = - BigQueryTableRowIterator.fromQuery( - transform.query, options.getProject(), client, transform.getFlattenResults()); - } else { - LOG.info("Reading from BigQuery table {}", toTableSpec(transform.table)); - iterator = BigQueryTableRowIterator.fromTable(transform.table, client); - } - - try (BigQueryTableRowIterator ignored = iterator) { - List elems = new ArrayList<>(); - iterator.open(); - while (iterator.advance()) { - elems.add(iterator.getCurrent()); - } - LOG.info("Number of records read from BigQuery: {}", elems.size()); - context.setPCollection(context.getOutput(transform), elems); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new RuntimeException(e); - } - } - private static List getOrCreateMapListValue(Map> map, K key) { List value = map.get(key); if (value == null) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java index b12e049a4dab..3ae093c94f37 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java @@ -19,13 +19,20 @@ import org.apache.beam.sdk.options.BigQueryOptions; +import com.google.api.services.bigquery.model.Dataset; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; import java.io.IOException; import java.io.Serializable; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; /** * An interface for real, mock, or fake implementations of Cloud BigQuery services. @@ -37,16 +44,25 @@ public interface BigQueryServices extends Serializable { */ public JobService getJobService(BigQueryOptions bqOptions); + /** + * Returns a real, mock, or fake {@link TableService}. + */ + public TableService getTableService(BigQueryOptions bqOptions); + + public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef); + + public BigQueryJsonReader getReaderFromQuery( + BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten); + /** * An interface for the Cloud BigQuery load service. */ public interface JobService { /** - * Starts a BigQuery load job. + * Start a BigQuery load job. */ void startLoadJob(String jobId, JobConfigurationLoad loadConfig) throws InterruptedException, IOException; - /** * Start a BigQuery extract job. 
*/ @@ -54,7 +70,7 @@ void startExtractJob(String jobId, JobConfigurationExtract extractConfig) throws InterruptedException, IOException; /** - * Start a BigQuery extract job. + * Start a BigQuery query job. */ void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun) throws IOException, InterruptedException; @@ -67,4 +83,51 @@ void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun) Job pollJob(String projectId, String jobId, int maxAttempts) throws InterruptedException, IOException; } + + /** + * An interface for the Cloud BigQuery table service. + */ + public interface TableService { + /** + * Gets the specified table resource by table ID. + */ + Table getTable(String projectId, String datasetId, String tableId) + throws InterruptedException, IOException; + + Dataset createDataset(String projectId, String datasetId, String location, String description) + throws IOException, InterruptedException; + } + + /** + * An interface to read the Cloud BigQuery directly. + */ + public interface BigQueryJsonReader { + /** + * Initializes the reader and advances the reader to the first record. + */ + boolean start() throws IOException; + + /** + * Advances the reader to the next valid record. + */ + boolean advance() throws IOException; + + /** + * Returns the value of the data item that was read by the last {@link #start} or + * {@link #advance} call. The returned value must be effectively immutable and remain valid + * indefinitely. + * + *
<p>
Multiple calls to this method without an intervening call to {@link #advance} should + * return the same result. + * + * @throws java.util.NoSuchElementException if {@link #start} was never called, or if + * the last {@link #start} or {@link #advance} returned {@code false}. + */ + TableRow getCurrent() throws NoSuchElementException; + + /** + * Closes the reader. The reader cannot be used after this method is called. + */ + void close() throws IOException; + } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java index 2bfe84f75452..a62ae10cde4c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java @@ -24,6 +24,8 @@ import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.Dataset; +import com.google.api.services.bigquery.model.DatasetReference; import com.google.api.services.bigquery.model.Job; import com.google.api.services.bigquery.model.JobConfiguration; import com.google.api.services.bigquery.model.JobConfigurationExtract; @@ -31,6 +33,9 @@ import com.google.api.services.bigquery.model.JobConfigurationQuery; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Table; +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.annotations.VisibleForTesting; @@ -38,14 +43,19 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + /** * An implementation of {@link BigQueryServices} that actually communicates with the cloud BigQuery * service. */ public class BigQueryServicesImpl implements BigQueryServices { + private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class); + // The maximum number of attempts to execute a BigQuery RPC. private static final int MAX_RPC_ATTEMPTS = 10; @@ -53,17 +63,31 @@ public class BigQueryServicesImpl implements BigQueryServices { private static final long INITIAL_RPC_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); // The initial backoff for polling the status of a BigQuery job. 
- private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(60); + private static final long INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); @Override public JobService getJobService(BigQueryOptions options) { return new JobServiceImpl(options); } + @Override + public TableService getTableService(BigQueryOptions options) { + return new TableServiceImpl(options); + } + + @Override + public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef) { + return BigQueryJsonReaderImpl.fromTable(bqOptions, tableRef); + } + + @Override + public BigQueryJsonReader getReaderFromQuery( + BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) { + return BigQueryJsonReaderImpl.fromQuery(bqOptions, query, projectId, flatten); + } + @VisibleForTesting static class JobServiceImpl implements BigQueryServices.JobService { - private static final Logger LOG = LoggerFactory.getLogger(JobServiceImpl.class); - private final ApiErrorExtractor errorExtractor; private final Bigquery client; @@ -160,8 +184,7 @@ static void startJob( ApiErrorExtractor errorExtractor, Bigquery client, Sleeper sleeper, - BackOff backoff) - throws InterruptedException, IOException { + BackOff backoff) throws IOException, InterruptedException { JobReference jobRef = job.getJobReference(); Exception lastException = null; do { @@ -218,18 +241,162 @@ Job pollJob( LOG.warn("Unable to poll job status: {}, aborting after reached max retries.", jobId); return null; } + } - /** - * Identical to {@link BackOffUtils#next} but without checked IOException. - * @throws InterruptedException - */ - private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) - throws InterruptedException { + @VisibleForTesting + static class TableServiceImpl implements TableService { + private final Bigquery client; + + @VisibleForTesting + TableServiceImpl(Bigquery client) { + this.client = client; + } + + private TableServiceImpl(BigQueryOptions bqOptions) { + this.client = Transport.newBigQueryClient(bqOptions).build(); + } + + @Override + public Table getTable(String projectId, String datasetId, String tableId) + throws IOException, InterruptedException { + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + return getTable(projectId, datasetId, tableId, Sleeper.DEFAULT, backoff); + } + + @VisibleForTesting + Table getTable( + String projectId, + String datasetId, + String tableId, + Sleeper sleeper, + BackOff backoff) + throws IOException, InterruptedException { + Exception lastException = null; + do { + try { + return client.tables().get(projectId, datasetId, tableId).execute(); + } catch (IOException e) { + LOG.warn("Ignore the error and retry getting the table.", e); + lastException = e; + } + } while (nextBackOff(sleeper, backoff)); + throw new IOException( + String.format( + "Unable to get table: %s, aborting after %d retries.", + tableId, MAX_RPC_ATTEMPTS), + lastException); + } + + @Override + public Dataset createDataset( + String projectId, String datasetId, String location, String description) + throws IOException, InterruptedException { + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + return createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff); + } + + @VisibleForTesting + Dataset createDataset( + String projectId, + String datasetId, + String location, + String description, + Sleeper 
sleeper, + BackOff backoff) throws IOException, InterruptedException { + Exception lastException; + do { + try { + DatasetReference datasetRef = new DatasetReference(); + datasetRef.setProjectId(projectId); + datasetRef.setDatasetId(datasetId); + + Dataset dataset = new Dataset(); + dataset.setDatasetReference(datasetRef); + dataset.setLocation(location); + dataset.setFriendlyName(location); + dataset.setDescription(description); + + return client.datasets().insert(projectId, dataset).execute(); + } catch (IOException e) { + LOG.warn("Ignore the error and retry creating the dataset.", e); + lastException = e; + } + } while (nextBackOff(sleeper, backoff)); + throw new IOException( + String.format( + "Unable to create dataset: %s, aborting after %d retries.", + datasetId, MAX_RPC_ATTEMPTS), + lastException); + } + } + + private static class BigQueryJsonReaderImpl implements BigQueryJsonReader { + BigQueryTableRowIterator iterator; + + private BigQueryJsonReaderImpl(BigQueryTableRowIterator iterator) { + this.iterator = iterator; + } + + private static BigQueryJsonReader fromQuery( + BigQueryOptions bqOptions, + String query, + String projectId, + @Nullable Boolean flattenResults) { + return new BigQueryJsonReaderImpl( + BigQueryTableRowIterator.fromQuery( + query, projectId, Transport.newBigQueryClient(bqOptions).build(), flattenResults)); + } + + private static BigQueryJsonReader fromTable( + BigQueryOptions bqOptions, + TableReference tableRef) { + return new BigQueryJsonReaderImpl(BigQueryTableRowIterator.fromTable( + tableRef, Transport.newBigQueryClient(bqOptions).build())); + } + + @Override + public boolean start() throws IOException { + try { + iterator.open(); + return true; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + @Override + public boolean advance() throws IOException { try { - return BackOffUtils.next(sleeper, backoff); - } catch (IOException e) { + return iterator.advance(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } } + + @Override + public TableRow getCurrent() throws NoSuchElementException { + return iterator.getCurrent(); + } + + @Override + public void close() throws IOException { + iterator.close(); + } + } + + /** + * Identical to {@link BackOffUtils#next} but without checked IOException. 
+ * @throws InterruptedException + */ + private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException { + try { + return BackOffUtils.next(sleeper, backoff); + } catch (IOException e) { + throw new RuntimeException(e); + } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index 7998fc70b7df..c6a3aaae9881 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -17,30 +17,50 @@ */ package org.apache.beam.sdk.io; +import static org.apache.beam.sdk.io.BigQueryIO.fromJsonString; +import static org.apache.beam.sdk.io.BigQueryIO.toJsonString; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.when; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.TableRowJsonCoder; +import org.apache.beam.sdk.io.BigQueryIO.BigQueryQuerySource; +import org.apache.beam.sdk.io.BigQueryIO.BigQueryTableSource; +import org.apache.beam.sdk.io.BigQueryIO.PassThroughThenCleanup; +import org.apache.beam.sdk.io.BigQueryIO.PassThroughThenCleanup.CleanupOperation; import org.apache.beam.sdk.io.BigQueryIO.Status; +import org.apache.beam.sdk.io.BigQueryIO.TransformingSource; import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.BigQueryOptions; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.ExpectedLogs; +import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; +import org.apache.beam.sdk.testing.SourceTestUtils; +import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.BigQueryServices; +import org.apache.beam.sdk.util.BigQueryServices.JobService; +import org.apache.beam.sdk.util.BigQueryServices.TableService; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.IOChannelFactory; +import org.apache.beam.sdk.util.IOChannelUtils; +import org.apache.beam.sdk.values.PCollection; import com.google.api.client.util.Data; import com.google.api.services.bigquery.model.ErrorProto; @@ -48,14 +68,20 @@ import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobStatistics; +import com.google.api.services.bigquery.model.JobStatistics2; +import 
com.google.api.services.bigquery.model.JobStatistics4; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableFieldSchema; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.hamcrest.CoreMatchers; import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Before; @@ -67,18 +93,24 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import java.io.File; import java.io.FileFilter; import java.io.IOException; +import java.io.Serializable; +import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; + +import javax.annotation.Nullable; /** * Tests for BigQueryIO. */ @RunWith(JUnit4.class) -public class BigQueryIOTest { +public class BigQueryIOTest implements Serializable { // Status.UNKNOWN maps to null private static final Map JOB_STATUS_MAP = ImmutableMap.of( @@ -87,111 +119,186 @@ Status.SUCCEEDED, new Job().setStatus(new JobStatus()), private static class FakeBigQueryServices implements BigQueryServices { - private Object[] startJobReturns; - private Object[] pollJobStatusReturns; + private String[] jsonTableRowReturns = new String[0]; + private JobService jobService; + private TableService tableService; - /** - * Sets the return values for the mock {@link JobService#startLoadJob}. - * - *
<p>
Throws if the {@link Object} is a {@link Exception}, returns otherwise. - */ - private FakeBigQueryServices startLoadJobReturns(Object... startLoadJobReturns) { - this.startJobReturns = startLoadJobReturns; + public FakeBigQueryServices withJobService(JobService jobService) { + this.jobService = jobService; return this; } - /** - * Sets the return values for the mock {@link JobService#pollJobStatus}. - * - *
<p>
Throws if the {@link Object} is a {@link Exception}, returns otherwise. - */ - private FakeBigQueryServices pollJobStatusReturns(Object... pollJobStatusReturns) { - this.pollJobStatusReturns = pollJobStatusReturns; + public FakeBigQueryServices withTableService(TableService tableService) { + this.tableService = tableService; + return this; + } + + public FakeBigQueryServices readerReturns(String... jsonTableRowReturns) { + this.jsonTableRowReturns = jsonTableRowReturns; return this; } @Override public JobService getJobService(BigQueryOptions bqOptions) { - return new FakeLoadService(startJobReturns, pollJobStatusReturns); + return jobService; } - private static class FakeLoadService implements BigQueryServices.JobService { + @Override + public TableService getTableService(BigQueryOptions bqOptions) { + return tableService; + } + + @Override + public BigQueryJsonReader getReaderFromTable( + BigQueryOptions bqOptions, TableReference tableRef) { + return new FakeBigQueryReader(jsonTableRowReturns); + } + + @Override + public BigQueryJsonReader getReaderFromQuery( + BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten) { + return new FakeBigQueryReader(jsonTableRowReturns); + } + + private static class FakeBigQueryReader implements BigQueryJsonReader { + private static final int UNSTARTED = -1; + private static final int CLOSED = Integer.MAX_VALUE; - private Object[] startJobReturns; - private Object[] pollJobStatusReturns; - private int startLoadJobCallsCount; - private int pollJobStatusCallsCount; + private String[] jsonTableRowReturns; + private int currIndex; - public FakeLoadService(Object[] startLoadJobReturns, Object[] pollJobStatusReturns) { - this.startJobReturns = startLoadJobReturns; - this.pollJobStatusReturns = pollJobStatusReturns; - this.startLoadJobCallsCount = 0; - this.pollJobStatusCallsCount = 0; + FakeBigQueryReader(String[] jsonTableRowReturns) { + this.jsonTableRowReturns = jsonTableRowReturns; + this.currIndex = UNSTARTED; } @Override - public void startLoadJob(String jobId, JobConfigurationLoad loadConfig) - throws InterruptedException, IOException { - startJob(); + public boolean start() throws IOException { + assertEquals(UNSTARTED, currIndex); + currIndex = 0; + return currIndex < jsonTableRowReturns.length; } @Override - public void startExtractJob(String jobId, JobConfigurationExtract extractConfig) - throws InterruptedException, IOException { - startJob(); + public boolean advance() throws IOException { + return ++currIndex < jsonTableRowReturns.length; } @Override - public void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun) - throws IOException, InterruptedException { - startJob(); + public TableRow getCurrent() throws NoSuchElementException { + if (currIndex >= jsonTableRowReturns.length) { + throw new NoSuchElementException(); + } + return fromJsonString(jsonTableRowReturns[currIndex], TableRow.class); } @Override - public Job pollJob(String projectId, String jobId, int maxAttemps) - throws InterruptedException { - if (pollJobStatusCallsCount < pollJobStatusReturns.length) { - Object ret = pollJobStatusReturns[pollJobStatusCallsCount++]; - if (ret instanceof Status) { - return JOB_STATUS_MAP.get(ret); - } else if (ret instanceof InterruptedException) { - throw (InterruptedException) ret; - } else { - throw new RuntimeException("Unexpected return type: " + ret.getClass()); - } + public void close() throws IOException { + currIndex = CLOSED; + } + } + } + + private static class FakeJobService implements 
JobService, Serializable { + + private Object[] startJobReturns; + private Object[] pollJobReturns; + // Both counts will be reset back to zeros after serialization. + // This is a work around for DoFn's verifyUnmodified check. + private transient int startJobCallsCount; + private transient int pollJobStatusCallsCount; + + public FakeJobService() { + this.startJobReturns = new Object[0]; + this.pollJobReturns = new Object[0]; + this.startJobCallsCount = 0; + this.pollJobStatusCallsCount = 0; + } + + /** + * Sets the return values to mock {@link JobService#startLoadJob}, + * {@link JobService#startExtractJob} and {@link JobService#startQueryJob}. + * + *
<p>
Throws if the {@link Object} is a {@link Exception}, returns otherwise. + */ + public FakeJobService startJobReturns(Object... startJobReturns) { + this.startJobReturns = startJobReturns; + return this; + } + + /** + * Sets the return values to mock {@link JobService#pollJob}. + * + *
<p>
Throws if the {@link Object} is a {@link Exception}, returns otherwise. + */ + public FakeJobService pollJobReturns(Object... pollJobReturns) { + this.pollJobReturns = pollJobReturns; + return this; + } + + @Override + public void startLoadJob(String jobId, JobConfigurationLoad loadConfig) + throws InterruptedException, IOException { + startJob(); + } + + @Override + public void startExtractJob(String jobId, JobConfigurationExtract extractConfig) + throws InterruptedException, IOException { + startJob(); + } + + @Override + public void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun) + throws IOException, InterruptedException { + startJob(); + } + + @Override + public Job pollJob(String projectId, String jobId, int maxAttempts) + throws InterruptedException { + if (pollJobStatusCallsCount < pollJobReturns.length) { + Object ret = pollJobReturns[pollJobStatusCallsCount++]; + if (ret instanceof Job) { + return (Job) ret; + } else if (ret instanceof Status) { + return JOB_STATUS_MAP.get(ret); + } else if (ret instanceof InterruptedException) { + throw (InterruptedException) ret; } else { - throw new RuntimeException( - "Exceeded expected number of calls: " + pollJobStatusReturns.length); + throw new RuntimeException("Unexpected return type: " + ret.getClass()); } + } else { + throw new RuntimeException( + "Exceeded expected number of calls: " + pollJobReturns.length); } + } - private void startJob() throws IOException, InterruptedException { - if (startLoadJobCallsCount < startJobReturns.length) { - Object ret = startJobReturns[startLoadJobCallsCount++]; - if (ret instanceof IOException) { - throw (IOException) ret; - } else if (ret instanceof InterruptedException) { - throw (InterruptedException) ret; - } else { - return; - } + private void startJob() throws IOException, InterruptedException { + if (startJobCallsCount < startJobReturns.length) { + Object ret = startJobReturns[startJobCallsCount++]; + if (ret instanceof IOException) { + throw (IOException) ret; + } else if (ret instanceof InterruptedException) { + throw (InterruptedException) ret; } else { - throw new RuntimeException( - "Exceeded expected number of calls: " + startJobReturns.length); + return; } + } else { + throw new RuntimeException( + "Exceeded expected number of calls: " + startJobReturns.length); } } } - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Rule public ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); - @Rule - public TemporaryFolder testFolder = new TemporaryFolder(); - @Mock - public BigQueryServices.JobService mockBqLoadService; + @Rule public transient ExpectedException thrown = ExpectedException.none(); + @Rule public transient ExpectedLogs logged = ExpectedLogs.none(BigQueryIO.class); + @Rule public transient TemporaryFolder testFolder = new TemporaryFolder(); + @Mock public transient BigQueryServices.JobService mockJobService; + @Mock private transient IOChannelFactory mockIOChannelFactory; + @Mock private transient TableService mockTableService; - private BigQueryOptions bqOptions; + private transient BigQueryOptions bqOptions; private void checkReadTableObject( BigQueryIO.Read.Bound bound, String project, String dataset, String table) { @@ -205,16 +312,16 @@ private void checkReadQueryObject( private void checkReadTableObjectWithValidate( BigQueryIO.Read.Bound bound, String project, String dataset, String table, boolean validate) { - assertEquals(project, bound.table.getProjectId()); - assertEquals(dataset, bound.table.getDatasetId()); - 
assertEquals(table, bound.table.getTableId()); + assertEquals(project, bound.getTable().getProjectId()); + assertEquals(dataset, bound.getTable().getDatasetId()); + assertEquals(table, bound.getTable().getTableId()); assertNull(bound.query); assertEquals(validate, bound.getValidate()); } private void checkReadQueryObjectWithValidate( BigQueryIO.Read.Bound bound, String query, boolean validate) { - assertNull(bound.table); + assertNull(bound.getTable()); assertEquals(query, bound.query); assertEquals(validate, bound.getValidate()); } @@ -241,10 +348,10 @@ private void checkWriteObjectWithValidate( } @Before - public void setUp() { + public void setUp() throws IOException { bqOptions = PipelineOptionsFactory.as(BigQueryOptions.class); bqOptions.setProject("defaultProject"); - bqOptions.setTempLocation(testFolder.getRoot().getAbsolutePath() + "/BigQueryIOTest/"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); MockitoAnnotations.initMocks(this); } @@ -315,11 +422,7 @@ public void testValidateReadSetsDefaultProject() { thrown.expectMessage( Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence")) .or(Matchers.containsString("BigQuery dataset not found for table"))); - try { - p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef)); - } finally { - Assert.assertEquals("someproject", tableRef.getProjectId()); - } + p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef)); } @Test @@ -363,11 +466,40 @@ public void testBuildSourceWithTableAndFlatten() { p.run(); } + @Test + public void testCustomSource() { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService( + new FakeJobService().startJobReturns("done", "done")) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", 1)), + toJsonString(new TableRow().set("name", "b").set("number", 2)), + toJsonString(new TableRow().set("name", "c").set("number", 3))); + + Pipeline p = TestPipeline.create(bqOptions); + PCollection output = p + .apply(BigQueryIO.Read.from("foo.com:project:somedataset.sometable") + .withTestServices(fakeBqServices) + .withoutValidation()) + .apply(ParDo.of(new DoFn() { + @Override + public void processElement(ProcessContext c) throws Exception { + c.output((String) c.element().get("name")); + } + })); + + PAssert.that(output) + .containsInAnyOrder(ImmutableList.of("a", "b", "c")); + + p.run(); + } + @Test public void testCustomSink() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .startLoadJobReturns("done", "done", "done") - .pollJobStatusReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED); + .withJobService(new FakeJobService() + .startJobReturns("done", "done", "done") + .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -398,8 +530,9 @@ public boolean accept(File pathname) { @Test public void testCustomSinkUnknown() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .startLoadJobReturns("done", "done") - .pollJobStatusReturns(Status.FAILED, Status.UNKNOWN); + .withJobService(new FakeJobService() + .startJobReturns("done", "done") + .pollJobReturns(Status.FAILED, Status.UNKNOWN)); Pipeline p = TestPipeline.create(bqOptions); p.apply(Create.of( @@ -717,4 +850,223 @@ public void testWriteValidateFailsNoTableAndNoTableSpec() { .apply(Create.of()) .apply(BigQueryIO.Write.named("name")); } + + @Test + public void testBigQuerySource() throws 
Exception { + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", "1")), + toJsonString(new TableRow().set("name", "b").set("number", "2")), + toJsonString(new TableRow().set("name", "c").set("number", "3"))); + + String jobIdToken = "testJobIdToken"; + String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name")); + String extractDestinationDir = "mock://tempLocation"; + BoundedSource bqSource = + BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices); + + List expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3")); + + PipelineOptions options = PipelineOptionsFactory.create(); + Assert.assertThat( + SourceTestUtils.readFromSource(bqSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + } + + @Test + public void testBigQueryTableSourceInitSplit() throws Exception { + Job extractJob = new Job(); + JobStatistics jobStats = new JobStatistics(); + JobStatistics4 extractStats = new JobStatistics4(); + extractStats.setDestinationUriFileCounts(ImmutableList.of(1L)); + jobStats.setExtract(extractStats); + extractJob.setStatus(new JobStatus()) + .setStatistics(jobStats); + + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .withTableService(mockTableService) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", "1")), + toJsonString(new TableRow().set("name", "b").set("number", "2")), + toJsonString(new TableRow().set("name", "c").set("number", "3"))); + + String jobIdToken = "testJobIdToken"; + String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name")); + String extractDestinationDir = "mock://tempLocation"; + BoundedSource bqSource = + BigQueryTableSource.create(jobIdToken, jsonTable, extractDestinationDir, fakeBqServices); + + List expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3")); + + when(mockJobService.pollJob(anyString(), anyString(), Mockito.anyInt())) + .thenReturn(extractJob); + PipelineOptions options = PipelineOptionsFactory.create(); + options.setTempLocation("mock://tempLocation"); + + IOChannelUtils.setIOFactory("mock", mockIOChannelFactory); + when(mockIOChannelFactory.resolve(anyString(), anyString())) + .thenReturn("mock://tempLocation/output"); + when(mockTableService.getTable(anyString(), anyString(), anyString())) + .thenReturn(new Table().setSchema(new TableSchema())); + + Assert.assertThat( + SourceTestUtils.readFromSource(bqSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + + List> sources = bqSource.splitIntoBundles(100, options); + assertEquals(1, sources.size()); + BoundedSource actual = sources.get(0); + assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); + + Mockito.verify(mockJobService) + .startExtractJob(anyString(), Mockito.any()); + } + + @Test + public void testBigQueryQuerySourceInitSplit() throws Exception { + TableReference 
dryRunTable = new TableReference(); + + Job queryJob = new Job(); + JobStatistics queryJobStats = new JobStatistics(); + JobStatistics2 queryStats = new JobStatistics2(); + queryStats.setReferencedTables(ImmutableList.of(dryRunTable)); + queryJobStats.setQuery(queryStats); + queryJob.setStatus(new JobStatus()) + .setStatistics(queryJobStats); + + Job extractJob = new Job(); + JobStatistics extractJobStats = new JobStatistics(); + JobStatistics4 extractStats = new JobStatistics4(); + extractStats.setDestinationUriFileCounts(ImmutableList.of(1L)); + extractJobStats.setExtract(extractStats); + extractJob.setStatus(new JobStatus()) + .setStatistics(extractJobStats); + + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(mockJobService) + .withTableService(mockTableService) + .readerReturns( + toJsonString(new TableRow().set("name", "a").set("number", "1")), + toJsonString(new TableRow().set("name", "b").set("number", "2")), + toJsonString(new TableRow().set("name", "c").set("number", "3"))); + + String jobIdToken = "testJobIdToken"; + String extractDestinationDir = "mock://tempLocation"; + String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name")); + BoundedSource bqSource = BigQueryQuerySource.create( + jobIdToken, "query", jsonTable, true /* flattenResults */, + extractDestinationDir, fakeBqServices); + + List expected = ImmutableList.of( + new TableRow().set("name", "a").set("number", "1"), + new TableRow().set("name", "b").set("number", "2"), + new TableRow().set("name", "c").set("number", "3")); + + PipelineOptions options = PipelineOptionsFactory.create(); + options.setTempLocation(extractDestinationDir); + + when(mockJobService.pollJob(anyString(), anyString(), Mockito.anyInt())) + .thenReturn(queryJob, extractJob); + IOChannelUtils.setIOFactory("mock", mockIOChannelFactory); + when(mockIOChannelFactory.resolve(anyString(), anyString())) + .thenReturn("mock://tempLocation/output"); + when(mockTableService.getTable(anyString(), anyString(), anyString())) + .thenReturn(new Table().setSchema(new TableSchema())); + + Assert.assertThat( + SourceTestUtils.readFromSource(bqSource, options), + CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + bqSource, 2, 0.3, ExpectedSplitOutcome.MUST_BE_CONSISTENT_IF_SUCCEEDS, options); + + List> sources = bqSource.splitIntoBundles(100, options); + assertEquals(1, sources.size()); + BoundedSource actual = sources.get(0); + assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); + + Mockito.verify(mockJobService) + .startQueryJob(anyString(), Mockito.any(), Mockito.eq(true)); + Mockito.verify(mockJobService) + .startQueryJob(anyString(), Mockito.any(), Mockito.eq(false)); + Mockito.verify(mockJobService) + .startExtractJob(anyString(), Mockito.any()); + } + + @Test + public void testTransformingSource() throws Exception { + int numElements = 10000; + @SuppressWarnings("deprecation") + BoundedSource longSource = CountingSource.upTo(numElements); + SerializableFunction toStringFn = + new SerializableFunction() { + @Override + public String apply(Long input) { + return input.toString(); + }}; + BoundedSource stringSource = new TransformingSource<>( + longSource, toStringFn, StringUtf8Coder.of()); + + List expected = Lists.newArrayList(); + for (int i = 0; i < numElements; i++) { + expected.add(String.valueOf(i)); + } + + PipelineOptions options = PipelineOptionsFactory.create(); + Assert.assertThat( + SourceTestUtils.readFromSource(stringSource, options), + 
CoreMatchers.is(expected)); + SourceTestUtils.assertSplitAtFractionBehavior( + stringSource, 100, 0.3, ExpectedSplitOutcome.MUST_SUCCEED_AND_BE_CONSISTENT, options); + + SourceTestUtils.assertSourcesEqualReferenceSource( + stringSource, stringSource.splitIntoBundles(100, options), options); + } + + @Test + @Category(RunnableOnService.class) + public void testPassThroughThenCleanup() throws Exception { + Pipeline p = TestPipeline.create(); + + PCollection output = p + .apply(Create.of(1, 2, 3)) + .apply(new PassThroughThenCleanup(new CleanupOperation() { + @Override + void cleanup(PipelineOptions options) throws Exception { + // no-op + }})); + + PAssert.that(output).containsInAnyOrder(1, 2, 3); + + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testPassThroughThenCleanupExecuted() throws Exception { + Pipeline p = TestPipeline.create(); + + p.apply(Create.of()) + .apply(new PassThroughThenCleanup(new CleanupOperation() { + @Override + void cleanup(PipelineOptions options) throws Exception { + throw new RuntimeException("cleanup executed"); + }})); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("cleanup executed"); + + p.run(); + } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java index 238deede2b86..f8c4c97dd808 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java @@ -43,6 +43,7 @@ import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatus; +import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.cloud.hadoop.util.ApiErrorExtractor; import com.google.common.collect.ImmutableList; @@ -117,6 +118,7 @@ public void testStartLoadJobSucceeds() throws IOException, InterruptedException BackOff backoff = new AttemptBoundedExponentialBackOff( 5 /* attempts */, 1000 /* initialIntervalMillis */); JobServiceImpl.startJob(testJob, new ApiErrorExtractor(), bigquery, sleeper, backoff); + verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); verify(response, times(1)).getContentType(); @@ -253,6 +255,24 @@ public void testPollJobUnknown() throws IOException, InterruptedException { verify(response, times(1)).getContentType(); } + @Test + public void testGetTable() throws IOException, InterruptedException { + Table testTable = new Table(); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testTable)); + + BigQueryServicesImpl.TableServiceImpl tableService = + new BigQueryServicesImpl.TableServiceImpl(bigquery); + Table table = tableService.getTable("projectId", "datasetId", "tableId"); + + assertEquals(testTable, table); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + /** A helper to wrap a {@link GenericJson} object in a content stream. 
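[Editor's note: the FakeJobService above scripts a fixed sequence of outcomes for startJob and pollJob, so a test can deterministically drive success, failure, and retry paths without talking to BigQuery. A minimal, self-contained sketch of that scripting pattern follows; the class and method names are illustrative, not part of this patch.]

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.Iterator;

    // Each call consumes the next queued outcome: Exceptions are thrown,
    // anything else is returned -- the same contract as startJobReturns(...).
    class ScriptedOutcomes {
      private final Iterator<Object> outcomes;

      ScriptedOutcomes(Object... outcomes) {
        this.outcomes = Arrays.asList(outcomes).iterator();
      }

      String next() throws IOException {
        if (!outcomes.hasNext()) {
          throw new IllegalStateException("Exceeded expected number of calls");
        }
        Object ret = outcomes.next();
        if (ret instanceof IOException) {
          throw (IOException) ret; // simulate a transient RPC failure
        }
        return (String) ret;
      }
    }

A test would then queue, say, a new IOException followed by "done" to verify that one transient failure is retried and the second attempt succeeds.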
*/ private static InputStream toStream(GenericJson content) throws IOException { return new ByteArrayInputStream(JacksonFactory.getDefaultInstance().toByteArray(content)); From fc85df7867160cc3f1c2a764cf4ef35525b91679 Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 9 May 2016 16:17:45 -0700 Subject: [PATCH 2/5] fixup: addressed feedback --- .../org/apache/beam/sdk/io/BigQueryIO.java | 141 +++++++++++------- .../beam/sdk/util/BigQueryServices.java | 21 ++- .../beam/sdk/util/BigQueryServicesImpl.java | 69 ++++++--- .../apache/beam/sdk/io/BigQueryIOTest.java | 28 ++-- .../sdk/util/BigQueryServicesImplTest.java | 4 +- 5 files changed, 168 insertions(+), 95 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index 9d1fec57ff94..5ffa6c8af9ad 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; -import org.apache.beam.sdk.io.BigQueryIO.PassThroughThenCleanup.CleanupOperation; import org.apache.beam.sdk.io.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.options.BigQueryOptions; @@ -51,8 +50,8 @@ import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff; import org.apache.beam.sdk.util.AvroUtils; import org.apache.beam.sdk.util.BigQueryServices; +import org.apache.beam.sdk.util.BigQueryServices.DatasetService; import org.apache.beam.sdk.util.BigQueryServices.JobService; -import org.apache.beam.sdk.util.BigQueryServices.TableService; import org.apache.beam.sdk.util.BigQueryServicesImpl; import org.apache.beam.sdk.util.BigQueryTableInserter; import org.apache.beam.sdk.util.BigQueryTableRowIterator; @@ -398,7 +397,13 @@ public static class Bound extends PTransform> { + " pipeline, this validation can be disabled using #withoutValidation."; private Bound() { - this(null, null, null, true, null, null); + this( + null /* name */, + null /* query */, + null /* jsonTableRef */, + true /* validate */, + null /* flattenResults */, + null /* testBigQueryServices */); } private Bound( @@ -577,18 +582,18 @@ public PCollection apply(PInput input) { source = BigQueryTableSource.create( jobIdToken, jsonTable, extractDestinationDir, bqServices); } - CleanupOperation cleanupOperation = new CleanupOperation() { - @Override - void cleanup(PipelineOptions options) throws Exception { - IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); - Collection dirMatch = factory.match(extractDestinationDir); - if (!dirMatch.isEmpty()) { - Collection extractFiles = factory.match( - factory.resolve(extractDestinationDir, "*")); - new GcsUtilFactory().create(options).remove(extractFiles); - } - } - }; + PassThroughThenCleanup.CleanupOperation cleanupOperation = + new PassThroughThenCleanup.CleanupOperation() { + @Override + void cleanup(PipelineOptions options) throws Exception { + IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); + Collection dirMatch = factory.match(extractDestinationDir); + if (!dirMatch.isEmpty()) { + Collection extractFiles = factory.match( + factory.resolve(extractDestinationDir, "*")); + new GcsUtilFactory().create(options).remove(extractFiles); + } + }}; return input.getPipeline()
.apply(org.apache.beam.sdk.io.Read.from(source)) .setCoder(getDefaultOutputCoder()) @@ -675,6 +680,7 @@ private Read() {} * A {@link PTransform} that invokes {@link CleanupOperation} after the input {@link PCollection} * has been processed. */ + @VisibleForTesting static class PassThroughThenCleanup extends PTransform, PCollection> { private CleanupOperation cleanupOperation; @@ -687,7 +693,7 @@ static class PassThroughThenCleanup extends PTransform, PColle public PCollection apply(PCollection input) { TupleTag mainOutput = new TupleTag<>(); TupleTag cleanupSignal = new TupleTag<>(); - PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn()) + PCollectionTuple outputs = input.apply(ParDo.of(new IdentityFn()) .withOutputTags(mainOutput, TupleTagList.of(cleanupSignal))); PCollectionView cleanupSignalView = outputs.get(cleanupSignal) @@ -708,9 +714,9 @@ public void processElement(ProcessContext c) return outputs.get(mainOutput); } - private class IdentityFn extends DoFn { + private static class IdentityFn extends DoFn { @Override - public void processElement(ProcessContext c) throws Exception { + public void processElement(ProcessContext c) { c.output(c.element()); } } @@ -720,6 +726,9 @@ abstract static class CleanupOperation implements Serializable { } } + /** + * A {@link BigQuerySourceBase} for reading BigQuery tables. + */ @VisibleForTesting static class BigQueryTableSource extends BigQuerySourceBase { @@ -761,7 +770,7 @@ public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws E if (tableSizeBytes.get() == null) { TableReference table = JSON_FACTORY.fromString(jsonTable, TableReference.class); - Long numBytes = bqServices.getTableService(options.as(BigQueryOptions.class)) + Long numBytes = bqServices.getDatasetService(options.as(BigQueryOptions.class)) .getTable(table.getProjectId(), table.getDatasetId(), table.getTableId()) .getNumBytes(); tableSizeBytes.compareAndSet(null, numBytes); @@ -770,6 +779,9 @@ public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws E } } + /** + * A {@link BigQuerySourceBase} for querying BigQuery tables. + */ @VisibleForTesting static class BigQueryQuerySource extends BigQuerySourceBase { @@ -826,7 +838,7 @@ protected TableReference getTableToExtract(BigQueryOptions bqOptions) .getQuery() .getReferencedTables() .get(0); - TableService tableService = bqServices.getTableService(bqOptions); + DatasetService tableService = bqServices.getDatasetService(bqOptions); String location = tableService.getTable( dryRunTempTable.getProjectId(), dryRunTempTable.getDatasetId(), @@ -914,7 +926,20 @@ private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOE } } - abstract static class BigQuerySourceBase extends BoundedSource { + /** + * An abstract {@link BoundedSource} to read a table from BigQuery. + * + *

This source uses a BigQuery export job to take a snapshot of the table on GCS, and then + * reads in parallel from each produced file. It is implemented by {@link BigQueryTableSource} + * and {@link BigQueryQuerySource}, depending on the configuration of the read. + * Specifically, + *

+ * ... + */ + private abstract static class BigQuerySourceBase extends BoundedSource { // The maximum number of attempts to verify temp files. private static final int MAX_FILES_VERIFY_ATTEMPTS = 10; @@ -949,7 +974,7 @@ public List> splitIntoBundles( String extractJobId = jobIdToken + "-extract"; List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); - TableSchema tableSchema = bqServices.getTableService(bqOptions).getTable( + TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable( tableToExtract.getProjectId(), tableToExtract.getDatasetId(), tableToExtract.getTableId()).getSchema(); @@ -998,7 +1023,7 @@ private List executeExtract( List counts = jobStats.getExtract().getDestinationUriFileCounts(); if (counts.size() != 1) { String errorMessage = (counts.size() == 0 ? - "None destination uri file count received." : + "No destination uri file count received." : String.format("More than one destination uri file count received. First two are %s, %s", counts.get(0), counts.get(1))); throw new RuntimeException(errorMessage); @@ -1045,43 +1070,48 @@ public TableRow apply(GenericRecord input) { } return ImmutableList.copyOf(avroSources); } - } - private static class BigQueryReader extends BoundedSource.BoundedReader { - private final BigQuerySourceBase source; - private final BigQueryServices.BigQueryJsonReader reader; + protected static class BigQueryReader extends BoundedSource.BoundedReader { + private final BigQuerySourceBase source; + private final BigQueryServices.BigQueryJsonReader reader; - private BigQueryReader(BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) { - this.source = source; - this.reader = reader; - } + private BigQueryReader( + BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) { + this.source = source; + this.reader = reader; + } - @Override - public BoundedSource getCurrentSource() { - return source; - } + @Override + public BoundedSource getCurrentSource() { + return source; + } - @Override - public boolean start() throws IOException { - return reader.start(); - } + @Override + public boolean start() throws IOException { + return reader.start(); + } - @Override - public boolean advance() throws IOException { - return reader.advance(); - } + @Override + public boolean advance() throws IOException { + return reader.advance(); + } - @Override - public TableRow getCurrent() throws NoSuchElementException { - return reader.getCurrent(); - } + @Override + public TableRow getCurrent() throws NoSuchElementException { + return reader.getCurrent(); + } - @Override - public void close() throws IOException { - reader.close(); + @Override + public void close() throws IOException { + reader.close(); + } } } + /** + * A {@link BoundedSource} that reads from {@code BoundedSource} + * and transforms elements to type {@code V}. 
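[Editor's note: as testTransformingSource earlier in this patch shows, TransformingSource simply delegates to a wrapped source and maps each element through a SerializableFunction (CountingSource longs to their string form). A self-contained sketch of that element-mapping shape outside the Beam Source API, with illustrative names:]

    import java.util.Iterator;
    import java.util.function.Function;

    // Wraps an Iterator<T> and converts each element to V on the fly,
    // mirroring how TransformingSource's reader delegates to the wrapped
    // reader and applies the function when the current element is read.
    class TransformingIterator<T, V> implements Iterator<V> {
      private final Iterator<T> delegate;
      private final Function<T, V> fn;

      TransformingIterator(Iterator<T> delegate, Function<T, V> fn) {
        this.delegate = delegate;
        this.fn = fn;
      }

      @Override public boolean hasNext() { return delegate.hasNext(); }
      @Override public V next() { return fn.apply(delegate.next()); }
    }

Keeping the transformation inside the source means splitting can wrap each sub-source of the delegate, which is what assertSourcesEqualReferenceSource exercises in the test above.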
+ */ @VisibleForTesting static class TransformingSource extends BoundedSource { private final BoundedSource boundedSource; @@ -1408,8 +1438,15 @@ public TableReference apply(BoundedWindow value) { */ @Deprecated public Bound() { - this(null, null, null, null, CreateDisposition.CREATE_IF_NEEDED, - WriteDisposition.WRITE_EMPTY, true, null); + this( + null /* name */, + null /* jsonTableRef */, + null /* tableRefFunction */, + null /* jsonSchema */, + CreateDisposition.CREATE_IF_NEEDED, + WriteDisposition.WRITE_EMPTY, + true /* validate */, + null /* testBigQueryServices */); } private Bound(String name, @Nullable String jsonTableRef, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java index 3ae093c94f37..b9870d2bb364 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java @@ -45,12 +45,18 @@ public interface BigQueryServices extends Serializable { public JobService getJobService(BigQueryOptions bqOptions); /** - * Returns a real, mock, or fake {@link TableService}. + * Returns a real, mock, or fake {@link DatasetService}. */ - public TableService getTableService(BigQueryOptions bqOptions); + public DatasetService getDatasetService(BigQueryOptions bqOptions); + /** + * Returns a real, mock, or fake {@link BigQueryJsonReader} to read tables. + */ public BigQueryJsonReader getReaderFromTable(BigQueryOptions bqOptions, TableReference tableRef); + /** + * Returns a real, mock, or fake {@link BigQueryJsonReader} to query tables. + */ public BigQueryJsonReader getReaderFromQuery( BigQueryOptions bqOptions, String query, String projectId, @Nullable Boolean flatten); @@ -85,16 +91,19 @@ Job pollJob(String projectId, String jobId, int maxAttempts) } /** - * An interface for the Cloud BigQuery table service. + * An interface to get, create and delete Cloud BigQuery datasets and tables. */ - public interface TableService { + public interface DatasetService { /** - * Gets the specified table resource by table ID. + * Gets the specified {@link Table} resource by table ID. */ Table getTable(String projectId, String datasetId, String tableId) throws InterruptedException, IOException; - Dataset createDataset(String projectId, String datasetId, String location, String description) + /** + * Create a {@link Dataset} with the given {@code location} and {@code description}. 
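[Editor's note: the DatasetServiceImpl change further down builds the dataset insert payload from the BigQuery model classes before retrying the RPC. A sketch of just that payload construction, using the same model setters that appear in the patch; the helper name is illustrative, and the fluent setter style assumes the generated google-api-services-bigquery model classes:]

    import com.google.api.services.bigquery.model.Dataset;
    import com.google.api.services.bigquery.model.DatasetReference;

    // Builds the Dataset resource that client.datasets().insert(projectId, dataset)
    // sends; all four arguments are caller-supplied values.
    static Dataset newDataset(
        String projectId, String datasetId, String location, String description) {
      DatasetReference datasetRef = new DatasetReference()
          .setProjectId(projectId)
          .setDatasetId(datasetId);
      return new Dataset()
          .setDatasetReference(datasetRef)
          .setLocation(location)
          .setDescription(description);
    }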
+ */ + void createDataset(String projectId, String datasetId, String location, String description) throws IOException, InterruptedException; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java index a62ae10cde4c..ac7ff826b563 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java @@ -71,8 +71,8 @@ public JobService getJobService(BigQueryOptions options) { } @Override - public TableService getTableService(BigQueryOptions options) { - return new TableServiceImpl(options); + public DatasetService getDatasetService(BigQueryOptions options) { + return new DatasetServiceImpl(options); } @Override @@ -244,18 +244,28 @@ Job pollJob( } @VisibleForTesting - static class TableServiceImpl implements TableService { + static class DatasetServiceImpl implements DatasetService { + private final ApiErrorExtractor errorExtractor; private final Bigquery client; @VisibleForTesting - TableServiceImpl(Bigquery client) { + DatasetServiceImpl(Bigquery client) { + this.errorExtractor = new ApiErrorExtractor(); this.client = client; } - private TableServiceImpl(BigQueryOptions bqOptions) { + private DatasetServiceImpl(BigQueryOptions bqOptions) { + this.errorExtractor = new ApiErrorExtractor(); this.client = Transport.newBigQueryClient(bqOptions).build(); } + /** + * {@inheritDoc} + * + *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC retries. + */ @Override public Table getTable(String projectId, String datasetId, String tableId) throws IOException, InterruptedException { @@ -288,37 +298,52 @@ Table getTable( lastException); } + /** + * {@inheritDoc} + * + *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC retries. + */ @Override - public Dataset createDataset( + public void createDataset( String projectId, String datasetId, String location, String description) throws IOException, InterruptedException { BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); - return createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff); + createDataset(projectId, datasetId, location, description, Sleeper.DEFAULT, backoff); } @VisibleForTesting - Dataset createDataset( + void createDataset( String projectId, String datasetId, String location, String description, Sleeper sleeper, BackOff backoff) throws IOException, InterruptedException { + DatasetReference datasetRef = new DatasetReference(); + datasetRef.setProjectId(projectId); + datasetRef.setDatasetId(datasetId); + + Dataset dataset = new Dataset(); + dataset.setDatasetReference(datasetRef); + dataset.setLocation(location); + dataset.setFriendlyName(location); + dataset.setDescription(description); + Exception lastException; do { try { - DatasetReference datasetRef = new DatasetReference(); - datasetRef.setProjectId(projectId); - datasetRef.setDatasetId(datasetId); - - Dataset dataset = new Dataset(); - dataset.setDatasetReference(datasetRef); - dataset.setLocation(location); - dataset.setFriendlyName(location); - dataset.setDescription(description); - - return client.datasets().insert(projectId, dataset).execute(); + client.datasets().insert(projectId, dataset).execute(); + return; // SUCCEEDED + } catch (GoogleJsonResponseException e) { + if (errorExtractor.itemAlreadyExists(e)) { + return; // SUCCEEDED + } + // ignore and retry + LOG.warn("Ignore the error and retry creating the dataset.", e); + lastException = e; } catch (IOException e) { LOG.warn("Ignore the error and retry creating the dataset.", e); lastException = e; @@ -360,10 +385,10 @@ private static BigQueryJsonReader fromTable( public boolean start() throws IOException { try { iterator.open(); - return true; + return iterator.advance(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException(e); + throw new RuntimeException("Interrupted during start() operation", e); } } @@ -373,7 +398,7 @@ public boolean advance() throws IOException { return iterator.advance(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException(e); + throw new RuntimeException("Interrupted during advance() operation", e); } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index c6a3aaae9881..e1e99e0fea72 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -55,8 +55,8 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.BigQueryServices; +import org.apache.beam.sdk.util.BigQueryServices.DatasetService; import org.apache.beam.sdk.util.BigQueryServices.JobService; -import org.apache.beam.sdk.util.BigQueryServices.TableService; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.IOChannelFactory; import org.apache.beam.sdk.util.IOChannelUtils; @@ -121,15 +121,15 @@ private static class 
FakeBigQueryServices implements BigQueryServices { private String[] jsonTableRowReturns = new String[0]; private JobService jobService; - private TableService tableService; + private DatasetService datasetService; public FakeBigQueryServices withJobService(JobService jobService) { this.jobService = jobService; return this; } - public FakeBigQueryServices withTableService(TableService tableService) { - this.tableService = tableService; + public FakeBigQueryServices withDatasetService(DatasetService datasetService) { + this.datasetService = datasetService; return this; } @@ -144,8 +144,8 @@ public JobService getJobService(BigQueryOptions bqOptions) { } @Override - public TableService getTableService(BigQueryOptions bqOptions) { - return tableService; + public DatasetService getDatasetService(BigQueryOptions bqOptions) { + return datasetService; } @Override @@ -296,7 +296,7 @@ private void startJob() throws IOException, InterruptedException { @Rule public transient TemporaryFolder testFolder = new TemporaryFolder(); @Mock public transient BigQueryServices.JobService mockJobService; @Mock private transient IOChannelFactory mockIOChannelFactory; - @Mock private transient TableService mockTableService; + @Mock private transient DatasetService mockDatasetService; private transient BigQueryOptions bqOptions; @@ -467,7 +467,7 @@ public void testBuildSourceWithTableAndFlatten() { } @Test - public void testCustomSource() { + public void testReadFromTable() { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService( new FakeJobService().startJobReturns("done", "done")) @@ -852,7 +852,7 @@ public void testWriteValidateFailsNoTableAndNoTableSpec() { } @Test - public void testBigQuerySource() throws Exception { + public void testBigQueryTableSourceThroughJsonAPI() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(mockJobService) .readerReturns( @@ -891,7 +891,7 @@ public void testBigQueryTableSourceInitSplit() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(mockJobService) - .withTableService(mockTableService) + .withDatasetService(mockDatasetService) .readerReturns( toJsonString(new TableRow().set("name", "a").set("number", "1")), toJsonString(new TableRow().set("name", "b").set("number", "2")), @@ -916,7 +916,7 @@ public void testBigQueryTableSourceInitSplit() throws Exception { IOChannelUtils.setIOFactory("mock", mockIOChannelFactory); when(mockIOChannelFactory.resolve(anyString(), anyString())) .thenReturn("mock://tempLocation/output"); - when(mockTableService.getTable(anyString(), anyString(), anyString())) + when(mockDatasetService.getTable(anyString(), anyString(), anyString())) .thenReturn(new Table().setSchema(new TableSchema())); Assert.assertThat( @@ -956,7 +956,7 @@ public void testBigQueryQuerySourceInitSplit() throws Exception { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() .withJobService(mockJobService) - .withTableService(mockTableService) + .withDatasetService(mockDatasetService) .readerReturns( toJsonString(new TableRow().set("name", "a").set("number", "1")), toJsonString(new TableRow().set("name", "b").set("number", "2")), @@ -982,7 +982,7 @@ public void testBigQueryQuerySourceInitSplit() throws Exception { IOChannelUtils.setIOFactory("mock", mockIOChannelFactory); when(mockIOChannelFactory.resolve(anyString(), anyString())) .thenReturn("mock://tempLocation/output"); - when(mockTableService.getTable(anyString(), anyString(), anyString())) + 
when(mockDatasetService.getTable(anyString(), anyString(), anyString())) .thenReturn(new Table().setSchema(new TableSchema())); Assert.assertThat( @@ -1002,6 +1002,8 @@ public void testBigQueryQuerySourceInitSplit() throws Exception { .startQueryJob(anyString(), Mockito.any(), Mockito.eq(false)); Mockito.verify(mockJobService) .startExtractJob(anyString(), Mockito.any()); + Mockito.verify(mockDatasetService) + .createDataset(anyString(), anyString(), anyString(), anyString()); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java index f8c4c97dd808..aff8bca160b9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java @@ -263,8 +263,8 @@ public void testGetTable() throws IOException, InterruptedException { when(response.getStatusCode()).thenReturn(200); when(response.getContent()).thenReturn(toStream(testTable)); - BigQueryServicesImpl.TableServiceImpl tableService = - new BigQueryServicesImpl.TableServiceImpl(bigquery); + BigQueryServicesImpl.DatasetServiceImpl tableService = + new BigQueryServicesImpl.DatasetServiceImpl(bigquery); Table table = tableService.getTable("projectId", "datasetId", "tableId"); assertEquals(testTable, table); From 6c104b5e1b141955fa4af8b26929730298072361 Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 9 May 2016 18:49:34 -0700 Subject: [PATCH 3/5] fixup: remove temp table and dataset --- .../org/apache/beam/sdk/io/BigQueryIO.java | 24 +++- .../beam/sdk/util/BigQueryServices.java | 15 +++ .../beam/sdk/util/BigQueryServicesImpl.java | 112 ++++++++++++------ .../sdk/util/BigQueryServicesImplTest.java | 10 +- 4 files changed, 122 insertions(+), 39 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index 5ffa6c8af9ad..96febb6fd274 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -777,6 +777,11 @@ public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws E } return tableSizeBytes.get(); } + + @Override + protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { + // Do nothing. 
+ } } /** @@ -857,6 +862,19 @@ protected TableReference getTableToExtract(BigQueryOptions bqOptions) return tableToExtract; } + @Override + protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { + TableReference tableToRemove = + JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class); + + DatasetService tableService = bqServices.getDatasetService(bqOptions); + tableService.deleteTable( + tableToRemove.getProjectId(), + tableToRemove.getDatasetId(), + tableToRemove.getTableId()); + tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId()); + } + private synchronized Job dryRunQueryIfNeeded(BigQueryOptions bqOptions) { if (dryRunJobRef.get() == null) { String projectId = bqOptions.getProject(); @@ -979,11 +997,13 @@ public List> splitIntoBundles( tableToExtract.getDatasetId(), tableToExtract.getTableId()).getSchema(); + cleanupTempResource(bqOptions); return createSources(tempFiles, tableSchema); } - protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) - throws Exception; + protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception; + + protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception; @Override public boolean producesSortedKeys(PipelineOptions options) throws Exception { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java index b9870d2bb364..e0d244357c5e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java @@ -100,11 +100,26 @@ public interface DatasetService { Table getTable(String projectId, String datasetId, String tableId) throws InterruptedException, IOException; + /** + * Deletes the table specified by tableId from the dataset. + * If the table contains data, all the data will be deleted. + */ + void deleteTable(String projectId, String datasetId, String tableId) + throws IOException, InterruptedException; + /** * Create a {@link Dataset} with the given {@code location} and {@code description}. */ void createDataset(String projectId, String datasetId, String location, String description) throws IOException, InterruptedException; + + /** + * Deletes the dataset specified by the datasetId value. + * + *

Before you can delete a dataset, you must delete all its tables. + */ + void deleteDataset(String projectId, String datasetId) + throws IOException, InterruptedException; } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java index ac7ff826b563..a3bc963c099d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java @@ -20,6 +20,7 @@ import org.apache.beam.sdk.options.BigQueryOptions; import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.Sleeper; @@ -105,9 +106,9 @@ private JobServiceImpl(BigQueryOptions options) { /** * {@inheritDoc} * - *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC retries. + * @throws IOException if it exceeds max RPC retries. */ @Override public void startLoadJob( @@ -128,9 +129,9 @@ public void startLoadJob( /** * {@inheritDoc} * - *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC retries. + * @throws IOException if it exceeds max RPC retries. */ @Override public void startExtractJob(String jobId, JobConfigurationExtract extractConfig) @@ -150,9 +151,9 @@ public void startExtractJob(String jobId, JobConfigurationExtract extractConfig) /** * {@inheritDoc} * - *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC retries. + * @throws IOException if it exceeds max RPC retries. */ @Override public void startQueryJob(String jobId, JobConfigurationQuery queryConfig, boolean dryRun) @@ -206,7 +207,7 @@ static void startJob( } while (nextBackOff(sleeper, backoff)); throw new IOException( String.format( - "Unable to insert job: %s, aborting after %d retries.", + "Unable to insert job: %s, aborting after %d retries.", jobRef.getJobId(), MAX_RPC_ATTEMPTS), lastException); } @@ -238,7 +239,7 @@ Job pollJob( LOG.warn("Ignore the error and retry polling job status.", e); } } while (nextBackOff(sleeper, backoff)); - LOG.warn("Unable to poll job status: {}, aborting after reached max retries.", jobId); + LOG.warn("Unable to poll job status: {}, aborting after reaching max retries.", jobId); return null; } } @@ -262,48 +263,51 @@ private DatasetServiceImpl(BigQueryOptions bqOptions) { /** * {@inheritDoc} * - *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC retries. + * @throws IOException if it exceeds max RPC retries. */ @Override public Table getTable(String projectId, String datasetId, String tableId) throws IOException, InterruptedException { BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); - return getTable(projectId, datasetId, tableId, Sleeper.DEFAULT, backoff); + return executeWithRetries( + client.tables().get(projectId, datasetId, tableId), + String.format( + "Unable to get table: %s, aborting after %d retries.", + tableId, MAX_RPC_ATTEMPTS), + Sleeper.DEFAULT, + backoff); } - @VisibleForTesting - Table getTable( - String projectId, - String datasetId, - String tableId, - Sleeper sleeper, - BackOff backoff) + /** + * {@inheritDoc} + * + *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC retries. + */ + @Override + public void deleteTable(String projectId, String datasetId, String tableId) throws IOException, InterruptedException { - Exception lastException = null; - do { - try { - return client.tables().get(projectId, datasetId, tableId).execute(); - } catch (IOException e) { - LOG.warn("Ignore the error and retry getting the table.", e); - lastException = e; - } - } while (nextBackOff(sleeper, backoff)); - throw new IOException( + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + executeWithRetries( + client.tables().delete(projectId, datasetId, tableId), String.format( - "Unable to get table: %s, aborting after %d retries.", + "Unable to delete table: %s, aborting after %d retries.", tableId, MAX_RPC_ATTEMPTS), - lastException); + Sleeper.DEFAULT, + backoff); } /** * {@inheritDoc} * - *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. * - * @throws IOException if it exceeds max RPC retries. + * @throws IOException if it exceeds max RPC retries. */ @Override public void createDataset( String projectId, String datasetId, String location, String description) throws IOException, InterruptedException { @@ -351,10 +355,31 @@ void createDataset( } while (nextBackOff(sleeper, backoff)); throw new IOException( String.format( - "Unable to create dataset: %s, aborting after %d retries.", + "Unable to create dataset: %s, aborting after %d retries.", datasetId, MAX_RPC_ATTEMPTS), lastException); } + + /** + * {@inheritDoc} + * + *

Retries the RPC for at most {@code MAX_RPC_ATTEMPTS} times until it succeeds. + * + * @throws IOException if it exceeds max RPC retries. + */ + @Override + public void deleteDataset(String projectId, String datasetId) + throws IOException, InterruptedException { + BackOff backoff = + new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS); + executeWithRetries( + client.datasets().delete(projectId, datasetId), + String.format( + "Unable to delete dataset: %s, aborting after %d retries.", + datasetId, MAX_RPC_ATTEMPTS), + Sleeper.DEFAULT, + backoff); + } } private static class BigQueryJsonReaderImpl implements BigQueryJsonReader { @@ -413,6 +438,27 @@ public void close() throws IOException { } } + @VisibleForTesting + static T executeWithRetries( + AbstractGoogleClientRequest request, + String errorMessage, + Sleeper sleeper, + BackOff backoff) + throws IOException, InterruptedException { + Exception lastException = null; + do { + try { + return request.execute(); + } catch (IOException e) { + LOG.warn("Ignore the error and retry the request.", e); + lastException = e; + } + } while (nextBackOff(sleeper, backoff)); + throw new IOException( + errorMessage, + lastException); + } + /** * Identical to {@link BackOffUtils#next} but without checked IOException. * @throws InterruptedException diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java index aff8bca160b9..3df08c310acd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java @@ -256,16 +256,18 @@ public void testPollJobUnknown() throws IOException, InterruptedException { } @Test - public void testGetTable() throws IOException, InterruptedException { + public void testExecuteWithRetries() throws IOException, InterruptedException { Table testTable = new Table(); when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); when(response.getStatusCode()).thenReturn(200); when(response.getContent()).thenReturn(toStream(testTable)); - BigQueryServicesImpl.DatasetServiceImpl tableService = - new BigQueryServicesImpl.DatasetServiceImpl(bigquery); - Table table = tableService.getTable("projectId", "datasetId", "tableId"); + Table table = BigQueryServicesImpl.executeWithRetries( + bigquery.tables().get("projectId", "datasetId", "tableId"), + "Failed to get table.", + Sleeper.DEFAULT, + BackOff.STOP_BACKOFF); assertEquals(testTable, table); verify(response, times(1)).getStatusCode(); From a75effc4126621a85979b4af443341b5b904debb Mon Sep 17 00:00:00 2001 From: Pei He Date: Wed, 11 May 2016 17:11:22 -0700 Subject: [PATCH 4/5] fixup: dryRun fixes --- .../org/apache/beam/sdk/io/BigQueryIO.java | 90 ++++++++----------- .../beam/sdk/util/BigQueryServices.java | 16 +++- .../beam/sdk/util/BigQueryServicesImpl.java | 73 ++++++++------- .../apache/beam/sdk/io/BigQueryIOTest.java | 56 ++++++++---- 4 files changed, 128 insertions(+), 107 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index 96febb6fd274..6dca8d030252 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -82,6 +82,7 @@ import com.google.api.services.bigquery.model.JobConfigurationExtract; import
com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobReference; import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.QueryRequest; @@ -536,7 +537,7 @@ private static void dryRunQuery(BigQueryOptions options, String query) { @Override public PCollection apply(PInput input) { - String uuid = UUID.randomUUID().toString(); + String uuid = randomUUIDString(); String jobIdToken = "beam_job_" + uuid; String queryTempDatasetId = "temp_dataset_" + uuid; String queryTempTableId = "temp_table_" + uuid; @@ -804,7 +805,7 @@ static BigQueryQuerySource create( private final String query; private final String jsonQueryTempTable; private final Boolean flattenResults; - private transient AtomicReference dryRunJobRef; + private transient AtomicReference dryRunJobStats; private BigQueryQuerySource( String jobIdToken, @@ -817,14 +818,13 @@ private BigQueryQuerySource( this.query = checkNotNull(query, "query"); this.jsonQueryTempTable = checkNotNull(jsonQueryTempTable, "jsonQueryTempTable"); this.flattenResults = checkNotNull(flattenResults, "flattenResults"); - this.dryRunJobRef = new AtomicReference<>(); + this.dryRunJobStats = new AtomicReference<>(); } @Override public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { BigQueryOptions bqOptions = options.as(BigQueryOptions.class); - return dryRunQueryIfNeeded(bqOptions) - .getStatistics().getTotalBytesProcessed(); + return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed(); } @Override @@ -839,7 +839,6 @@ protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException, InterruptedException { // 1. Find the location of the query. 
TableReference dryRunTempTable = dryRunQueryIfNeeded(bqOptions) - .getStatistics() .getQuery() .getReferencedTables() .get(0); @@ -875,43 +874,15 @@ protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception { tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId()); } - private synchronized Job dryRunQueryIfNeeded(BigQueryOptions bqOptions) { - if (dryRunJobRef.get() == null) { + private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) + throws InterruptedException, IOException { + if (dryRunJobStats.get() == null) { String projectId = bqOptions.getProject(); - try { - String dryRunJobId = jobIdToken + "-dryRunQuery"; - Job dryRunJob = dryRunQuery( - projectId, dryRunJobId, query, flattenResults, bqServices.getJobService(bqOptions)); - dryRunJobRef.compareAndSet(null, dryRunJob); - } catch (IOException | InterruptedException e) { - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); - } - throw new RuntimeException("Failed to dry run query.", e); - } + JobStatistics jobStats = + bqServices.getJobService(bqOptions).dryRunQuery(projectId, query); + dryRunJobStats.compareAndSet(null, jobStats); } - return dryRunJobRef.get(); - } - - private static Job dryRunQuery( - String projectId, - String jobId, - String query, - boolean flattenResults, - JobService jobService) throws InterruptedException, IOException { - JobConfigurationQuery queryConfig = new JobConfigurationQuery(); - queryConfig - .setQuery(query) - .setCreateDisposition("CREATE_IF_NEEDED") - .setFlattenResults(flattenResults) - .setPriority("BATCH") - .setWriteDisposition("WRITE_EMPTY"); - jobService.startQueryJob(jobId, queryConfig, true /* dryRun */); - Job job = jobService.pollJob(projectId, jobId, QUERY_DRY_RUN_POLL_MAX_RETRIES); - if (parseStatus(job) != Status.SUCCEEDED) { - throw new IOException("Query job failed: " + jobId); - } - return job; + return dryRunJobStats.get(); } private static void executeQuery( @@ -920,6 +891,9 @@ private static void executeQuery( TableReference destinationTable, boolean flattenResults, JobService jobService) throws IOException, InterruptedException { + JobReference jobRef = new JobReference() + .setProjectId(destinationTable.getProjectId()) + .setJobId(jobId); JobConfigurationQuery queryConfig = new JobConfigurationQuery(); queryConfig .setQuery(query) @@ -929,9 +903,8 @@ private static void executeQuery( .setFlattenResults(flattenResults) .setPriority("BATCH") .setWriteDisposition("WRITE_EMPTY"); - jobService.startQueryJob(jobId, queryConfig, false /* dryRun */); - Job job = jobService.pollJob( - destinationTable.getProjectId(), jobId, JOB_POLL_MAX_RETRIES); + jobService.startQueryJob(jobRef, queryConfig); + Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); if (parseStatus(job) != Status.SUCCEEDED) { throw new IOException("Query job failed: " + jobId); } @@ -940,7 +913,7 @@ private static void executeQuery( private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException { in.defaultReadObject(); - dryRunJobRef = new AtomicReference<>(); + dryRunJobStats = new AtomicReference<>(); } } @@ -964,9 +937,6 @@ private abstract static class BigQuerySourceBase extends BoundedSource // The maximum number of retries to poll a BigQuery job. protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE; - // The maximum number of retries to poll a dry run query job. 
- protected static final int QUERY_DRY_RUN_POLL_MAX_RETRIES = 10; - // The initial backoff for verifying temp files. private static final long INITIAL_FILES_VERIFY_BACKOFF_MILLIS = TimeUnit.SECONDS.toMillis(1); @@ -1023,6 +993,10 @@ public Coder getDefaultOutputCoder() { private List executeExtract( String jobId, TableReference table, JobService jobService) throws InterruptedException, IOException { + JobReference jobRef = new JobReference() + .setProjectId(table.getProjectId()) + .setJobId(jobId); + String destinationUri = String.format("%s/%s", extractDestinationDir, "*.avro"); JobConfigurationExtract extract = new JobConfigurationExtract(); extract.setSourceTable(table); @@ -1030,9 +1004,9 @@ private List executeExtract( extract.setDestinationUris(ImmutableList.of(destinationUri)); LOG.info("Starting BigQuery extract job: {}", jobId); - jobService.startExtractJob(jobId, extract); + jobService.startExtractJob(jobRef, extract); Job job = - jobService.pollJob(table.getProjectId(), jobId, JOB_POLL_MAX_RETRIES); + jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); if (parseStatus(job) != Status.SUCCEEDED) { throw new IOException(String.format( "Extract job %s failed, status: %s", @@ -1698,7 +1672,7 @@ public PDone apply(PCollection input) { if (Strings.isNullOrEmpty(table.getProjectId())) { table.setProjectId(options.getProject()); } - String jobIdToken = UUID.randomUUID().toString(); + String jobIdToken = randomUUIDString(); String tempLocation = options.getTempLocation(); String tempFilePrefix; try { @@ -1910,9 +1884,12 @@ private void load( LOG.info("Previous load jobs failed, retrying."); } LOG.info("Starting BigQuery load job: {}", jobId); - jobService.startLoadJob(jobId, loadConfig); + JobReference jobRef = new JobReference() + .setProjectId(projectId) + .setJobId(jobId); + jobService.startLoadJob(jobRef, loadConfig); Status jobStatus = - parseStatus(jobService.pollJob(projectId, jobId, LOAD_JOB_POLL_MAX_RETRIES)); + parseStatus(jobService.pollJob(jobRef, LOAD_JOB_POLL_MAX_RETRIES)); switch (jobStatus) { case SUCCEEDED: return; @@ -2393,6 +2370,13 @@ static T fromJsonString(String json, Class clazz) { } } + /** + * Returns a randomUUID string without {@code '-'}. + */ + private static String randomUUIDString() { + return UUID.randomUUID().toString().replaceAll("-", ""); + } + ///////////////////////////////////////////////////////////////////////////// /** Disallow construction of utility class. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java index e0d244357c5e..f82edf447ea3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServices.java @@ -24,6 +24,8 @@ import com.google.api.services.bigquery.model.JobConfigurationExtract; import com.google.api.services.bigquery.model.JobConfigurationLoad; import com.google.api.services.bigquery.model.JobConfigurationQuery; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatistics; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; @@ -67,18 +69,18 @@ public interface JobService { /** * Start a BigQuery load job. 
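[Editor's note: after this interface change, callers identify a job by a JobReference (project id plus job id) rather than a bare job-id string. A sketch of the resulting calling convention; the helper name is illustrative, and jobService and loadConfig are assumed to come from the surrounding pipeline code:]

    import com.google.api.services.bigquery.model.Job;
    import com.google.api.services.bigquery.model.JobConfigurationLoad;
    import com.google.api.services.bigquery.model.JobReference;
    import java.io.IOException;

    // Starts a load job under an explicit JobReference and polls it until the
    // service reports a terminal state; per the interface above, pollJob
    // returns null once the maxAttempts retries are exhausted.
    static Job runLoadJob(
        BigQueryServices.JobService jobService,
        JobConfigurationLoad loadConfig,
        String projectId,
        String jobId) throws IOException, InterruptedException {
      JobReference jobRef = new JobReference()
          .setProjectId(projectId)
          .setJobId(jobId);
      jobService.startLoadJob(jobRef, loadConfig);
      return jobService.pollJob(jobRef, Integer.MAX_VALUE);
    }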
   */
-  void startLoadJob(String jobId, JobConfigurationLoad loadConfig)
+  void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig)
       throws InterruptedException, IOException;
 
   /**
    * Start a BigQuery extract job.
    */
-  void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
+  void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
       throws InterruptedException, IOException;
 
   /**
    * Start a BigQuery query job.
    */
-  void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun)
+  void startQueryJob(JobReference jobRef, JobConfigurationQuery query)
       throws IOException, InterruptedException;
 
   /**
@@ -86,7 +88,13 @@ void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun)
    *
    * <p>Returns null if {@code maxAttempts} retries are reached.
    */
-  Job pollJob(String projectId, String jobId, int maxAttempts)
+  Job pollJob(JobReference jobRef, int maxAttempts)
+      throws InterruptedException, IOException;
+
+  /**
+   * Dry runs the query in the given project.
+   */
+  JobStatistics dryRunQuery(String projectId, String query)
       throws InterruptedException, IOException;
 }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
index a3bc963c099d..2afc731d873c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java
@@ -33,6 +33,7 @@
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
 import com.google.api.services.bigquery.model.JobReference;
+import com.google.api.services.bigquery.model.JobStatistics;
 import com.google.api.services.bigquery.model.JobStatus;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableReference;
@@ -112,16 +113,11 @@ private JobServiceImpl(BigQueryOptions options) {
    */
   @Override
   public void startLoadJob(
-      String jobId,
+      JobReference jobRef,
       JobConfigurationLoad loadConfig) throws InterruptedException, IOException {
-    Job job = new Job();
-    JobReference jobRef = new JobReference();
-    jobRef.setProjectId(loadConfig.getDestinationTable().getProjectId());
-    jobRef.setJobId(jobId);
-    job.setJobReference(jobRef);
-    JobConfiguration jobConfig = new JobConfiguration();
-    jobConfig.setLoad(loadConfig);
-    job.setConfiguration(jobConfig);
+    Job job = new Job()
+        .setJobReference(jobRef)
+        .setConfiguration(new JobConfiguration().setLoad(loadConfig));
     startJob(job, errorExtractor, client);
   }
 
@@ -134,16 +130,12 @@ public void startLoadJob(
    * @throws IOException if it exceeds max RPC retries.
    */
   @Override
-  public void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
+  public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig)
       throws InterruptedException, IOException {
-    Job job = new Job();
-    JobReference jobRef = new JobReference();
-    jobRef.setProjectId(extractConfig.getSourceTable().getProjectId());
-    jobRef.setJobId(jobId);
-    job.setJobReference(jobRef);
-    JobConfiguration jobConfig = new JobConfiguration();
-    jobConfig.setExtract(extractConfig);
-    job.setConfiguration(jobConfig);
+    Job job = new Job()
+        .setJobReference(jobRef)
+        .setConfiguration(
+            new JobConfiguration().setExtract(extractConfig));
     startJob(job, errorExtractor, client);
   }
 
@@ -156,17 +148,12 @@ public void startExtractJob(String jobId, JobConfigurationExtract extractConfig)
    * @throws IOException if it exceeds max RPC retries.
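    *
    * <p>A hypothetical call sequence (names are assumed for illustration, not part of
    * this patch):
    *
    * <pre>{@code
    * JobReference jobRef = new JobReference()
    *     .setProjectId(destinationTable.getProjectId())
    *     .setJobId(jobIdToken + "-query");
    * jobService.startQueryJob(jobRef, queryConfig);
    * Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);  // null when retries run out
    * }</pre>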
   */
  @Override
-  public void startQueryJob(String jobId, JobConfigurationQuery queryConfig, boolean dryRun)
+  public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig)
      throws IOException, InterruptedException {
-    Job job = new Job();
-    JobReference jobRef = new JobReference();
-    jobRef.setProjectId(queryConfig.getDestinationTable().getProjectId());
-    jobRef.setJobId(jobId);
-    job.setJobReference(jobRef);
-    JobConfiguration jobConfig = new JobConfiguration();
-    jobConfig.setQuery(queryConfig);
-    jobConfig.setDryRun(dryRun);
-    job.setConfiguration(jobConfig);
+    Job job = new Job()
+        .setJobReference(jobRef)
+        .setConfiguration(
+            new JobConfiguration().setQuery(queryConfig));
     startJob(job, errorExtractor, client);
   }
 
@@ -213,22 +200,21 @@ static void startJob(
   }
 
   @Override
-  public Job pollJob(String projectId, String jobId, int maxAttempts)
+  public Job pollJob(JobReference jobRef, int maxAttempts)
       throws InterruptedException {
     BackOff backoff = new AttemptBoundedExponentialBackOff(
         maxAttempts, INITIAL_JOB_STATUS_POLL_BACKOFF_MILLIS);
-    return pollJob(projectId, jobId, Sleeper.DEFAULT, backoff);
+    return pollJob(jobRef, Sleeper.DEFAULT, backoff);
   }
 
   @VisibleForTesting
   Job pollJob(
-      String projectId,
-      String jobId,
+      JobReference jobRef,
       Sleeper sleeper,
       BackOff backoff) throws InterruptedException {
     do {
       try {
-        Job job = client.jobs().get(projectId, jobId).execute();
+        Job job = client.jobs().get(jobRef.getProjectId(), jobRef.getJobId()).execute();
         JobStatus status = job.getStatus();
         if (status != null && status.getState() != null && status.getState().equals("DONE")) {
           return job;
@@ -239,9 +225,28 @@ Job pollJob(
         LOG.warn("Ignore the error and retry polling job status.", e);
       }
     } while (nextBackOff(sleeper, backoff));
-    LOG.warn("Unable to poll job status: {}, aborting after reached max retries.", jobId);
+    LOG.warn("Unable to poll job status: {}, aborting after reached max retries.",
+        jobRef.getJobId());
     return null;
   }
+
+  @Override
+  public JobStatistics dryRunQuery(String projectId, String query)
+      throws InterruptedException, IOException {
+    Job job = new Job()
+        .setConfiguration(new JobConfiguration()
+            .setQuery(new JobConfigurationQuery()
+                .setQuery(query))
+            .setDryRun(true));
+    BackOff backoff =
+        new AttemptBoundedExponentialBackOff(MAX_RPC_ATTEMPTS, INITIAL_RPC_BACKOFF_MILLIS);
+    return executeWithRetries(
+        client.jobs().insert(projectId, job),
+        String.format(
+            "Unable to dry run query: %s, aborting after %d retries.",
+            query, MAX_RPC_ATTEMPTS),
+        Sleeper.DEFAULT,
+        backoff).getStatistics();
+  }
 }
 
 @VisibleForTesting
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index e1e99e0fea72..acbb80c94c08 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -25,6 +25,7 @@
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.when;
 
 import org.apache.beam.sdk.Pipeline;
@@ -68,6 +69,7 @@
 import com.google.api.services.bigquery.model.JobConfigurationExtract;
 import com.google.api.services.bigquery.model.JobConfigurationLoad;
 import com.google.api.services.bigquery.model.JobConfigurationQuery;
+import com.google.api.services.bigquery.model.JobReference;
 import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatistics2; import com.google.api.services.bigquery.model.JobStatistics4; @@ -105,6 +107,7 @@ import java.util.NoSuchElementException; import javax.annotation.Nullable; +import javax.print.attribute.standard.Destination; /** * Tests for BigQueryIO. @@ -237,25 +240,25 @@ public FakeJobService pollJobReturns(Object... pollJobReturns) { } @Override - public void startLoadJob(String jobId, JobConfigurationLoad loadConfig) + public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) throws InterruptedException, IOException { startJob(); } @Override - public void startExtractJob(String jobId, JobConfigurationExtract extractConfig) + public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) throws InterruptedException, IOException { startJob(); } @Override - public void startQueryJob(String jobId, JobConfigurationQuery query, boolean dryRun) + public void startQueryJob(JobReference jobRef, JobConfigurationQuery query) throws IOException, InterruptedException { startJob(); } @Override - public Job pollJob(String projectId, String jobId, int maxAttempts) + public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { if (pollJobStatusCallsCount < pollJobReturns.length) { Object ret = pollJobReturns[pollJobStatusCallsCount++]; @@ -289,6 +292,12 @@ private void startJob() throws IOException, InterruptedException { "Exceeded expected number of calls: " + startJobReturns.length); } } + + @Override + public JobStatistics dryRunQuery(String projectId, String query) + throws InterruptedException, IOException { + throw new UnsupportedOperationException(); + } } @Rule public transient ExpectedException thrown = ExpectedException.none(); @@ -908,7 +917,7 @@ public void testBigQueryTableSourceInitSplit() throws Exception { new TableRow().set("name", "b").set("number", "2"), new TableRow().set("name", "c").set("number", "3")); - when(mockJobService.pollJob(anyString(), anyString(), Mockito.anyInt())) + when(mockJobService.pollJob(Mockito.any(), Mockito.anyInt())) .thenReturn(extractJob); PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation("mock://tempLocation"); @@ -931,7 +940,7 @@ public void testBigQueryTableSourceInitSplit() throws Exception { assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class)); Mockito.verify(mockJobService) - .startExtractJob(anyString(), Mockito.any()); + .startExtractJob(Mockito.any(), Mockito.any()); } @Test @@ -964,9 +973,10 @@ public void testBigQueryQuerySourceInitSplit() throws Exception { String jobIdToken = "testJobIdToken"; String extractDestinationDir = "mock://tempLocation"; - String jsonTable = toJsonString(BigQueryIO.parseTableSpec("project.data_set.table_name")); + TableReference destinationTable = BigQueryIO.parseTableSpec("project.data_set.table_name"); + String jsonDestinationTable = toJsonString(destinationTable); BoundedSource bqSource = BigQueryQuerySource.create( - jobIdToken, "query", jsonTable, true /* flattenResults */, + jobIdToken, "query", jsonDestinationTable, true /* flattenResults */, extractDestinationDir, fakeBqServices); List expected = ImmutableList.of( @@ -977,13 +987,28 @@ public void testBigQueryQuerySourceInitSplit() throws Exception { PipelineOptions options = PipelineOptionsFactory.create(); options.setTempLocation(extractDestinationDir); - when(mockJobService.pollJob(anyString(), anyString(), Mockito.anyInt())) - .thenReturn(queryJob, extractJob); + TableReference queryTable = 
new TableReference()
+            .setProjectId("testProject")
+            .setDatasetId("testDataset")
+            .setTableId("testTable");
+    when(mockJobService.dryRunQuery(anyString(), anyString()))
+        .thenReturn(new JobStatistics().setQuery(
+            new JobStatistics2()
+                .setTotalBytesProcessed(100L)
+                .setReferencedTables(ImmutableList.of(queryTable))));
+    when(mockDatasetService.getTable(
+        eq(queryTable.getProjectId()), eq(queryTable.getDatasetId()), eq(queryTable.getTableId())))
+        .thenReturn(new Table().setSchema(new TableSchema()));
+    when(mockDatasetService.getTable(
+        eq(destinationTable.getProjectId()),
+        eq(destinationTable.getDatasetId()),
+        eq(destinationTable.getTableId())))
+        .thenReturn(new Table().setSchema(new TableSchema()));
     IOChannelUtils.setIOFactory("mock", mockIOChannelFactory);
     when(mockIOChannelFactory.resolve(anyString(), anyString()))
         .thenReturn("mock://tempLocation/output");
-    when(mockDatasetService.getTable(anyString(), anyString(), anyString()))
-        .thenReturn(new Table().setSchema(new TableSchema()));
+    when(mockJobService.pollJob(Mockito.any(), Mockito.anyInt()))
+        .thenReturn(extractJob);
 
     Assert.assertThat(
         SourceTestUtils.readFromSource(bqSource, options),
@@ -997,11 +1022,10 @@ public void testBigQueryQuerySourceInitSplit() throws Exception {
     assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));
 
     Mockito.verify(mockJobService)
-        .startQueryJob(anyString(), Mockito.any(), Mockito.eq(true));
-    Mockito.verify(mockJobService)
-        .startQueryJob(anyString(), Mockito.any(), Mockito.eq(false));
+        .startQueryJob(
+            Mockito.any(), Mockito.any());
     Mockito.verify(mockJobService)
-        .startExtractJob(anyString(), Mockito.any());
+        .startExtractJob(Mockito.any(), Mockito.any());
     Mockito.verify(mockDatasetService)
         .createDataset(anyString(), anyString(), anyString(), anyString());
   }

From 6b49956ec0c0dbb0bf692dbcb2b4bd3f87bd674a Mon Sep 17 00:00:00 2001
From: Pei He
Date: Thu, 12 May 2016 15:29:56 -0700
Subject: [PATCH 5/5] fixup: expand files pattern for extract files

---
 .../org/apache/beam/sdk/io/BigQueryIO.java         | 127 +++++++++++-------
 .../beam/sdk/util/BigQueryServicesImpl.java        |  18 +--
 .../apache/beam/sdk/io/BigQueryIOTest.java         |   6 +-
 .../sdk/util/BigQueryServicesImplTest.java         |  18 ++-
 4 files changed, 105 insertions(+), 64 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 6dca8d030252..254bb864a05a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -397,6 +397,10 @@ public static class Bound extends PTransform> {
         "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
         + " pipeline, This validation can be disabled using #withoutValidation.";
 
+    // The maximum number of retries to poll a BigQuery job in the cleanup phase.
+    // We expect the jobs to be DONE already, so we don't need a high retry limit.
+ private static final int CLEANUP_JOB_POLL_MAX_RETRIES = 10; + private Bound() { this( null /* name */, @@ -538,14 +542,12 @@ private static void dryRunQuery(BigQueryOptions options, String query) { @Override public PCollection apply(PInput input) { String uuid = randomUUIDString(); - String jobIdToken = "beam_job_" + uuid; - String queryTempDatasetId = "temp_dataset_" + uuid; - String queryTempTableId = "temp_table_" + uuid; + final String jobIdToken = "beam_job_" + uuid; BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class); BoundedSource source; - BigQueryServices bqServices = getBigQueryServices(); + final BigQueryServices bqServices = getBigQueryServices(); final String extractDestinationDir; String tempLocation = bqOptions.getTempLocation(); @@ -559,6 +561,9 @@ public PCollection apply(PInput input) { if (!Strings.isNullOrEmpty(query)) { String projectId = bqOptions.getProject(); + String queryTempDatasetId = "temp_dataset_" + uuid; + String queryTempTableId = "temp_table_" + uuid; + TableReference queryTempTableRef = new TableReference() .setProjectId(projectId) .setDatasetId(queryTempDatasetId) @@ -587,13 +592,27 @@ public PCollection apply(PInput input) { new PassThroughThenCleanup.CleanupOperation() { @Override void cleanup(PipelineOptions options) throws Exception { - IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); - Collection dirMatch = factory.match(extractDestinationDir); - if (!dirMatch.isEmpty()) { - Collection extractFiles = factory.match( - factory.resolve(extractDestinationDir, "*")); - new GcsUtilFactory().create(options).remove(extractFiles); - } + BigQueryOptions bqOptions = options.as(BigQueryOptions.class); + + JobReference jobRef = new JobReference() + .setProjectId(bqOptions.getProject()) + .setJobId(getExtractJobId(jobIdToken)); + Job extractJob = bqServices.getJobService(bqOptions).pollJob( + jobRef, CLEANUP_JOB_POLL_MAX_RETRIES); + + Collection extractFiles = null; + if (extractJob != null) { + extractFiles = getExtractFilePaths(extractDestinationDir, extractJob); + } else { + IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); + Collection dirMatch = factory.match(extractDestinationDir); + if (!dirMatch.isEmpty()) { + extractFiles = factory.match(factory.resolve(extractDestinationDir, "*")); + } + } + if (extractFiles != null && !extractFiles.isEmpty()) { + new GcsUtilFactory().create(options).remove(extractFiles); + } }}; return input.getPipeline() .apply(org.apache.beam.sdk.io.Read.from(source)) @@ -959,7 +978,7 @@ public List> splitIntoBundles( BigQueryOptions bqOptions = options.as(BigQueryOptions.class); TableReference tableToExtract = getTableToExtract(bqOptions); JobService jobService = bqServices.getJobService(bqOptions); - String extractJobId = jobIdToken + "-extract"; + String extractJobId = getExtractJobId(jobIdToken); List tempFiles = executeExtract(extractJobId, tableToExtract, jobService); TableSchema tableSchema = bqServices.getDatasetService(bqOptions).getTable( @@ -997,40 +1016,23 @@ private List executeExtract( .setProjectId(table.getProjectId()) .setJobId(jobId); - String destinationUri = String.format("%s/%s", extractDestinationDir, "*.avro"); - JobConfigurationExtract extract = new JobConfigurationExtract(); - extract.setSourceTable(table); - extract.setDestinationFormat("AVRO"); - extract.setDestinationUris(ImmutableList.of(destinationUri)); + String destinationUri = getExtractDestinationUri(extractDestinationDir); + JobConfigurationExtract extract = 
new JobConfigurationExtract() + .setSourceTable(table) + .setDestinationFormat("AVRO") + .setDestinationUris(ImmutableList.of(destinationUri)); LOG.info("Starting BigQuery extract job: {}", jobId); jobService.startExtractJob(jobRef, extract); - Job job = + Job extractJob = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); - if (parseStatus(job) != Status.SUCCEEDED) { + if (parseStatus(extractJob) != Status.SUCCEEDED) { throw new IOException(String.format( "Extract job %s failed, status: %s", - job.getJobReference().getJobId(), job.getStatus())); - } - - JobStatistics jobStats = job.getStatistics(); - List counts = jobStats.getExtract().getDestinationUriFileCounts(); - if (counts.size() != 1) { - String errorMessage = (counts.size() == 0 ? - "No destination uri file count received." : - String.format("More than one destination uri file count received. First two are %s, %s", - counts.get(0), counts.get(1))); - throw new RuntimeException(errorMessage); + extractJob.getJobReference().getJobId(), extractJob.getStatus())); } - long filesCount = counts.get(0); - List tempFiles = Lists.newArrayList(); - IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); - for (int i = 0; i < filesCount; ++i) { - String filePath = - factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro")); - tempFiles.add(filePath); - } + List tempFiles = getExtractFilePaths(extractDestinationDir, extractJob); return ImmutableList.copyOf(tempFiles); } @@ -1210,6 +1212,37 @@ public Instant getCurrentTimestamp() throws NoSuchElementException { } } + private static String getExtractJobId(String jobIdToken) { + return jobIdToken + "-extract"; + } + + private static String getExtractDestinationUri(String extractDestinationDir) { + return String.format("%s/%s", extractDestinationDir, "*.avro"); + } + + private static List getExtractFilePaths(String extractDestinationDir, Job extractJob) + throws IOException { + JobStatistics jobStats = extractJob.getStatistics(); + List counts = jobStats.getExtract().getDestinationUriFileCounts(); + if (counts.size() != 1) { + String errorMessage = (counts.size() == 0 ? + "No destination uri file count received." : + String.format("More than one destination uri file count received. 
First two are %s, %s", + counts.get(0), counts.get(1))); + throw new RuntimeException(errorMessage); + } + long filesCount = counts.get(0); + + ImmutableList.Builder paths = ImmutableList.builder(); + IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir); + for (long i = 0; i < filesCount; ++i) { + String filePath = + factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro")); + paths.add(filePath); + } + return paths.build(); + } + ///////////////////////////////////////////////////////////////////////////// /** @@ -1868,13 +1901,13 @@ private void load( @Nullable TableSchema schema, WriteDisposition writeDisposition, CreateDisposition createDisposition) throws InterruptedException, IOException { - JobConfigurationLoad loadConfig = new JobConfigurationLoad(); - loadConfig.setSourceUris(gcsUris); - loadConfig.setDestinationTable(ref); - loadConfig.setSchema(schema); - loadConfig.setWriteDisposition(writeDisposition.name()); - loadConfig.setCreateDisposition(createDisposition.name()); - loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad() + .setSourceUris(gcsUris) + .setDestinationTable(ref) + .setSchema(schema) + .setWriteDisposition(writeDisposition.name()) + .setCreateDisposition(createDisposition.name()) + .setSourceFormat("NEWLINE_DELIMITED_JSON"); boolean retrying = false; String projectId = ref.getProjectId(); @@ -2371,7 +2404,9 @@ static T fromJsonString(String json, Class clazz) { } /** - * Returns a randomUUID string without {@code '-'}. + * Returns a randomUUID string. + * + *
+   * <p>
{@code '-'} is removed because BigQuery doesn't allow it in dataset id. */ private static String randomUUIDString() { return UUID.randomUUID().toString().replaceAll("-", ""); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java index 2afc731d873c..01ea45f912d8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BigQueryServicesImpl.java @@ -331,15 +331,15 @@ void createDataset( String description, Sleeper sleeper, BackOff backoff) throws IOException, InterruptedException { - DatasetReference datasetRef = new DatasetReference(); - datasetRef.setProjectId(projectId); - datasetRef.setDatasetId(datasetId); - - Dataset dataset = new Dataset(); - dataset.setDatasetReference(datasetRef); - dataset.setLocation(location); - dataset.setFriendlyName(location); - dataset.setDescription(description); + DatasetReference datasetRef = new DatasetReference() + .setProjectId(projectId) + .setDatasetId(datasetId); + + Dataset dataset = new Dataset() + .setDatasetReference(datasetRef) + .setLocation(location) + .setFriendlyName(location) + .setDescription(description); Exception lastException; do { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index acbb80c94c08..d9f53af579ee 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -107,7 +107,6 @@ import java.util.NoSuchElementException; import javax.annotation.Nullable; -import javax.print.attribute.standard.Destination; /** * Tests for BigQueryIO. 
@@ -478,8 +477,9 @@ public void testBuildSourceWithTableAndFlatten() { @Test public void testReadFromTable() { FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() - .withJobService( - new FakeJobService().startJobReturns("done", "done")) + .withJobService(new FakeJobService() + .startJobReturns("done", "done") + .pollJobReturns(Status.UNKNOWN)) .readerReturns( toJsonString(new TableRow().set("name", "a").set("number", 1)), toJsonString(new TableRow().set("name", "b").set("number", 2)), diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java index 3df08c310acd..3ec2b37e7d1a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BigQueryServicesImplTest.java @@ -200,8 +200,10 @@ public void testPollJobSucceeds() throws IOException, InterruptedException { BigQueryServicesImpl.JobServiceImpl jobService = new BigQueryServicesImpl.JobServiceImpl(bigquery); - Job job = - jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); assertEquals(testJob, job); verify(response, times(1)).getStatusCode(); @@ -223,8 +225,10 @@ public void testPollJobFailed() throws IOException, InterruptedException { BigQueryServicesImpl.JobServiceImpl jobService = new BigQueryServicesImpl.JobServiceImpl(bigquery); - Job job = - jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); assertEquals(testJob, job); verify(response, times(1)).getStatusCode(); @@ -246,8 +250,10 @@ public void testPollJobUnknown() throws IOException, InterruptedException { BigQueryServicesImpl.JobServiceImpl jobService = new BigQueryServicesImpl.JobServiceImpl(bigquery); - Job job = - jobService.pollJob("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF); + JobReference jobRef = new JobReference() + .setProjectId("projectId") + .setJobId("jobId"); + Job job = jobService.pollJob(jobRef, Sleeper.DEFAULT, BackOff.STOP_BACKOFF); assertEquals(null, job); verify(response, times(1)).getStatusCode();
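
A minimal end-to-end sketch of the JobService surface introduced above (illustrative
only; options, bqServices, query, jobIdToken, and extractDestinationDir are assumed to
be in scope, and the shard naming mirrors getExtractFilePaths):

    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
    JobService jobService = bqServices.getJobService(bqOptions);

    // Dry run the query to find the table it references, as BigQueryQuerySource does.
    JobStatistics stats = jobService.dryRunQuery(bqOptions.getProject(), query);
    TableReference tableToExtract = stats.getQuery().getReferencedTables().get(0);

    // Start an AVRO extract job against a wildcard destination URI and wait for it.
    JobReference extractRef = new JobReference()
        .setProjectId(tableToExtract.getProjectId())
        .setJobId(jobIdToken + "-extract");
    jobService.startExtractJob(extractRef, new JobConfigurationExtract()
        .setSourceTable(tableToExtract)
        .setDestinationFormat("AVRO")
        .setDestinationUris(ImmutableList.of(extractDestinationDir + "/*.avro")));
    Job extractJob = jobService.pollJob(extractRef, Integer.MAX_VALUE);  // null if retries run out

    // The job statistics report how many files the "*.avro" pattern expanded to;
    // file i is named with a zero-padded 12-digit index, e.g. 000000000002.avro.
    long fileCount =
        extractJob.getStatistics().getExtract().getDestinationUriFileCounts().get(0);

Building the JobReference on the caller side keeps job ids deterministic across retries,
which is what lets the cleanup path re-derive the extract job via getExtractJobId(jobIdToken)
and poll it, instead of re-listing the destination directory.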