BatchLoads.java
@@ -47,9 +47,12 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
@@ -265,31 +268,44 @@ public void validate(PipelineOptions options) {

// Expand the pipeline when the user has requested periodically-triggered file writes.
private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
checkArgument(numFileShards > 0);
Pipeline p = input.getPipeline();
final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
final PCollectionView<String> tempFilePrefixView =
createTempFilePrefixView(p, loadJobIdPrefixView);
// The user-supplied triggeringDuration is often chosen to control how many BigQuery load
// jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
// is set to a large value, currently we have to buffer all the data until the trigger fires.
// Instead we ensure that the files are written if a threshold number of records are ready.
// We use only the user-supplied trigger on the actual BigQuery load. This allows us to
// offload the data to the filesystem.
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
input.apply(
"rewindowIntoGlobal",
Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggeringFrequency),
AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
.discardingFiredPanes());
PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
PCollection<WriteBundlesToFiles.Result<DestinationT>> results;
if (numFileShards > 0) {
// The user-supplied triggeringFrequency is often chosen to control how many BigQuery load
// jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
// is set to a large value, currently we have to buffer all the data until the trigger fires.
// Instead we ensure that the files are written if a threshold number of records are ready.
// We use only the user-supplied trigger on the actual BigQuery load. This allows us to
// offload the data to the filesystem.
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
input.apply(
"rewindowIntoGlobal",
Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggeringFrequency),
AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
.discardingFiredPanes());
results = writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);
} else {
// In the case of dynamic sharding, however, we use the default trigger and instead apply the
// user-supplied triggeringFrequency to the sharding transform. See
// writeDynamicallyShardedFilesTriggered.
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
input.apply(
"rewindowIntoGlobal",
Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes());
results = writeDynamicallyShardedFilesTriggered(inputInGlobalWindow, tempFilePrefixView);
}

// Apply the user's trigger before we start generating BigQuery load jobs.
results =
results.apply(
@@ -307,7 +323,7 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
new TupleTag<>("singlePartitionTag");

// If we have non-default triggered output, we can't use the side-input technique used in
// expandUntriggered . Instead make the result list a main input. Apply a GroupByKey first for
// expandUntriggered. Instead make the result list a main input. Apply a GroupByKey first for
// determinism.
PCollectionTuple partitions =
results
@@ -371,8 +387,8 @@ public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> inp
.discardingFiredPanes());
PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
(numFileShards == 0)
? writeDynamicallyShardedFiles(inputInGlobalWindow, tempFilePrefixView)
: writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
? writeDynamicallyShardedFilesUntriggered(inputInGlobalWindow, tempFilePrefixView)
: writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);

TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
@@ -470,9 +486,10 @@ public void getTempFilePrefix(ProcessContext c) {
.apply("TempFilePrefixView", View.asSingleton());
}

// Writes input data to dynamically-sharded, per-bundle files. Returns a PCollection of filename,
// file byte size, and table destination.
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFiles(
// Writes input data to dynamically-sharded per-bundle files without triggering. Input records are
// spilled to new files if memory is constrained. Returns a PCollection of filename, file byte
// size, and table destination.
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesUntriggered(
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles") {};
@@ -513,9 +530,9 @@ PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFil
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}

// Writes input data to statically-sharded files. Returns a PCollection of filename,
// file byte size, and table destination.
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles(
// Writes input data to statically-sharded files. Returns a PCollection of filename, file byte
// size, and table destination.
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeStaticallyShardedFiles(
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
checkState(numFileShards > 0);
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords =
@@ -547,11 +564,66 @@ public void processElement(
return writeShardedRecords(shardedRecords, tempFilePrefix);
}

// Writes input data to dynamically-sharded files with triggering. The input data is sharded by
// table destinations and each destination may be sub-sharded dynamically. Returns a PCollection
// of filename, file byte size, and table destination.
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFilesTriggered(
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
// In contrast to fixed sharding with triggering, here we use a global window with the default
// trigger and apply the user-supplied triggeringFrequency in the subsequent GroupIntoBatches
// transform. We also ensure that the files are written if a threshold number of records are
// ready. Dynamic sharding is achieved via the withShardedKey() option provided by
// GroupIntoBatches.
return input
.apply(
GroupIntoBatches.<DestinationT, ElementT>ofSize(FILE_TRIGGERING_RECORD_COUNT)
.withMaxBufferingDuration(triggeringFrequency)
.withShardedKey())
.setCoder(
KvCoder.of(
org.apache.beam.sdk.util.ShardedKey.Coder.of(destinationCoder),
IterableCoder.of(elementCoder)))
.apply(
"StripShardId",
MapElements.via(
new SimpleFunction<
KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>,
KV<DestinationT, Iterable<ElementT>>>() {
@Override
public KV<DestinationT, Iterable<ElementT>> apply(
KV<org.apache.beam.sdk.util.ShardedKey<DestinationT>, Iterable<ElementT>>
input) {
return KV.of(input.getKey().getKey(), input.getValue());
}
}))
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
.apply(
"WriteGroupedRecords",
ParDo.of(
new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
tempFilePrefix, maxFileSize, rowWriterFactory))
.withSideInputs(tempFilePrefix))
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}
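
For reference, here is a self-contained sketch of the GroupIntoBatches.withShardedKey() pattern this method builds on, using hypothetical String/Long element types and batch parameters (the real transform above uses DestinationT/ElementT and FILE_TRIGGERING_RECORD_COUNT). The runner may split a hot key into sub-shards, and the shard id is stripped once batches are formed:

// Sketch only: "events" is an assumed PCollection<KV<String, Long>>.
PCollection<KV<String, Iterable<Long>>> batched =
    events
        .apply(
            // Emit a batch per (sharded) key once 1000 elements have buffered
            // or one minute has elapsed, whichever comes first.
            GroupIntoBatches.<String, Long>ofSize(1000)
                .withMaxBufferingDuration(Duration.standardMinutes(1))
                .withShardedKey())
        .apply(
            "StripShardId",
            MapElements.via(
                new SimpleFunction<
                    KV<org.apache.beam.sdk.util.ShardedKey<String>, Iterable<Long>>,
                    KV<String, Iterable<Long>>>() {
                  @Override
                  public KV<String, Iterable<Long>> apply(
                      KV<org.apache.beam.sdk.util.ShardedKey<String>, Iterable<Long>> kv) {
                    // Drop the runner-assigned shard id, keeping the original key.
                    return KV.of(kv.getKey().getKey(), kv.getValue());
                  }
                }));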

private PCollection<Result<DestinationT>> writeShardedRecords(
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
PCollectionView<String> tempFilePrefix) {
return shardedRecords
.apply("GroupByDestination", GroupByKey.create())
.apply(
"StripShardId",
MapElements.via(
new SimpleFunction<
KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
KV<DestinationT, Iterable<ElementT>>>() {
@Override
public KV<DestinationT, Iterable<ElementT>> apply(
KV<ShardedKey<DestinationT>, Iterable<ElementT>> input) {
return KV.of(input.getKey().getKey(), input.getValue());
}
}))
.setCoder(KvCoder.of(destinationCoder, IterableCoder.of(elementCoder)))
.apply(
"WriteGroupedRecords",
ParDo.of(
BigQueryIO.java
@@ -2267,7 +2267,8 @@ public Write<T> withTriggeringFrequency(Duration triggeringFrequency) {

/**
* Control how many file shards are written when using BigQuery load jobs. Applicable only when
* also setting {@link #withTriggeringFrequency}.
* also setting {@link #withTriggeringFrequency}. To let the runner determine the sharding at
* runtime, use {@link #withAutoSharding()} instead.
*/
public Write<T> withNumFileShards(int numFileShards) {
checkArgument(numFileShards > 0, "numFileShards must be > 0, but was: %s", numFileShards);
@@ -2350,10 +2351,10 @@ public Write<T> useBeamSchema() {
}

/**
* If true, enables dynamically determined number of shards to write to BigQuery. Only
* applicable to unbounded data with STREAMING_INSERTS.
*
* <p>TODO(BEAM-11408): Also integrate this option to FILE_LOADS.
* If true, enables using a dynamically determined number of shards to write to BigQuery. This
* can be used with both {@link Method#FILE_LOADS} and {@link Method#STREAMING_INSERTS}. Only
* applicable to unbounded data. If {@link Method#FILE_LOADS} is used, any value set via
* {@link #withNumFileShards} will be ignored.
*/
@Experimental
public Write<T> withAutoSharding() {
@@ -2751,7 +2752,11 @@ private <DestinationT, ElementT> WriteResult continueExpandTyped(
batchLoads.setMaxRetryJobs(1000);
}
batchLoads.setTriggeringFrequency(getTriggeringFrequency());
batchLoads.setNumFileShards(getNumFileShards());
if (getAutoSharding()) {
batchLoads.setNumFileShards(0);
} else {
batchLoads.setNumFileShards(getNumFileShards());
}
return input.apply(batchLoads);
}
}
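
Taken together, a minimal usage sketch of the new behavior (table name, schema, and input are hypothetical): with FILE_LOADS plus withAutoSharding() on an unbounded input, the runner determines file sharding and any withNumFileShards() setting is ignored.

// Sketch only: "rows" is an assumed unbounded PCollection<TableRow>, "schema" a TableSchema.
rows.apply(
    "WriteToBigQuery",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")
        .withSchema(schema)
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        .withTriggeringFrequency(Duration.standardMinutes(5))
        .withAutoSharding() // runner-determined, dynamic file sharding
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));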
WriteGroupedRecordsToFiles.java
@@ -20,17 +20,14 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ShardedKey;

/**
* Receives elements grouped by their (sharded) destination, and writes them out to a file. Since
* all the elements in the {@link Iterable} are destined to the same table, they are all written to
* the same file. Ensures that only one {@link TableRowWriter} is active per bundle.
* Receives elements grouped by their destination, and writes them out to a file. Since all the
* elements in the {@link Iterable} are destined to the same table, they are all written to the same
* file. Ensures that only one {@link TableRowWriter} is active per bundle.
*/
class WriteGroupedRecordsToFiles<DestinationT, ElementT>
extends DoFn<
KV<ShardedKey<DestinationT>, Iterable<ElementT>>,
WriteBundlesToFiles.Result<DestinationT>> {
extends DoFn<KV<DestinationT, Iterable<ElementT>>, WriteBundlesToFiles.Result<DestinationT>> {

private final PCollectionView<String> tempFilePrefix;
private final long maxFileSize;
@@ -48,24 +45,24 @@ class WriteGroupedRecordsToFiles<DestinationT, ElementT>
@ProcessElement
public void processElement(
ProcessContext c,
@Element KV<ShardedKey<DestinationT>, Iterable<ElementT>> element,
@Element KV<DestinationT, Iterable<ElementT>> element,
OutputReceiver<WriteBundlesToFiles.Result<DestinationT>> o)
throws Exception {

String tempFilePrefix = c.sideInput(this.tempFilePrefix);

BigQueryRowWriter<ElementT> writer =
rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey().getKey());
rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey());

try {
for (ElementT tableRow : element.getValue()) {
if (writer.getByteSize() > maxFileSize) {
writer.close();
writer = rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey().getKey());
writer = rowWriterFactory.createRowWriter(tempFilePrefix, element.getKey());
BigQueryRowWriter.Result result = writer.getResult();
o.output(
new WriteBundlesToFiles.Result<>(
result.resourceId.toString(), result.byteSize, c.element().getKey().getKey()));
result.resourceId.toString(), result.byteSize, c.element().getKey()));
}
writer.write(tableRow);
}
@@ -76,6 +73,6 @@ public void processElement(
BigQueryRowWriter.Result result = writer.getResult();
o.output(
new WriteBundlesToFiles.Result<>(
result.resourceId.toString(), result.byteSize, c.element().getKey().getKey()));
result.resourceId.toString(), result.byteSize, c.element().getKey()));
}
}
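
For intuition, a plain java.io analogue of the size-bounded rotation in processElement (record format and file naming are hypothetical, and writing is simplified to append-per-record):

// Sketch only: not Beam code; shows the rotate-close-report pattern in isolation.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

class RotatingFileWriter {
  static List<String> writeWithRotation(Iterable<byte[]> records, Path dir, long maxBytes)
      throws IOException {
    List<String> finishedFiles = new ArrayList<>();
    int index = 0;
    Path current = dir.resolve("part-" + index);
    long written = 0;
    for (byte[] record : records) {
      if (written > maxBytes) {
        // Current file is full: report it downstream and start a fresh one.
        finishedFiles.add(current.toString());
        current = dir.resolve("part-" + (++index));
        written = 0;
      }
      Files.write(current, record, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
      written += record.length;
    }
    finishedFiles.add(current.toString()); // report the final, possibly small file
    return finishedFiles;
  }
}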
FakeDatasetService.java
@@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
@@ -62,8 +63,12 @@ public class FakeDatasetService implements DatasetService, Serializable {

Map<String, List<String>> insertErrors = Maps.newHashMap();

// Counts the number of insertAll calls performed.
static AtomicInteger insertCount;
[Review thread]
Member: why is this static? shouldn't it belong to the instance?
Contributor (author): It seems that the instance can be reconstructed multiple times. If a
static variable is not used, the counter will be zero at verification time. There is an
explanation for the static table map above ("...each ParDo will get a separate instance of
FakeDatasetServices..."):
    static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table<
Member: makes sense.

public static void setUp() {
tables = HashBasedTable.create();
insertCount = new AtomicInteger(0);
FakeJobService.setUp();
}

@@ -217,6 +222,10 @@ public void deleteDataset(String projectId, String datasetId)
}
}

public int getInsertCount() {
return insertCount.get();
}
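
A hedged sketch of how a test might consume this counter (pipeline wiring is hypothetical); because the counter is static, it survives the fake service being re-instantiated inside each ParDo:

// Sketch only: inside a test that routes BigQueryIO traffic to the fakes.
FakeDatasetService.setUp(); // resets tables and zeroes the insert counter
FakeDatasetService datasetService = new FakeDatasetService();
// ... build and run a TestPipeline whose write uses .withTestServices(...) ...
int insertCalls = datasetService.getInsertCount();
// With batched inserts, insertCalls should be much smaller than the row count.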

public long insertAll(
TableReference ref, List<TableRow> rowList, @Nullable List<String> insertIdList)
throws IOException, InterruptedException {
@@ -292,6 +301,7 @@ public <T> long insertAll(
failedInserts, allErrors.get(allErrors.size() - 1), ref, rowList.get(i));
}
}
insertCount.addAndGet(1);
return dataSize;
}
}