From b16f518d20953b3d373a52a60148e71180ef9fd0 Mon Sep 17 00:00:00 2001 From: sychen Date: Mon, 7 Dec 2020 09:18:14 -0800 Subject: [PATCH 1/5] Integrate BQ streaming inserts with GroupIntoBatches --- .../gcp/bigquery/BatchedStreamingWrite.java | 402 ++++++++++++++++++ .../sdk/io/gcp/bigquery/StreamingWriteFn.java | 215 ---------- .../io/gcp/bigquery/StreamingWriteTables.java | 141 ++++-- .../sdk/io/gcp/bigquery/TagWithUniqueIds.java | 8 +- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 16 + 5 files changed, 515 insertions(+), 267 deletions(-) create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java new file mode 100644 index 000000000000..a8e3bc73fb11 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.bigquery; + +import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableRow; +import java.io.IOException; +import java.math.RoundingMode; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.SinkMetrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.Histogram; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.FailsafeValueInSingleWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** PTransform to perform batched streaming BigQuery write. */ +@SuppressWarnings({ + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) +}) +class BatchedStreamingWrite + extends PTransform>>, PCollection> { + private static final TupleTag mainOutputTag = new TupleTag<>("mainOutput"); + + private final BigQueryServices bqServices; + private final InsertRetryPolicy retryPolicy; + private final TupleTag failedOutputTag; + private final AtomicCoder failedOutputCoder; + private final ErrorContainer errorContainer; + private final boolean skipInvalidRows; + private final boolean ignoreUnknownValues; + private final boolean ignoreInsertIds; + private final SerializableFunction toTableRow; + private final SerializableFunction toFailsafeTableRow; + + /** Tracks histogram of bytes written. Reset at the start of every bundle. */ + private transient Histogram histogram = Histogram.linear(0, 20, 3000); + + private transient Long lastReportedSystemClockMillis = System.currentTimeMillis(); + + private final Logger LOG = LoggerFactory.getLogger(BatchedStreamingWrite.class); + + /** Tracks bytes written, exposed as "ByteCount" Counter. */ + private Counter byteCounter = SinkMetrics.bytesWritten(); + + /** Switches the method of batching. 
*/ + private final boolean batchViaStateful; + + public BatchedStreamingWrite( + BigQueryServices bqServices, + InsertRetryPolicy retryPolicy, + TupleTag failedOutputTag, + AtomicCoder failedOutputCoder, + ErrorContainer errorContainer, + boolean skipInvalidRows, + boolean ignoreUnknownValues, + boolean ignoreInsertIds, + SerializableFunction toTableRow, + SerializableFunction toFailsafeTableRow) { + this.bqServices = bqServices; + this.retryPolicy = retryPolicy; + this.failedOutputTag = failedOutputTag; + this.failedOutputCoder = failedOutputCoder; + this.errorContainer = errorContainer; + this.skipInvalidRows = skipInvalidRows; + this.ignoreUnknownValues = ignoreUnknownValues; + this.ignoreInsertIds = ignoreInsertIds; + this.toTableRow = toTableRow; + this.toFailsafeTableRow = toFailsafeTableRow; + this.batchViaStateful = false; + } + + private BatchedStreamingWrite( + BigQueryServices bqServices, + InsertRetryPolicy retryPolicy, + TupleTag failedOutputTag, + AtomicCoder failedOutputCoder, + ErrorContainer errorContainer, + boolean skipInvalidRows, + boolean ignoreUnknownValues, + boolean ignoreInsertIds, + SerializableFunction toTableRow, + SerializableFunction toFailsafeTableRow, + boolean batchViaStateful) { + this.bqServices = bqServices; + this.retryPolicy = retryPolicy; + this.failedOutputTag = failedOutputTag; + this.failedOutputCoder = failedOutputCoder; + this.errorContainer = errorContainer; + this.skipInvalidRows = skipInvalidRows; + this.ignoreUnknownValues = ignoreUnknownValues; + this.ignoreInsertIds = ignoreInsertIds; + this.toTableRow = toTableRow; + this.toFailsafeTableRow = toFailsafeTableRow; + this.batchViaStateful = batchViaStateful; + } + + /** + * A transform that performs batched streaming BigQuery write; input elements are batched and + * flushed upon bundle finalization. + */ + public BatchedStreamingWrite viaDoFnFinalization() { + return new BatchedStreamingWrite<>( + bqServices, + retryPolicy, + failedOutputTag, + failedOutputCoder, + errorContainer, + skipInvalidRows, + ignoreUnknownValues, + ignoreInsertIds, + toTableRow, + toFailsafeTableRow, + false); + } + + /** + * A transform that performs batched streaming BigQuery write; input elements are grouped on table + * destinations and batched via a stateful DoFn. This also enables dynamic sharding during + * grouping to parallelize writes. + */ + public BatchedStreamingWrite viaStateful() { + return new BatchedStreamingWrite<>( + bqServices, + retryPolicy, + failedOutputTag, + failedOutputCoder, + errorContainer, + skipInvalidRows, + ignoreUnknownValues, + ignoreInsertIds, + toTableRow, + toFailsafeTableRow, + true); + } + + @Override + public PCollection expand(PCollection>> input) { + return batchViaStateful + ? input.apply(new ViaStateful()) + : input.apply(new ViaBundleFinalization()); + } + + private class ViaBundleFinalization + extends PTransform>>, PCollection> { + @Override + public PCollection expand(PCollection>> input) { + PCollectionTuple result = + input.apply( + ParDo.of(new BatchAndInsertElements()) + .withOutputTags(mainOutputTag, TupleTagList.of(failedOutputTag))); + PCollection failedInserts = result.get(failedOutputTag); + failedInserts.setCoder(failedOutputCoder); + return failedInserts; + } + } + + @VisibleForTesting + private class BatchAndInsertElements extends DoFn>, Void> { + + /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */ + private transient Map>> tableRows; + + /** The list of unique ids for each BigQuery table row. 
*/ + private transient Map> uniqueIdsForTableRows; + + @Setup + public void setup() { + // record latency upto 60 seconds in the resolution of 20ms + histogram = Histogram.linear(0, 20, 3000); + lastReportedSystemClockMillis = System.currentTimeMillis(); + } + + @Teardown + public void teardown() { + if (histogram.getTotalCount() > 0) { + logPercentiles(); + histogram.clear(); + } + } + + /** Prepares a target BigQuery table. */ + @StartBundle + public void startBundle() { + tableRows = new HashMap<>(); + uniqueIdsForTableRows = new HashMap<>(); + } + + /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. */ + @ProcessElement + public void processElement( + @Element KV> element, + @Timestamp Instant timestamp, + BoundedWindow window, + PaneInfo pane) { + String tableSpec = element.getKey(); + List> rows = + BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec); + List uniqueIds = + BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows, tableSpec); + + TableRow tableRow = toTableRow.apply(element.getValue().tableRow); + TableRow failsafeTableRow = toFailsafeTableRow.apply(element.getValue().tableRow); + rows.add(FailsafeValueInSingleWindow.of(tableRow, timestamp, window, pane, failsafeTableRow)); + uniqueIds.add(element.getValue().uniqueId); + } + + /** Writes the accumulated rows into BigQuery with streaming API. */ + @FinishBundle + public void finishBundle(FinishBundleContext context) throws Exception { + List> failedInserts = Lists.newArrayList(); + BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); + for (Map.Entry>> entry : + tableRows.entrySet()) { + TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey()); + flushRows( + tableReference, + entry.getValue(), + uniqueIdsForTableRows.get(entry.getKey()), + options, + failedInserts); + } + tableRows.clear(); + uniqueIdsForTableRows.clear(); + + for (ValueInSingleWindow row : failedInserts) { + context.output(failedOutputTag, row.getValue(), row.getTimestamp(), row.getWindow()); + } + + updateAndLogHistogram(options); + } + } + + private class ViaStateful + extends PTransform>>, PCollection> { + private final Duration BATCH_MAX_BUFFERING_DURATION = Duration.millis(200); + + @Override + public PCollection expand(PCollection>> input) { + BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); + KvCoder> inputCoder = (KvCoder) input.getCoder(); + TableRowInfoCoder valueCoder = + (TableRowInfoCoder) inputCoder.getCoderArguments().get(1); + PCollectionTuple result = + input + // Group and batch table rows such that each batch has no more than + // getMaxStreamingRowsToBatch rows. Also set a buffering time limit to avoid being + // stuck at a partial batch forever, especially in a global window. 
+ .apply( + GroupIntoBatches.>ofSize( + options.getMaxStreamingRowsToBatch()) + .withMaxBufferingDuration(BATCH_MAX_BUFFERING_DURATION) + .withShardedKey()) + .setCoder( + KvCoder.of( + ShardedKey.Coder.of(StringUtf8Coder.of()), IterableCoder.of(valueCoder))) + .apply( + ParDo.of(new InsertBatchedElements()) + .withOutputTags(mainOutputTag, TupleTagList.of(failedOutputTag))); + PCollection failedInserts = result.get(failedOutputTag); + failedInserts.setCoder(failedOutputCoder); + return failedInserts; + } + } + + private class InsertBatchedElements + extends DoFn, Iterable>>, Void> { + @Setup + public void setup() { + // record latency upto 60 seconds in the resolution of 20ms + histogram = Histogram.linear(0, 20, 3000); + lastReportedSystemClockMillis = System.currentTimeMillis(); + } + + @Teardown + public void teardown() { + if (histogram.getTotalCount() > 0) { + logPercentiles(); + histogram.clear(); + } + } + + @ProcessElement + public void processElement( + @Element KV, Iterable>> input, + BoundedWindow window, + ProcessContext context) + throws InterruptedException { + List> tableRows = new ArrayList<>(); + List uniqueIds = new ArrayList<>(); + for (TableRowInfo row : input.getValue()) { + TableRow tableRow = toTableRow.apply(row.tableRow); + TableRow failsafeTableRow = toFailsafeTableRow.apply(row.tableRow); + tableRows.add( + FailsafeValueInSingleWindow.of( + tableRow, context.timestamp(), window, context.pane(), failsafeTableRow)); + uniqueIds.add(row.uniqueId); + } + BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); + TableReference tableReference = BigQueryHelpers.parseTableSpec(input.getKey().getKey()); + List> failedInserts = Lists.newArrayList(); + flushRows(tableReference, tableRows, uniqueIds, options, failedInserts); + + for (ValueInSingleWindow row : failedInserts) { + context.output(failedOutputTag, row.getValue()); + } + + updateAndLogHistogram(options); + } + } + + private void updateAndLogHistogram(BigQueryOptions options) { + long currentTimeMillis = System.currentTimeMillis(); + if (histogram.getTotalCount() > 0 + && (currentTimeMillis - lastReportedSystemClockMillis) + > options.getLatencyLoggingFrequency() * 1000L) { + logPercentiles(); + histogram.clear(); + lastReportedSystemClockMillis = currentTimeMillis; + } + } + + private void logPercentiles() { + LOG.info( + "Total number of streaming insert requests: {}, P99: {}ms, P90: {}ms, P50: {}ms", + histogram.getTotalCount(), + DoubleMath.roundToInt(histogram.p99(), RoundingMode.HALF_UP), + DoubleMath.roundToInt(histogram.p90(), RoundingMode.HALF_UP), + DoubleMath.roundToInt(histogram.p50(), RoundingMode.HALF_UP)); + } + + /** Writes the accumulated rows into BigQuery with streaming API. 
*/ + private void flushRows( + TableReference tableReference, + List> tableRows, + List uniqueIds, + BigQueryOptions options, + List> failedInserts) + throws InterruptedException { + if (!tableRows.isEmpty()) { + try { + long totalBytes = + bqServices + .getDatasetService(options, histogram) + .insertAll( + tableReference, + tableRows, + uniqueIds, + retryPolicy, + failedInserts, + errorContainer, + skipInvalidRows, + ignoreUnknownValues, + ignoreInsertIds); + byteCounter.inc(totalBytes); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java deleted file mode 100644 index e018a3ca3954..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteFn.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.bigquery; - -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableRow; -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; -import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName; -import org.apache.beam.sdk.metrics.Counter; -import org.apache.beam.sdk.metrics.MetricName; -import org.apache.beam.sdk.metrics.MetricsContainer; -import org.apache.beam.sdk.metrics.MetricsEnvironment; -import org.apache.beam.sdk.metrics.MetricsLogger; -import org.apache.beam.sdk.metrics.SinkMetrics; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.values.FailsafeValueInSingleWindow; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.ShardedKey; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.ValueInSingleWindow; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.joda.time.Instant; - -/** Implementation of DoFn to perform streaming BigQuery write. 
*/ -@SystemDoFnInternal -@VisibleForTesting -@SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) -}) -class StreamingWriteFn - extends DoFn, TableRowInfo>, Void> { - private final BigQueryServices bqServices; - private final InsertRetryPolicy retryPolicy; - private final TupleTag failedOutputTag; - private final ErrorContainer errorContainer; - private final boolean skipInvalidRows; - private final boolean ignoreUnknownValues; - private final boolean ignoreInsertIds; - private final SerializableFunction toTableRow; - private final SerializableFunction toFailsafeTableRow; - private final Set metricFilter; - - /** JsonTableRows to accumulate BigQuery rows in order to batch writes. */ - private transient Map>> tableRows; - - /** The list of unique ids for each BigQuery table row. */ - private transient Map> uniqueIdsForTableRows; - - /** Tracks bytes written, exposed as "ByteCount" Counter. */ - private Counter byteCounter = SinkMetrics.bytesWritten(); - - StreamingWriteFn( - BigQueryServices bqServices, - InsertRetryPolicy retryPolicy, - TupleTag failedOutputTag, - ErrorContainer errorContainer, - boolean skipInvalidRows, - boolean ignoreUnknownValues, - boolean ignoreInsertIds, - SerializableFunction toTableRow, - SerializableFunction toFailsafeTableRow) { - this.bqServices = bqServices; - this.retryPolicy = retryPolicy; - this.failedOutputTag = failedOutputTag; - this.errorContainer = errorContainer; - this.skipInvalidRows = skipInvalidRows; - this.ignoreUnknownValues = ignoreUnknownValues; - this.ignoreInsertIds = ignoreInsertIds; - this.toTableRow = toTableRow; - this.toFailsafeTableRow = toFailsafeTableRow; - ImmutableSet.Builder setBuilder = ImmutableSet.builder(); - setBuilder.add( - MonitoringInfoMetricName.named( - MonitoringInfoConstants.Urns.API_REQUEST_LATENCIES, - BigQueryServicesImpl.API_METRIC_LABEL)); - for (String status : BigQueryServicesImpl.DatasetServiceImpl.CANONICAL_STATUS_MAP.values()) { - setBuilder.add( - MonitoringInfoMetricName.named( - MonitoringInfoConstants.Urns.API_REQUEST_COUNT, - ImmutableMap.builder() - .putAll(BigQueryServicesImpl.API_METRIC_LABEL) - .put(MonitoringInfoConstants.Labels.STATUS, status) - .build())); - } - setBuilder.add( - MonitoringInfoMetricName.named( - MonitoringInfoConstants.Urns.API_REQUEST_COUNT, - ImmutableMap.builder() - .putAll(BigQueryServicesImpl.API_METRIC_LABEL) - .put( - MonitoringInfoConstants.Labels.STATUS, - BigQueryServicesImpl.DatasetServiceImpl.CANONICAL_STATUS_UNKNOWN) - .build())); - this.metricFilter = setBuilder.build(); - } - - /** Prepares a target BigQuery table. */ - @StartBundle - public void startBundle() { - tableRows = new HashMap<>(); - uniqueIdsForTableRows = new HashMap<>(); - } - - /** Accumulates the input into JsonTableRows and uniqueIdsForTableRows. 
*/ - @ProcessElement - public void processElement( - @Element KV, TableRowInfo> element, - @Timestamp Instant timestamp, - BoundedWindow window, - PaneInfo pane) { - String tableSpec = element.getKey().getKey(); - List> rows = - BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec); - List uniqueIds = - BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows, tableSpec); - - TableRow tableRow = toTableRow.apply(element.getValue().tableRow); - TableRow failsafeTableRow = toFailsafeTableRow.apply(element.getValue().tableRow); - rows.add(FailsafeValueInSingleWindow.of(tableRow, timestamp, window, pane, failsafeTableRow)); - uniqueIds.add(element.getValue().uniqueId); - } - - /** Writes the accumulated rows into BigQuery with streaming API. */ - @FinishBundle - public void finishBundle(FinishBundleContext context) throws Exception { - List> failedInserts = Lists.newArrayList(); - BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class); - for (Map.Entry>> entry : - tableRows.entrySet()) { - TableReference tableReference = BigQueryHelpers.parseTableSpec(entry.getKey()); - flushRows( - tableReference, - entry.getValue(), - uniqueIdsForTableRows.get(entry.getKey()), - options, - failedInserts); - } - tableRows.clear(); - uniqueIdsForTableRows.clear(); - - for (ValueInSingleWindow row : failedInserts) { - context.output(failedOutputTag, row.getValue(), row.getTimestamp(), row.getWindow()); - } - reportStreamingApiLogging(options); - } - - private void reportStreamingApiLogging(BigQueryOptions options) { - MetricsContainer processWideContainer = MetricsEnvironment.getProcessWideContainer(); - if (processWideContainer instanceof MetricsLogger) { - MetricsLogger processWideMetricsLogger = (MetricsLogger) processWideContainer; - processWideMetricsLogger.tryLoggingMetrics( - "BigQuery HTTP API Metrics: \n", - metricFilter, - options.getBqStreamingApiLoggingFrequencySec() * 1000L, - true); - } - } - - /** Writes the accumulated rows into BigQuery with streaming API. 
*/ - private void flushRows( - TableReference tableReference, - List> tableRows, - List uniqueIds, - BigQueryOptions options, - List> failedInserts) - throws InterruptedException { - if (!tableRows.isEmpty()) { - try { - long totalBytes = - bqServices - .getDatasetService(options) - .insertAll( - tableReference, - tableRows, - uniqueIds, - retryPolicy, - failedInserts, - errorContainer, - skipInvalidRows, - ignoreUnknownValues, - ignoreInsertIds); - byteCounter.inc(totalBytes); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java index 6afe09331c69..d28a544f0fe1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java @@ -23,19 +23,19 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.ShardedKeyCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; /** * This transform takes in key-value pairs of {@link TableRow} entries and the {@link @@ -243,7 +243,6 @@ private PCollection writeAndGetErrors( AtomicCoder coder, ErrorContainer errorContainer) { BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); - int numShards = options.getNumStreamingKeys(); // A naive implementation would be to simply stream data directly to BigQuery. // However, this could occasionally lead to duplicated data, e.g., when @@ -251,53 +250,101 @@ private PCollection writeAndGetErrors( // The above risk is mitigated in this implementation by relying on // BigQuery built-in best effort de-dup mechanism. - // To use this mechanism, each input TableRow is tagged with a generated - // unique id, which is then passed to BigQuery and used to ignore duplicates - // We create 50 keys per BigQuery table to generate output on. This is few enough that we - // get good batching into BigQuery's insert calls, and enough that we can max out the - // streaming insert quota. - PCollection, TableRowInfo>> tagged = - input - .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable<>(numShards))) - .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), elementCoder)) - .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds<>())) - .setCoder( - KvCoder.of( - ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of(elementCoder))); - - TupleTag mainOutputTag = new TupleTag<>("mainOutput"); + // unique id, which is then passed to BigQuery and used to ignore duplicates. 
// To prevent having the same TableRow processed more than once with regenerated // different unique ids, this implementation relies on "checkpointing", which is - // achieved as a side effect of having StreamingWriteFn immediately follow a GBK, - // performed by Reshuffle. - PCollectionTuple tuple = - tagged - .apply(Reshuffle.of()) - // Put in the global window to ensure that DynamicDestinations side inputs are accessed - // correctly. - .apply( - "GlobalWindow", - Window., TableRowInfo>>into(new GlobalWindows()) - .triggering(DefaultTrigger.of()) - .discardingFiredPanes()) - .apply( - "StreamingWrite", - ParDo.of( - new StreamingWriteFn<>( - bigQueryServices, - retryPolicy, - failedInsertsTag, - errorContainer, - skipInvalidRows, - ignoreUnknownValues, - ignoreInsertIds, - toTableRow, - toFailsafeTableRow)) - .withOutputTags(mainOutputTag, TupleTagList.of(failedInsertsTag))); - PCollection failedInserts = tuple.get(failedInsertsTag); - failedInserts.setCoder(coder); - return failedInserts; + // achieved as a side effect of having BigQuery insertion immediately follow a GBK. + + if (options.getEnableStreamingAutoSharding()) { + // If runner determined dynamic sharding is enabled, group TableRows on table destinations + // that may be sharded during the runtime. Otherwise, we choose a fixed number of shards per + // table destination following the logic below in the other branch. + PCollection>> unshardedTagged = + input + .apply( + "MapToTableSpec", + MapElements.via( + new SimpleFunction, KV>() { + @Override + public KV apply(KV input) { + return KV.of(input.getKey().getTableSpec(), input.getValue()); + } + })) + .setCoder(KvCoder.of(StringUtf8Coder.of(), elementCoder)) + .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds<>())) + .setCoder(KvCoder.of(StringUtf8Coder.of(), TableRowInfoCoder.of(elementCoder))); + + // Auto-sharding is achieved via GroupIntoBatches.WithShardedKey transform which groups and at + // the same time batches the TableRows to be inserted to BigQuery. + return unshardedTagged.apply( + "StreamingWrite", + new BatchedStreamingWrite<>( + bigQueryServices, + retryPolicy, + failedInsertsTag, + coder, + errorContainer, + skipInvalidRows, + ignoreUnknownValues, + ignoreInsertIds, + toTableRow, + toFailsafeTableRow) + .viaStateful()); + } else { + // We create 50 keys per BigQuery table to generate output on. This is few enough that we + // get good batching into BigQuery's insert calls, and enough that we can max out the + // streaming insert quota. + int numShards = options.getNumStreamingKeys(); + PCollection, TableRowInfo>> shardedTagged = + input + .apply("ShardTableWrites", ParDo.of(new GenerateShardedTable<>(numShards))) + .setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), elementCoder)) + .apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds<>())) + .setCoder( + KvCoder.of( + ShardedKeyCoder.of(StringUtf8Coder.of()), + TableRowInfoCoder.of(elementCoder))); + + return shardedTagged + .apply(Reshuffle.of()) + // Put in the global window to ensure that DynamicDestinations side inputs are accessed + // correctly. 
+ .apply( + "GlobalWindow", + Window., TableRowInfo>>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()) + .apply( + "StripShardId", + MapElements.via( + new SimpleFunction< + KV, TableRowInfo>, + KV>>() { + @Override + public KV> apply( + KV, TableRowInfo> input) { + return KV.of(input.getKey().getKey(), input.getValue()); + } + })) + .setCoder(KvCoder.of(StringUtf8Coder.of(), TableRowInfoCoder.of(elementCoder))) + // Also batch the TableRows in a best effort manner via bundle finalization before + // inserting to BigQuery. + .apply( + "StreamingWrite", + new BatchedStreamingWrite<>( + bigQueryServices, + retryPolicy, + failedInsertsTag, + coder, + errorContainer, + skipInvalidRows, + ignoreUnknownValues, + ignoreInsertIds, + toTableRow, + toFailsafeTableRow) + .viaDoFnFinalization()); + } } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java index 40e9b5fa7b85..bd4e230e07ca 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TagWithUniqueIds.java @@ -20,9 +20,7 @@ import java.io.IOException; import java.util.UUID; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.ShardedKey; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; /** @@ -35,8 +33,8 @@ @SuppressWarnings({ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) -class TagWithUniqueIds - extends DoFn, ElementT>, KV, TableRowInfo>> { +class TagWithUniqueIds + extends DoFn, KV>> { private transient String randomUUID; private transient long sequenceNo = 0L; @@ -47,7 +45,7 @@ public void startBundle() { /** Tag the input with a unique id. */ @ProcessElement - public void processElement(ProcessContext context, BoundedWindow window) throws IOException { + public void processElement(ProcessContext context) throws IOException { String uniqueId = randomUUID + sequenceNo++; // We output on keys 0-50 to ensure that there's enough batching for // BigQuery. 
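Note on the batching primitive used by the ViaStateful path above: rows keyed by table spec are batched with GroupIntoBatches.withShardedKey(), which lets the runner split a hot table destination across shards. A minimal self-contained sketch of that API follows, separate from this patch; the String key/value types, the batch size of 500, the hard-coded table spec, and the example rows are illustrative assumptions, and only the 200ms buffering limit mirrors the BATCH_MAX_BUFFERING_DURATION used above.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class GroupIntoBatchesShardedKeySketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Rows keyed by their destination table spec (hard-coded here for illustration).
    PCollection<KV<String, String>> keyedRows =
        p.apply(
            Create.of(
                    KV.of("project:dataset.table", "row-1"),
                    KV.of("project:dataset.table", "row-2"),
                    KV.of("project:dataset.table", "row-3"))
                .withCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())));

    // Group values per key into batches of at most 500 elements; a partial batch is
    // flushed after 200ms of buffering. withShardedKey() allows the runner to split a
    // hot key across shards, which is what runner-determined "auto-sharding" relies on.
    PCollection<KV<ShardedKey<String>, Iterable<String>>> batches =
        keyedRows
            .apply(
                GroupIntoBatches.<String, String>ofSize(500)
                    .withMaxBufferingDuration(Duration.millis(200))
                    .withShardedKey())
            .setCoder(
                KvCoder.of(
                    ShardedKey.Coder.of(StringUtf8Coder.of()),
                    IterableCoder.of(StringUtf8Coder.of())));

    p.run().waitUntilFinish();
  }
}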
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 47947870377b..26340c01f2c1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -242,6 +242,12 @@ public void testWriteDynamicDestinationsStreamingWithSchemas() throws Exception writeDynamicDestinations(true, true); } + @Test + public void testWriteDynamicDestinationsStreamingWithAutoSharding() throws Exception { + options.as(BigQueryOptions.class).setEnableStreamingAutoSharding(true); + writeDynamicDestinations(true, true); + } + public void writeDynamicDestinations(boolean streaming, boolean schemas) throws Exception { final Schema schema = Schema.builder().addField("name", FieldType.STRING).addField("id", FieldType.INT32).build(); @@ -860,6 +866,16 @@ protected void writeString(org.apache.avro.Schema schema, Object datum, Encoder @Test public void testStreamingWrite() throws Exception { + streamingWrite(); + } + + @Test + public void testStreamingWriteWithAutoSharding() throws Exception { + options.as(BigQueryOptions.class).setEnableStreamingAutoSharding(true); + streamingWrite(); + } + + private void streamingWrite() throws Exception { p.apply( Create.of( new TableRow().set("name", "a").set("number", 1), From 129176a8ff1eb1171ebdc950931a670babe690d8 Mon Sep 17 00:00:00 2001 From: sychen Date: Thu, 7 Jan 2021 17:45:13 -0800 Subject: [PATCH 2/5] Moved autosharding option from BigQueryOption to BigQueryIOBuilder; addressed comments. 
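Instead of a global pipeline option (BigQueryOptions#setEnableStreamingAutoSharding), auto-sharding is now requested per transform via BigQueryIO.Write#withAutoSharding(), threaded through StreamingInserts and StreamingWriteTables, and rejected at pipeline construction time for bounded input.

A minimal usage sketch of the resulting API; the table spec and schema are illustrative placeholders taken from the updated tests, not required values, and unboundedRows stands for an assumed unbounded PCollection<TableRow>:

    TableSchema schema =
        new TableSchema()
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema().setName("name").setType("STRING"),
                    new TableFieldSchema().setName("number").setType("INTEGER")));
    // unboundedRows: an unbounded PCollection<TableRow> (assumed).
    unboundedRows.apply(
        "WriteToBigQuery",
        BigQueryIO.writeTableRows()
            .to("project-id:dataset-id.table-id")
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withAutoSharding());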
--- .../gcp/bigquery/BatchedStreamingWrite.java | 29 +++++--- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 23 +++++++ .../sdk/io/gcp/bigquery/StreamingInserts.java | 29 ++++++++ .../io/gcp/bigquery/StreamingWriteTables.java | 29 +++++++- .../io/gcp/bigquery/BigQueryIOWriteTest.java | 69 ++++++++++++------- 5 files changed, 145 insertions(+), 34 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java index a8e3bc73fb11..19f4c33441e0 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java @@ -37,7 +37,10 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.Histogram; import org.apache.beam.sdk.util.ShardedKey; import org.apache.beam.sdk.values.FailsafeValueInSingleWindow; @@ -235,15 +238,14 @@ public void processElement( BoundedWindow window, PaneInfo pane) { String tableSpec = element.getKey(); - List> rows = - BigQueryHelpers.getOrCreateMapListValue(tableRows, tableSpec); - List uniqueIds = - BigQueryHelpers.getOrCreateMapListValue(uniqueIdsForTableRows, tableSpec); - TableRow tableRow = toTableRow.apply(element.getValue().tableRow); TableRow failsafeTableRow = toFailsafeTableRow.apply(element.getValue().tableRow); - rows.add(FailsafeValueInSingleWindow.of(tableRow, timestamp, window, pane, failsafeTableRow)); - uniqueIds.add(element.getValue().uniqueId); + tableRows + .computeIfAbsent(tableSpec, k -> new ArrayList<>()) + .add(FailsafeValueInSingleWindow.of(tableRow, timestamp, window, pane, failsafeTableRow)); + uniqueIdsForTableRows + .computeIfAbsent(tableSpec, k -> new ArrayList<>()) + .add(element.getValue().uniqueId); } /** Writes the accumulated rows into BigQuery with streaming API. */ @@ -284,6 +286,12 @@ public PCollection expand(PCollection> (TableRowInfoCoder) inputCoder.getCoderArguments().get(1); PCollectionTuple result = input + // Apply a global window to avoid GroupIntoBatches below performs tiny grouping + // partitioned by windows. + .apply( + Window.>>into(new GlobalWindows()) + .triggering(DefaultTrigger.of()) + .discardingFiredPanes()) // Group and batch table rows such that each batch has no more than // getMaxStreamingRowsToBatch rows. Also set a buffering time limit to avoid being // stuck at a partial batch forever, especially in a global window. 
@@ -308,7 +316,7 @@ private class InsertBatchedElements extends DoFn, Iterable>>, Void> { @Setup public void setup() { - // record latency upto 60 seconds in the resolution of 20ms + // record latency up to 60 seconds in the resolution of 20ms histogram = Histogram.linear(0, 20, 3000); lastReportedSystemClockMillis = System.currentTimeMillis(); } @@ -325,7 +333,8 @@ public void teardown() { public void processElement( @Element KV, Iterable>> input, BoundedWindow window, - ProcessContext context) + ProcessContext context, + MultiOutputReceiver out) throws InterruptedException { List> tableRows = new ArrayList<>(); List uniqueIds = new ArrayList<>(); @@ -343,7 +352,7 @@ public void processElement( flushRows(tableReference, tableRows, uniqueIds, options, failedInserts); for (ValueInSingleWindow row : failedInserts) { - context.output(failedOutputTag, row.getValue()); + out.get(failedOutputTag).output(row.getValue()); } updateAndLogHistogram(options); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 2a324f17fde0..1869c91f62af 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1664,6 +1664,7 @@ public static Write write() { .setMaxBytesPerPartition(BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION) .setOptimizeWrites(false) .setUseBeamSchema(false) + .setAutoSharding(false) .build(); } @@ -1789,6 +1790,9 @@ public enum Method { @Experimental(Kind.SCHEMAS) abstract Boolean getUseBeamSchema(); + @Experimental + abstract Boolean getAutoSharding(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -1868,6 +1872,9 @@ abstract Builder setAvroSchemaFactory( @Experimental(Kind.SCHEMAS) abstract Builder setUseBeamSchema(Boolean useBeamSchema); + @Experimental + abstract Builder setAutoSharding(Boolean autoSharding); + abstract Write build(); } @@ -2342,6 +2349,17 @@ public Write useBeamSchema() { return toBuilder().setUseBeamSchema(true).build(); } + /** + * If true, enables dynamically determined number of shards to write to BigQuery. Only + * applicable to unbounded data with STREAMING_INSERTS. + * + *
TODO(BEAM-11408): Also integrate this option to FILE_LOADS. + */ + @Experimental + public Write withAutoSharding() { + return toBuilder().setAutoSharding(true).build(); + } + @VisibleForTesting /** This method is for test usage only */ public Write withTestServices(BigQueryServices testServices) { @@ -2487,6 +2505,10 @@ public WriteResult expand(PCollection input) { method); } + if (input.isBounded() == IsBounded.BOUNDED) { + checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input."); + } + if (getJsonTimePartitioning() != null) { checkArgument( getDynamicDestinations() == null, @@ -2681,6 +2703,7 @@ private WriteResult continueExpandTyped( .withSkipInvalidRows(getSkipInvalidRows()) .withIgnoreUnknownValues(getIgnoreUnknownValues()) .withIgnoreInsertIds(getIgnoreInsertIds()) + .withAutoSharding(getAutoSharding()) .withKmsKey(getKmsKey()); return input.apply(streamingInserts); } else { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java index 149efa3ce1ee..1e4150a1436b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingInserts.java @@ -42,6 +42,7 @@ public class StreamingInserts private final boolean skipInvalidRows; private final boolean ignoreUnknownValues; private final boolean ignoreInsertIds; + private final boolean autoSharding; private final String kmsKey; private final Coder elementCoder; private final SerializableFunction toTableRow; @@ -63,6 +64,7 @@ public StreamingInserts( false, false, false, + false, elementCoder, toTableRow, toFailsafeTableRow, @@ -79,6 +81,7 @@ private StreamingInserts( boolean skipInvalidRows, boolean ignoreUnknownValues, boolean ignoreInsertIds, + boolean autoSharding, Coder elementCoder, SerializableFunction toTableRow, SerializableFunction toFailsafeTableRow, @@ -91,6 +94,7 @@ private StreamingInserts( this.skipInvalidRows = skipInvalidRows; this.ignoreUnknownValues = ignoreUnknownValues; this.ignoreInsertIds = ignoreInsertIds; + this.autoSharding = autoSharding; this.elementCoder = elementCoder; this.toTableRow = toTableRow; this.toFailsafeTableRow = toFailsafeTableRow; @@ -109,6 +113,7 @@ public StreamingInserts withInsertRetryPolicy( skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow, @@ -126,6 +131,7 @@ public StreamingInserts withExtendedErrorInfo(boolean ex skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow, @@ -142,6 +148,7 @@ StreamingInserts withSkipInvalidRows(boolean skipInvalid skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow, @@ -158,6 +165,7 @@ StreamingInserts withIgnoreUnknownValues(boolean ignoreU skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow, @@ -174,6 +182,24 @@ StreamingInserts withIgnoreInsertIds(boolean ignoreInser skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, + elementCoder, + toTableRow, + toFailsafeTableRow, + kmsKey); + } + + StreamingInserts withAutoSharding(boolean autoSharding) { + return new StreamingInserts<>( + createDisposition, + dynamicDestinations, + 
bigQueryServices, + retryPolicy, + extendedErrorInfo, + skipInvalidRows, + ignoreUnknownValues, + ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow, @@ -190,6 +216,7 @@ StreamingInserts withKmsKey(String kmsKey) { skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow, @@ -206,6 +233,7 @@ StreamingInserts withTestServices(BigQueryServices bigQu skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow, @@ -229,6 +257,7 @@ public WriteResult expand(PCollection> input) { .withSkipInvalidRows(skipInvalidRows) .withIgnoreUnknownValues(ignoreUnknownValues) .withIgnoreInsertIds(ignoreInsertIds) + .withAutoSharding(autoSharding) .withElementCoder(elementCoder) .withToTableRow(toTableRow) .withToFailsafeTableRow(toFailsafeTableRow)); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java index d28a544f0fe1..9bfe3d2197e6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java @@ -57,6 +57,7 @@ public class StreamingWriteTables private final boolean skipInvalidRows; private final boolean ignoreUnknownValues; private final boolean ignoreInsertIds; + private final boolean autoSharding; private final Coder elementCoder; private final SerializableFunction toTableRow; private final SerializableFunction toFailsafeTableRow; @@ -69,6 +70,7 @@ public StreamingWriteTables() { false, // skipInvalidRows false, // ignoreUnknownValues false, // ignoreInsertIds + false, // autoSharding null, // elementCoder null, // toTableRow null); // toFailsafeTableRow @@ -81,6 +83,7 @@ private StreamingWriteTables( boolean skipInvalidRows, boolean ignoreUnknownValues, boolean ignoreInsertIds, + boolean autoSharding, Coder elementCoder, SerializableFunction toTableRow, SerializableFunction toFailsafeTableRow) { @@ -90,6 +93,7 @@ private StreamingWriteTables( this.skipInvalidRows = skipInvalidRows; this.ignoreUnknownValues = ignoreUnknownValues; this.ignoreInsertIds = ignoreInsertIds; + this.autoSharding = autoSharding; this.elementCoder = elementCoder; this.toTableRow = toTableRow; this.toFailsafeTableRow = toFailsafeTableRow; @@ -103,6 +107,7 @@ StreamingWriteTables withTestServices(BigQueryServices bigQueryService skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow); @@ -116,6 +121,7 @@ StreamingWriteTables withInsertRetryPolicy(InsertRetryPolicy retryPoli skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow); @@ -129,6 +135,7 @@ StreamingWriteTables withExtendedErrorInfo(boolean extendedErrorInfo) skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow); @@ -142,6 +149,7 @@ StreamingWriteTables withSkipInvalidRows(boolean skipInvalidRows) { skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow); @@ -155,6 +163,7 @@ StreamingWriteTables withIgnoreUnknownValues(boolean ignoreUnknownValu skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, 
toTableRow, toFailsafeTableRow); @@ -168,6 +177,21 @@ StreamingWriteTables withIgnoreInsertIds(boolean ignoreInsertIds) { skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, + elementCoder, + toTableRow, + toFailsafeTableRow); + } + + StreamingWriteTables withAutoSharding(boolean autoSharding) { + return new StreamingWriteTables<>( + bigQueryServices, + retryPolicy, + extendedErrorInfo, + skipInvalidRows, + ignoreUnknownValues, + ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow); @@ -181,6 +205,7 @@ StreamingWriteTables withElementCoder(Coder elementCoder) { skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow); @@ -195,6 +220,7 @@ StreamingWriteTables withToTableRow( skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow); @@ -209,6 +235,7 @@ StreamingWriteTables withToFailsafeTableRow( skipInvalidRows, ignoreUnknownValues, ignoreInsertIds, + autoSharding, elementCoder, toTableRow, toFailsafeTableRow); @@ -257,7 +284,7 @@ private PCollection writeAndGetErrors( // different unique ids, this implementation relies on "checkpointing", which is // achieved as a side effect of having BigQuery insertion immediately follow a GBK. - if (options.getEnableStreamingAutoSharding()) { + if (autoSharding) { // If runner determined dynamic sharding is enabled, group TableRows on table destinations // that may be sharded during the runtime. Otherwise, we choose a fixed number of shards per // table destination following the logic below in the other branch. diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index 26340c01f2c1..d327eced3a1b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -224,31 +224,31 @@ public void testWriteEmptyPCollection() throws Exception { @Test public void testWriteDynamicDestinationsBatch() throws Exception { - writeDynamicDestinations(false, false); + writeDynamicDestinations(false, false, false); } @Test public void testWriteDynamicDestinationsBatchWithSchemas() throws Exception { - writeDynamicDestinations(false, true); + writeDynamicDestinations(false, true, false); } @Test public void testWriteDynamicDestinationsStreaming() throws Exception { - writeDynamicDestinations(true, false); + writeDynamicDestinations(true, false, false); } @Test public void testWriteDynamicDestinationsStreamingWithSchemas() throws Exception { - writeDynamicDestinations(true, true); + writeDynamicDestinations(true, true, false); } @Test public void testWriteDynamicDestinationsStreamingWithAutoSharding() throws Exception { - options.as(BigQueryOptions.class).setEnableStreamingAutoSharding(true); - writeDynamicDestinations(true, true); + writeDynamicDestinations(true, true, true); } - public void writeDynamicDestinations(boolean streaming, boolean schemas) throws Exception { + public void writeDynamicDestinations(boolean streaming, boolean schemas, boolean autoSharding) + throws Exception { final Schema schema = Schema.builder().addField("name", FieldType.STRING).addField("id", FieldType.INT32).build(); @@ -364,6 +364,9 @@ private void verifySideInputs() { return 
new TableRow().set("name", matcher.group(1)).set("id", matcher.group(2)); }); } + if (autoSharding) { + write = write.withAutoSharding(); + } users.apply("WriteBigQuery", write); p.run(); @@ -866,16 +869,30 @@ protected void writeString(org.apache.avro.Schema schema, Object datum, Encoder @Test public void testStreamingWrite() throws Exception { - streamingWrite(); + streamingWrite(false); } @Test public void testStreamingWriteWithAutoSharding() throws Exception { - options.as(BigQueryOptions.class).setEnableStreamingAutoSharding(true); - streamingWrite(); + streamingWrite(true); } - private void streamingWrite() throws Exception { + private void streamingWrite(boolean autoSharding) throws Exception { + BigQueryIO.Write write = + BigQueryIO.writeTableRows() + .to("project-id:dataset-id.table-id") + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) + .withSchema( + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withoutValidation(); + if (autoSharding) { + write = write.withAutoSharding(); + } p.apply( Create.of( new TableRow().set("name", "a").set("number", 1), @@ -884,18 +901,7 @@ private void streamingWrite() throws Exception { new TableRow().set("name", "d").set("number", 4)) .withCoder(TableRowJsonCoder.of())) .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) - .apply( - BigQueryIO.writeTableRows() - .to("project-id:dataset-id.table-id") - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withSchema( - new TableSchema() - .setFields( - ImmutableList.of( - new TableFieldSchema().setName("name").setType("STRING"), - new TableFieldSchema().setName("number").setType("INTEGER")))) - .withTestServices(fakeBqServices) - .withoutValidation()); + .apply(write); p.run(); assertThat( @@ -1264,6 +1270,7 @@ public void testWriteBuilderMethods() { assertEquals(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY, write.getWriteDisposition()); assertEquals(null, write.getTableDescription()); assertTrue(write.getValidate()); + assertFalse(write.getAutoSharding()); assertFalse(write.withoutValidation().getValidate()); TableSchema schema = new TableSchema(); @@ -1486,6 +1493,22 @@ public void testWriteValidateFailsWithAvroFormatAndStreamingInserts() { .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); } + @Test + public void testWriteValidateFailsWithBatchAutoSharding() { + p.enableAbandonedNodeEnforcement(false); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Auto-sharding is only applicable to unbounded input."); + p.apply(Create.empty(INPUT_RECORD_CODER)) + .apply( + BigQueryIO.write() + .to("dataset.table") + .withSchema(new TableSchema()) + .withMethod(Method.STREAMING_INSERTS) + .withAutoSharding() + .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); + } + @Test public void testWritePartitionEmptyData() throws Exception { long numFiles = 0; From c033215c5cebc923652db018aaf034bcef8376c8 Mon Sep 17 00:00:00 2001 From: sychen Date: Wed, 27 Jan 2021 14:29:21 -0800 Subject: [PATCH 3/5] fix checkstyle error --- .../beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java index 19f4c33441e0..4fdadb16c9f6 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java @@ -82,7 +82,7 @@ class BatchedStreamingWrite private transient Long lastReportedSystemClockMillis = System.currentTimeMillis(); - private final Logger LOG = LoggerFactory.getLogger(BatchedStreamingWrite.class); + private static final Logger LOG = LoggerFactory.getLogger(BatchedStreamingWrite.class); /** Tracks bytes written, exposed as "ByteCount" Counter. */ private Counter byteCounter = SinkMetrics.bytesWritten(); @@ -274,10 +274,11 @@ public void finishBundle(FinishBundleContext context) throws Exception { } } + // The max duration input records are allowed to be buffered in the state, if using ViaStateful. + private static final Duration BATCH_MAX_BUFFERING_DURATION = Duration.millis(200); + private class ViaStateful extends PTransform>>, PCollection> { - private final Duration BATCH_MAX_BUFFERING_DURATION = Duration.millis(200); - @Override public PCollection expand(PCollection>> input) { BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class); From 3deba1de69098cbf58753831a50bf9583a87af55 Mon Sep 17 00:00:00 2001 From: sychen Date: Wed, 27 Jan 2021 15:29:53 -0800 Subject: [PATCH 4/5] Revert the logic that was dropped during merge --- .../gcp/bigquery/BatchedStreamingWrite.java | 121 ++++++++---------- 1 file changed, 54 insertions(+), 67 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java index 4fdadb16c9f6..2bbfdc2b5125 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java @@ -20,16 +20,22 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import java.io.IOException; -import java.math.RoundingMode; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.beam.runners.core.metrics.MonitoringInfoConstants; +import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.metrics.MetricsLogger; import org.apache.beam.sdk.metrics.SinkMetrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupIntoBatches; @@ -41,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.Histogram; import org.apache.beam.sdk.util.ShardedKey; import 
org.apache.beam.sdk.values.FailsafeValueInSingleWindow; import org.apache.beam.sdk.values.KV; @@ -51,12 +56,11 @@ import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath; import org.joda.time.Duration; import org.joda.time.Instant; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** PTransform to perform batched streaming BigQuery write. */ @SuppressWarnings({ @@ -76,13 +80,7 @@ class BatchedStreamingWrite private final boolean ignoreInsertIds; private final SerializableFunction toTableRow; private final SerializableFunction toFailsafeTableRow; - - /** Tracks histogram of bytes written. Reset at the start of every bundle. */ - private transient Histogram histogram = Histogram.linear(0, 20, 3000); - - private transient Long lastReportedSystemClockMillis = System.currentTimeMillis(); - - private static final Logger LOG = LoggerFactory.getLogger(BatchedStreamingWrite.class); + private final Set metricFilter; /** Tracks bytes written, exposed as "ByteCount" Counter. */ private Counter byteCounter = SinkMetrics.bytesWritten(); @@ -111,6 +109,7 @@ public BatchedStreamingWrite( this.ignoreInsertIds = ignoreInsertIds; this.toTableRow = toTableRow; this.toFailsafeTableRow = toFailsafeTableRow; + this.metricFilter = getMetricFilter(); this.batchViaStateful = false; } @@ -136,9 +135,37 @@ private BatchedStreamingWrite( this.ignoreInsertIds = ignoreInsertIds; this.toTableRow = toTableRow; this.toFailsafeTableRow = toFailsafeTableRow; + this.metricFilter = getMetricFilter(); this.batchViaStateful = batchViaStateful; } + private static Set getMetricFilter() { + ImmutableSet.Builder setBuilder = ImmutableSet.builder(); + setBuilder.add( + MonitoringInfoMetricName.named( + MonitoringInfoConstants.Urns.API_REQUEST_LATENCIES, + BigQueryServicesImpl.API_METRIC_LABEL)); + for (String status : BigQueryServicesImpl.DatasetServiceImpl.CANONICAL_STATUS_MAP.values()) { + setBuilder.add( + MonitoringInfoMetricName.named( + MonitoringInfoConstants.Urns.API_REQUEST_COUNT, + ImmutableMap.builder() + .putAll(BigQueryServicesImpl.API_METRIC_LABEL) + .put(MonitoringInfoConstants.Labels.STATUS, status) + .build())); + } + setBuilder.add( + MonitoringInfoMetricName.named( + MonitoringInfoConstants.Urns.API_REQUEST_COUNT, + ImmutableMap.builder() + .putAll(BigQueryServicesImpl.API_METRIC_LABEL) + .put( + MonitoringInfoConstants.Labels.STATUS, + BigQueryServicesImpl.DatasetServiceImpl.CANONICAL_STATUS_UNKNOWN) + .build())); + return setBuilder.build(); + } + /** * A transform that performs batched streaming BigQuery write; input elements are batched and * flushed upon bundle finalization. 
@@ -208,21 +235,6 @@ private class BatchAndInsertElements extends DoFn> uniqueIdsForTableRows;

-    @Setup
-    public void setup() {
-      // record latency upto 60 seconds in the resolution of 20ms
-      histogram = Histogram.linear(0, 20, 3000);
-      lastReportedSystemClockMillis = System.currentTimeMillis();
-    }
-
-    @Teardown
-    public void teardown() {
-      if (histogram.getTotalCount() > 0) {
-        logPercentiles();
-        histogram.clear();
-      }
-    }
-
     /** Prepares a target BigQuery table. */
     @StartBundle
     public void startBundle() {
@@ -269,8 +281,7 @@ public void finishBundle(FinishBundleContext context) throws Exception {
       for (ValueInSingleWindow row : failedInserts) {
         context.output(failedOutputTag, row.getValue(), row.getTimestamp(), row.getWindow());
       }
-
-      updateAndLogHistogram(options);
+      reportStreamingApiLogging(options);
     }
   }

@@ -315,21 +326,6 @@ public PCollection expand(PCollection>

   private class InsertBatchedElements
       extends DoFn, Iterable>>, Void> {
-    @Setup
-    public void setup() {
-      // record latency up to 60 seconds in the resolution of 20ms
-      histogram = Histogram.linear(0, 20, 3000);
-      lastReportedSystemClockMillis = System.currentTimeMillis();
-    }
-
-    @Teardown
-    public void teardown() {
-      if (histogram.getTotalCount() > 0) {
-        logPercentiles();
-        histogram.clear();
-      }
-    }
-
     @ProcessElement
     public void processElement(
         @Element KV, Iterable>> input,
@@ -355,31 +351,10 @@ public void processElement(
       for (ValueInSingleWindow row : failedInserts) {
         out.get(failedOutputTag).output(row.getValue());
       }
-
-      updateAndLogHistogram(options);
+      reportStreamingApiLogging(options);
     }
   }

-  private void updateAndLogHistogram(BigQueryOptions options) {
-    long currentTimeMillis = System.currentTimeMillis();
-    if (histogram.getTotalCount() > 0
-        && (currentTimeMillis - lastReportedSystemClockMillis)
-            > options.getLatencyLoggingFrequency() * 1000L) {
-      logPercentiles();
-      histogram.clear();
-      lastReportedSystemClockMillis = currentTimeMillis;
-    }
-  }
-
-  private void logPercentiles() {
-    LOG.info(
-        "Total number of streaming insert requests: {}, P99: {}ms, P90: {}ms, P50: {}ms",
-        histogram.getTotalCount(),
-        DoubleMath.roundToInt(histogram.p99(), RoundingMode.HALF_UP),
-        DoubleMath.roundToInt(histogram.p90(), RoundingMode.HALF_UP),
-        DoubleMath.roundToInt(histogram.p50(), RoundingMode.HALF_UP));
-  }
-
   /** Writes the accumulated rows into BigQuery with streaming API. */
   private void flushRows(
       TableReference tableReference,
@@ -392,7 +367,7 @@ private void flushRows(
       try {
         long totalBytes =
             bqServices
-                .getDatasetService(options, histogram)
+                .getDatasetService(options)
                 .insertAll(
                     tableReference,
                     tableRows,
@@ -409,4 +384,16 @@ private void flushRows(
       }
     }
   }
+
+  private void reportStreamingApiLogging(BigQueryOptions options) {
+    MetricsContainer processWideContainer = MetricsEnvironment.getProcessWideContainer();
+    if (processWideContainer instanceof MetricsLogger) {
+      MetricsLogger processWideMetricsLogger = (MetricsLogger) processWideContainer;
+      processWideMetricsLogger.tryLoggingMetrics(
+          "BigQuery HTTP API Metrics: \n",
+          metricFilter,
+          options.getBqStreamingApiLoggingFrequencySec() * 1000L,
+          true);
+    }
+  }
 }

From 1b97e7c90a92f65df9faba5e531726ddc128c55a Mon Sep 17 00:00:00 2001
From: sychen
Date: Wed, 27 Jan 2021 16:49:50 -0800
Subject: [PATCH 5/5] Add comments for RequiresStableInput

---
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 4 +++-
 .../beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java       | 4 ++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index cf3555af169d..e2891762a9c6 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -599,7 +599,9 @@ private List getOverrides(boolean streaming) {
                 BatchViewOverrides.BatchViewAsIterable.class, this)));
       }
     }
-    /* TODO[Beam-4684]: Support @RequiresStableInput on Dataflow in a more intelligent way
+    /* TODO(Beam-4684): Support @RequiresStableInput on Dataflow in a more intelligent way.
+    Using Reshuffle might cause an extra and unnecessary shuffle to be inserted. To enable this, we
+    should make sure that we do not add extra shuffles for transforms whose input is already stable.
     // Uses Reshuffle, so has to be before the Reshuffle override
     overridesBuilder.add(
         PTransformOverride.of(
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
index 2bbfdc2b5125..e1bf02fbdd65 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchedStreamingWrite.java
@@ -324,6 +324,10 @@ public PCollection expand(PCollection>
     }
   }

+  // TODO(BEAM-11408): This transform requires stable inputs. Currently it relies on the fact that
+  // the upstream transform GroupIntoBatches produces stable outputs, as opposed to using the
+  // annotation @RequiresStableInput, to avoid a potential performance penalty due to extra data
+  // shuffling.
   private class InsertBatchedElements
       extends DoFn, Iterable>>, Void> {
     @ProcessElement
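To make the trade-off in the TODO(BEAM-11408) comment concrete: the alternative to relying on GroupIntoBatches producing stable output is to push the burden onto the runner by annotating the inserting DoFn. The sketch below is not from the patch; the class name, element type, and method body are placeholder assumptions, and on Dataflow the annotation is currently honored by inserting a Reshuffle-style shuffle, which is exactly the cost the comment wants to avoid.

import org.apache.beam.sdk.transforms.DoFn;

class StableInputInsertSketch extends DoFn<String, Void> {
  @RequiresStableInput // ask the runner to replay identical input if the bundle is retried
  @ProcessElement
  public void processElement(@Element String row) {
    // With stable input, a retried bundle sees byte-identical rows, so the unique insert ids
    // attached upstream are replayed unchanged and BigQuery can deduplicate repeated streaming
    // inserts. The print below is a placeholder for the real insertAll side effect.
    System.out.println("streaming insert: " + row);
  }
}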