@@ -36,7 +36,6 @@
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.BigQuerySink;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
@@ -23,6 +23,9 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
@@ -49,6 +52,7 @@
import com.google.cloud.dataflow.sdk.coders.MapCoder;
import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.coders.VarLongCoder;
import com.google.cloud.dataflow.sdk.io.AvroIO;
@@ -228,6 +232,7 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
*/
public static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]+[a-z0-9]";

private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
/**
* Construct a runner from the provided options.
*
@@ -355,6 +360,10 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
builder.put(View.AsList.class, BatchViewAsList.class);
builder.put(View.AsIterable.class, BatchViewAsIterable.class);
}
if (options.getExperiments() == null
|| !options.getExperiments().contains("enable_custom_bigquery_sink")) {
builder.put(BigQueryIO.Write.Bound.class, BatchBigQueryIOWrite.class);
}
overrides = builder.build();
}
}
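
The override registered above is gated on the "enable_custom_bigquery_sink" experiment: unless that experiment is set, BigQueryIO.Write.Bound is swapped for BatchBigQueryIOWrite and translated as a native sink. A minimal sketch of opting back into the SDK-defined (custom) sink; it assumes a setExperiments setter paired with the getExperiments() call above, and the class name is illustrative:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

import java.util.Arrays;

public class EnableCustomBigQuerySink {
  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(DataflowPipelineOptions.class);
    // With the experiment present, the runner leaves BigQueryIO.Write.Bound untouched;
    // without it, the transform is overridden by BatchBigQueryIOWrite.
    options.setExperiments(Arrays.asList("enable_custom_bigquery_sink"));
    Pipeline p = Pipeline.create(options);
    // ... apply transforms and call p.run() ...
  }
}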
@@ -2059,6 +2068,104 @@ public PDone apply(PCollection<T> input) {
}
}

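/**
 * Override of {@link BigQueryIO.Write.Bound} for batch pipelines: writes to a fixed table
 * are routed to the native Dataflow sink below, while table-function writes keep the
 * transform's default expansion.
 */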
private static class BatchBigQueryIOWrite extends PTransform<PCollection<TableRow>, PDone> {
private final BigQueryIO.Write.Bound transform;
/**
* Builds an instance of this class from the overridden transform.
*/
@SuppressWarnings("unused") // used via reflection in DataflowPipelineRunner#apply()
public BatchBigQueryIOWrite(DataflowPipelineRunner runner, BigQueryIO.Write.Bound transform) {
this.transform = transform;
}

@Override
public PDone apply(PCollection<TableRow> input) {
if (transform.getTable() == null) {
// No fixed table: BigQueryIO.Write was configured with a tableRefFunction, so fall back
// to its default expansion (StreamWithDeDup).
return transform.apply(input);
} else {
return input
.apply(new BatchBigQueryIONativeWrite(transform));
}
}
}
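
The branch in apply() above separates the two ways BigQueryIO.Write can be configured: a fixed output table (getTable() != null) is routed to the native ParallelWrite sink defined below, while a per-window table function leaves getTable() null and keeps the transform's own streaming-insert expansion. A rough usage sketch under those assumptions; the table spec, schema, and method name are hypothetical, and the table-function overload of to(...) is assumed to be available in this SDK version:

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class BigQueryWriteSketch {
  static void writeRows(PCollection<TableRow> rows, TableSchema schema) {
    // Fixed table: getTable() != null, so a batch pipeline takes the native
    // ParallelWrite path via BatchBigQueryIONativeWrite.
    rows.apply(BigQueryIO.Write
        .named("WriteScores")
        .to("my-project:my_dataset.scores")
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    // Table reference function: getTable() is null, so the transform's default
    // expansion (streaming inserts with de-duplication) is used instead.
    rows.apply(BigQueryIO.Write
        .named("WritePerWindow")
        .withSchema(schema)
        .to(new SerializableFunction<BoundedWindow, String>() {
          @Override
          public String apply(BoundedWindow window) {
            return "my-project:my_dataset.scores_" + window.maxTimestamp().getMillis();
          }
        }));
  }
}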

/**
* This {@link PTransform} is used by the {@link DataflowPipelineTranslator} as a way
* to provide the native definition of the BigQuery sink.
*/
private static class BatchBigQueryIONativeWrite extends PTransform<PCollection<TableRow>, PDone> {
[Review thread]
dhalperi (Contributor), Apr 15, 2016: I think this replaces BigQueryIOTranslator for write? Remove those?
dhalperi (Contributor), Apr 15, 2016: (Here and in Beam)
Contributor Author: done
[End review thread]

private final BigQueryIO.Write.Bound transform;
public BatchBigQueryIONativeWrite(BigQueryIO.Write.Bound transform) {
this.transform = transform;
}

@Override
public PDone apply(PCollection<TableRow> input) {
return PDone.in(input.getPipeline());
}

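// Register the translator for this native sink the first time this class is loaded.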
static {
DataflowPipelineTranslator.registerTransformTranslator(
BatchBigQueryIONativeWrite.class, new BatchBigQueryIONativeWriteTranslator());
}
}

/**
* {@code BigQueryIO.Write.Bound} support code for the Dataflow backend.
*/
private static class BatchBigQueryIONativeWriteTranslator
implements TransformTranslator<BatchBigQueryIONativeWrite> {
@SuppressWarnings("unchecked")
@Override
public void translate(BatchBigQueryIONativeWrite transform,
TranslationContext context) {
translateWriteHelper(transform, transform.transform, context);
}

private void translateWriteHelper(
BatchBigQueryIONativeWrite transform,
BigQueryIO.Write.Bound originalTransform,
TranslationContext context) {
if (context.getPipelineOptions().isStreaming()) {
// Streaming is handled by the streaming runner.
throw new AssertionError(
"BigQueryIO is specified to use streaming write in batch mode.");
}

TableReference table = originalTransform.getTable();

// Actual translation.
context.addStep(transform, "ParallelWrite");
context.addInput(PropertyNames.FORMAT, "bigquery");
context.addInput(PropertyNames.BIGQUERY_TABLE,
table.getTableId());
context.addInput(PropertyNames.BIGQUERY_DATASET,
table.getDatasetId());
if (table.getProjectId() != null) {
context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId());
}
if (originalTransform.getSchema() != null) {
try {
context.addInput(PropertyNames.BIGQUERY_SCHEMA,
JSON_FACTORY.toString(originalTransform.getSchema()));
} catch (IOException exn) {
throw new IllegalArgumentException("Invalid table schema.", exn);
}
}
context.addInput(
PropertyNames.BIGQUERY_CREATE_DISPOSITION,
originalTransform.getCreateDisposition().name());
context.addInput(
PropertyNames.BIGQUERY_WRITE_DISPOSITION,
originalTransform.getWriteDisposition().name());
// Set sink encoding to TableRowJsonCoder.
context.addEncodingInput(
WindowedValue.getValueOnlyCoder(TableRowJsonCoder.of()));
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
}
}
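
For reference, the BIGQUERY_SCHEMA property added above is the JSON serialization (via JSON_FACTORY.toString) of the TableSchema supplied to withSchema(...). A minimal sketch of constructing such a schema; the field names and types are hypothetical:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;

import java.util.Arrays;

// Hypothetical two-column schema; the translator serializes it to JSON and attaches it
// to the native sink step under PropertyNames.BIGQUERY_SCHEMA.
TableSchema schema = new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("user_id").setType("STRING"),
    new TableFieldSchema().setName("score").setType("INTEGER")));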

/**
* Specialized implementation which overrides
* {@link com.google.cloud.dataflow.sdk.io.TextIO.Write.Bound TextIO.Write.Bound} with
@@ -1043,8 +1043,6 @@ private <T> void translateHelper(

registerTransformTranslator(
BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator());
registerTransformTranslator(
BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator());

registerTransformTranslator(
PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator());
@@ -16,25 +16,18 @@

package com.google.cloud.dataflow.sdk.runners.dataflow;

import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.TableReference;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.WindowedValue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* BigQuery transform support code for the Dataflow backend.
*/
public class BigQueryIOTranslator {
private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOTranslator.class);

/**
@@ -74,52 +67,4 @@ public void translate(
context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
}
}

/**
* Implements BigQueryIO Write translation for the Dataflow backend.
*/
public static class WriteTranslator
implements DataflowPipelineTranslator.TransformTranslator<BigQueryIO.Write.Bound> {

@Override
public void translate(BigQueryIO.Write.Bound transform,
DataflowPipelineTranslator.TranslationContext context) {
if (context.getPipelineOptions().isStreaming()) {
// Streaming is handled by the streaming runner.
throw new AssertionError(
"BigQueryIO is specified to use streaming write in batch mode.");
}

TableReference table = transform.getTable();

// Actual translation.
context.addStep(transform, "ParallelWrite");
context.addInput(PropertyNames.FORMAT, "bigquery");
context.addInput(PropertyNames.BIGQUERY_TABLE,
table.getTableId());
context.addInput(PropertyNames.BIGQUERY_DATASET,
table.getDatasetId());
if (table.getProjectId() != null) {
context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId());
}
if (transform.getSchema() != null) {
try {
context.addInput(PropertyNames.BIGQUERY_SCHEMA,
JSON_FACTORY.toString(transform.getSchema()));
} catch (IOException exn) {
throw new IllegalArgumentException("Invalid table schema.", exn);
}
}
context.addInput(
PropertyNames.BIGQUERY_CREATE_DISPOSITION,
transform.getCreateDisposition().name());
context.addInput(
PropertyNames.BIGQUERY_WRITE_DISPOSITION,
transform.getWriteDisposition().name());
// Set sink encoding to TableRowJsonCoder.
context.addEncodingInput(
WindowedValue.getValueOnlyCoder(TableRowJsonCoder.of()));
context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform));
}
}
}