From 08be8eeb610b29d19eaf45c76b859a65cfb6d964 Mon Sep 17 00:00:00 2001
From: Pei He
Date: Thu, 14 Apr 2016 23:17:36 -0700
Subject: [PATCH] Override BigQueryIO.Write with native sink for Dataflow runner

---
 .../cloud/dataflow/sdk/io/BigQueryIO.java          |   1 -
 .../sdk/runners/DataflowPipelineRunner.java        | 107 ++++++++++++++++++
 .../runners/DataflowPipelineTranslator.java        |   2 -
 .../dataflow/BigQueryIOTranslator.java             |  55 ---------
 4 files changed, 107 insertions(+), 58 deletions(-)

diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
index f4094e5274..8f291b99f9 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
@@ -36,7 +36,6 @@
 import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
 import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
 import com.google.cloud.dataflow.sdk.coders.VoidCoder;
-import com.google.cloud.dataflow.sdk.io.BigQueryIO.BigQuerySink;
 import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
 import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
 import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 99bbb257d6..0f0fc477dd 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -23,6 +23,9 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.clouddebugger.v2.Clouddebugger;
 import com.google.api.services.clouddebugger.v2.model.Debuggee;
 import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
@@ -49,6 +52,7 @@
 import com.google.cloud.dataflow.sdk.coders.MapCoder;
 import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
 import com.google.cloud.dataflow.sdk.coders.StandardCoder;
+import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
 import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
 import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
 import com.google.cloud.dataflow.sdk.io.AvroIO;
@@ -228,6 +232,7 @@ public class DataflowPipelineRunner extends PipelineRunner
    */
   public static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]+[a-z0-9]";
+  private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
 
   /**
    * Construct a runner from the provided options.
   *
@@ -355,6 +360,10 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
         builder.put(View.AsList.class, BatchViewAsList.class);
         builder.put(View.AsIterable.class, BatchViewAsIterable.class);
       }
+      if (options.getExperiments() == null
+          || !options.getExperiments().contains("enable_custom_bigquery_sink")) {
+        builder.put(BigQueryIO.Write.Bound.class, BatchBigQueryIOWrite.class);
+      }
       overrides = builder.build();
     }
   }
@@ -2059,6 +2068,104 @@ public PDone apply(PCollection input) {
     }
   }
 
+  private static class BatchBigQueryIOWrite extends PTransform<PCollection<TableRow>, PDone> {
+    private final BigQueryIO.Write.Bound transform;
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
+    public BatchBigQueryIOWrite(DataflowPipelineRunner runner, BigQueryIO.Write.Bound transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PDone apply(PCollection<TableRow> input) {
+      if (transform.getTable() == null) {
+        // BigQueryIO.Write is using tableRefFunction with StreamWithDeDup.
+        return transform.apply(input);
+      } else {
+        return input
+            .apply(new BatchBigQueryIONativeWrite(transform));
+      }
+    }
+  }
+
+  /**
+   * This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way
+   * to provide the native definition of the BigQuery sink.
+   */
+  private static class BatchBigQueryIONativeWrite extends PTransform<PCollection<TableRow>, PDone> {
+    private final BigQueryIO.Write.Bound transform;
+    public BatchBigQueryIONativeWrite(BigQueryIO.Write.Bound transform) {
+      this.transform = transform;
+    }
+
+    @Override
+    public PDone apply(PCollection<TableRow> input) {
+      return PDone.in(input.getPipeline());
+    }
+
+    static {
+      DataflowPipelineTranslator.registerTransformTranslator(
+          BatchBigQueryIONativeWrite.class, new BatchBigQueryIONativeWriteTranslator());
+    }
+  }
+
+  /**
+   * {@code BigQueryIO.Write.Bound} support code for the Dataflow backend.
+   */
+  private static class BatchBigQueryIONativeWriteTranslator
+      implements TransformTranslator<BatchBigQueryIONativeWrite> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void translate(BatchBigQueryIONativeWrite transform,
+        TranslationContext context) {
+      translateWriteHelper(transform, transform.transform, context);
+    }
+
+    private void translateWriteHelper(
+        BatchBigQueryIONativeWrite transform,
+        BigQueryIO.Write.Bound originalTransform,
+        TranslationContext context) {
+      if (context.getPipelineOptions().isStreaming()) {
+        // Streaming is handled by the streaming runner.
+        throw new AssertionError(
+            "BigQueryIO is specified to use streaming write in batch mode.");
+      }
+
+      TableReference table = originalTransform.getTable();
+
+      // Actual translation.
+      context.addStep(transform, "ParallelWrite");
+      context.addInput(PropertyNames.FORMAT, "bigquery");
+      context.addInput(PropertyNames.BIGQUERY_TABLE,
+          table.getTableId());
+      context.addInput(PropertyNames.BIGQUERY_DATASET,
+          table.getDatasetId());
+      if (table.getProjectId() != null) {
+        context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId());
+      }
+      if (originalTransform.getSchema() != null) {
+        try {
+          context.addInput(PropertyNames.BIGQUERY_SCHEMA,
+              JSON_FACTORY.toString(originalTransform.getSchema()));
+        } catch (IOException exn) {
+          throw new IllegalArgumentException("Invalid table schema.", exn);
+        }
+      }
+      context.addInput(
+          PropertyNames.BIGQUERY_CREATE_DISPOSITION,
+          originalTransform.getCreateDisposition().name());
+      context.addInput(
+          PropertyNames.BIGQUERY_WRITE_DISPOSITION,
+          originalTransform.getWriteDisposition().name());
+      // Set sink encoding to TableRowJsonCoder.
+      context.addEncodingInput(
+          WindowedValue.getValueOnlyCoder(TableRowJsonCoder.of()));
+      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
+    }
+  }
+
   /**
    * Specialized implementation which overrides
    * {@link com.google.cloud.dataflow.sdk.io.TextIO.Write.Bound TextIO.Write.Bound} with
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
index 155c454b38..75fd5b3f93 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java
@@ -1043,8 +1043,6 @@ private void translateHelper(
 
     registerTransformTranslator(
         BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator());
-    registerTransformTranslator(
-        BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator());
 
     registerTransformTranslator(
         PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java
index 538901c722..c327ecf434 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java
@@ -16,25 +16,18 @@
 
 package com.google.cloud.dataflow.sdk.runners.dataflow;
 
-import com.google.api.client.json.JsonFactory;
 import com.google.api.services.bigquery.model.TableReference;
-import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
 import com.google.cloud.dataflow.sdk.io.BigQueryIO;
 import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
 import com.google.cloud.dataflow.sdk.util.PropertyNames;
-import com.google.cloud.dataflow.sdk.util.Transport;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 /**
  * BigQuery transform support code for the Dataflow backend.
  */
 public class BigQueryIOTranslator {
-  private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOTranslator.class);
 
   /**
@@ -74,52 +67,4 @@ public void translate(
       context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
     }
   }
-
-  /**
-   * Implements BigQueryIO Write translation for the Dataflow backend.
-   */
-  public static class WriteTranslator
-      implements DataflowPipelineTranslator.TransformTranslator<BigQueryIO.Write.Bound> {
-
-    @Override
-    public void translate(BigQueryIO.Write.Bound transform,
-        DataflowPipelineTranslator.TranslationContext context) {
-      if (context.getPipelineOptions().isStreaming()) {
-        // Streaming is handled by the streaming runner.
-        throw new AssertionError(
-            "BigQueryIO is specified to use streaming write in batch mode.");
-      }
-
-      TableReference table = transform.getTable();
-
-      // Actual translation.
-      context.addStep(transform, "ParallelWrite");
-      context.addInput(PropertyNames.FORMAT, "bigquery");
-      context.addInput(PropertyNames.BIGQUERY_TABLE,
-          table.getTableId());
-      context.addInput(PropertyNames.BIGQUERY_DATASET,
-          table.getDatasetId());
-      if (table.getProjectId() != null) {
-        context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId());
-      }
-      if (transform.getSchema() != null) {
-        try {
-          context.addInput(PropertyNames.BIGQUERY_SCHEMA,
-              JSON_FACTORY.toString(transform.getSchema()));
-        } catch (IOException exn) {
-          throw new IllegalArgumentException("Invalid table schema.", exn);
-        }
-      }
-      context.addInput(
-          PropertyNames.BIGQUERY_CREATE_DISPOSITION,
-          transform.getCreateDisposition().name());
-      context.addInput(
-          PropertyNames.BIGQUERY_WRITE_DISPOSITION,
-          transform.getWriteDisposition().name());
-      // Set sink encoding to TableRowJsonCoder.
-      context.addEncodingInput(
-          WindowedValue.getValueOnlyCoder(TableRowJsonCoder.of()));
-      context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
-    }
-  }
 }
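
Usage sketch (not part of the patch; assumes the Dataflow SDK 1.x API, and the class name,
project, table spec, and schema below are placeholders): after this change, a batch pipeline
run on the DataflowPipelineRunner that writes to a fixed table is translated to the native
"ParallelWrite" BigQuery sink, while running with --experiments=enable_custom_bigquery_sink
keeps the SDK-level BigQueryIO.Write implementation; writes driven by a tableRefFunction
continue through the existing path in both cases.

// Hypothetical driver pipeline, provided only to illustrate the code paths touched above.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Create;

import java.util.Collections;

public class BigQueryNativeSinkExample {
  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);
    // Default batch behavior after this patch: BigQueryIO.Write.Bound is overridden by
    // BatchBigQueryIOWrite, which emits the native "ParallelWrite" step for a fixed table.
    // Passing --experiments=enable_custom_bigquery_sink on the command line skips the override.

    TableSchema schema = new TableSchema().setFields(Collections.singletonList(
        new TableFieldSchema().setName("word").setType("STRING")));

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(new TableRow().set("word", "hello"))
            .withCoder(TableRowJsonCoder.of()))
        .apply(BigQueryIO.Write
            .to("my-project:my_dataset.my_table")  // placeholder; a fixed table takes the native sink
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
    p.run();
  }
}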