@@ -599,7 +599,9 @@ private List<PTransformOverride> getOverrides(boolean streaming) {
BatchViewOverrides.BatchViewAsIterable.class, this)));
}
}
/* TODO[Beam-4684]: Support @RequiresStableInput on Dataflow in a more intelligent way
/* TODO(Beam-4684): Support @RequiresStableInput on Dataflow in a more intelligent way
Using Reshuffle might cause an extra and unnecessary shuffle to be inserted. To enable this, we
should make sure that we do not add extra shuffles for transforms whose input is already stable.
// Uses Reshuffle, so has to be before the Reshuffle override
overridesBuilder.add(
PTransformOverride.of(
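For context on the TODO above, a minimal, hypothetical sketch (not part of this change) of the kind of transform the commented-out override would match: a DoFn whose @ProcessElement method is annotated with @RequiresStableInput. The override would insert a Reshuffle in front of the ParDo to make its input stable, which is exactly the extra shuffle the comment wants to avoid when the input is already stable.

import org.apache.beam.sdk.transforms.DoFn;

/** Hypothetical sink DoFn whose side effect must only see stable (replay-identical) input. */
class IdempotentSinkFn extends DoFn<String, Void> {
  @RequiresStableInput
  @ProcessElement
  public void processElement(@Element String element) {
    // Write to an external system keyed by a deterministic id derived from the element.
    // The runner guarantees the same element is replayed on retry, keeping the write idempotent.
  }
}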

@@ -1664,6 +1664,7 @@ public static <T> Write<T> write() {
.setMaxBytesPerPartition(BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION)
.setOptimizeWrites(false)
.setUseBeamSchema(false)
.setAutoSharding(false)
.build();
}

@@ -1789,6 +1790,9 @@ public enum Method {
@Experimental(Kind.SCHEMAS)
abstract Boolean getUseBeamSchema();

@Experimental
abstract Boolean getAutoSharding();

abstract Builder<T> toBuilder();

@AutoValue.Builder
@@ -1868,6 +1872,9 @@ abstract Builder<T> setAvroSchemaFactory(
@Experimental(Kind.SCHEMAS)
abstract Builder<T> setUseBeamSchema(Boolean useBeamSchema);

@Experimental
abstract Builder<T> setAutoSharding(Boolean autoSharding);

abstract Write<T> build();
}

@@ -2342,6 +2349,17 @@ public Write<T> useBeamSchema() {
return toBuilder().setUseBeamSchema(true).build();
}

/**
* If true, enables a dynamically determined number of shards to write to BigQuery. Only
* applicable to unbounded data written with STREAMING_INSERTS.
*
* <p>TODO(BEAM-11408): Also integrate this option with FILE_LOADS.
*/
@Experimental
public Write<T> withAutoSharding() {
return toBuilder().setAutoSharding(true).build();
}
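A minimal usage sketch of the new option, assuming a streaming pipeline that reads JSON strings from Pub/Sub; the topic, table spec, field name, and class name below are placeholders, not part of this change:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class AutoShardingExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply("ReadJson", PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
        .apply(
            "ToTableRow",
            MapElements.into(TypeDescriptor.of(TableRow.class))
                // Placeholder conversion: wrap the raw JSON string in a single column.
                .via((String json) -> new TableRow().set("payload", json)))
        .setCoder(TableRowJsonCoder.of())
        .apply(
            "WriteToBigQuery",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")
                .withMethod(Method.STREAMING_INSERTS)
                // New in this change: let the runner determine the number of
                // streaming-insert shards dynamically instead of using a fixed count.
                .withAutoSharding()
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND));
    p.run();
  }
}

Because the Pub/Sub source is unbounded, this satisfies the new checkArgument below that rejects auto-sharding on bounded input.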

@VisibleForTesting
/** This method is for test usage only */
public Write<T> withTestServices(BigQueryServices testServices) {
@@ -2487,6 +2505,10 @@ public WriteResult expand(PCollection<T> input) {
method);
}

if (input.isBounded() == IsBounded.BOUNDED) {
checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input.");
}

if (getJsonTimePartitioning() != null) {
checkArgument(
getDynamicDestinations() == null,
@@ -2681,6 +2703,7 @@ private <DestinationT, ElementT> WriteResult continueExpandTyped(
.withSkipInvalidRows(getSkipInvalidRows())
.withIgnoreUnknownValues(getIgnoreUnknownValues())
.withIgnoreInsertIds(getIgnoreInsertIds())
.withAutoSharding(getAutoSharding())
.withKmsKey(getKmsKey());
return input.apply(streamingInserts);
} else {
@@ -42,6 +42,7 @@ public class StreamingInserts<DestinationT, ElementT>
private final boolean skipInvalidRows;
private final boolean ignoreUnknownValues;
private final boolean ignoreInsertIds;
private final boolean autoSharding;
private final String kmsKey;
private final Coder<ElementT> elementCoder;
private final SerializableFunction<ElementT, TableRow> toTableRow;
@@ -63,6 +64,7 @@ public StreamingInserts(
false,
false,
false,
false,
elementCoder,
toTableRow,
toFailsafeTableRow,
@@ -79,6 +81,7 @@ private StreamingInserts(
boolean skipInvalidRows,
boolean ignoreUnknownValues,
boolean ignoreInsertIds,
boolean autoSharding,
Coder<ElementT> elementCoder,
SerializableFunction<ElementT, TableRow> toTableRow,
SerializableFunction<ElementT, TableRow> toFailsafeTableRow,
@@ -91,6 +94,7 @@ private StreamingInserts(
this.skipInvalidRows = skipInvalidRows;
this.ignoreUnknownValues = ignoreUnknownValues;
this.ignoreInsertIds = ignoreInsertIds;
this.autoSharding = autoSharding;
this.elementCoder = elementCoder;
this.toTableRow = toTableRow;
this.toFailsafeTableRow = toFailsafeTableRow;
@@ -109,6 +113,7 @@ public StreamingInserts<DestinationT, ElementT> withInsertRetryPolicy(
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
elementCoder,
toTableRow,
toFailsafeTableRow,
@@ -126,6 +131,7 @@ public StreamingInserts<DestinationT, ElementT> withExtendedErrorInfo(boolean ex
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
elementCoder,
toTableRow,
toFailsafeTableRow,
@@ -142,6 +148,7 @@ StreamingInserts<DestinationT, ElementT> withSkipInvalidRows(boolean skipInvalid
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
elementCoder,
toTableRow,
toFailsafeTableRow,
@@ -158,6 +165,7 @@ StreamingInserts<DestinationT, ElementT> withIgnoreUnknownValues(boolean ignoreU
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
elementCoder,
toTableRow,
toFailsafeTableRow,
@@ -174,6 +182,24 @@ StreamingInserts<DestinationT, ElementT> withIgnoreInsertIds(boolean ignoreInser
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
elementCoder,
toTableRow,
toFailsafeTableRow,
kmsKey);
}

StreamingInserts<DestinationT, ElementT> withAutoSharding(boolean autoSharding) {
return new StreamingInserts<>(
createDisposition,
dynamicDestinations,
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
elementCoder,
toTableRow,
toFailsafeTableRow,
@@ -190,6 +216,7 @@ StreamingInserts<DestinationT, ElementT> withKmsKey(String kmsKey) {
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
elementCoder,
toTableRow,
toFailsafeTableRow,
@@ -206,6 +233,7 @@ StreamingInserts<DestinationT, ElementT> withTestServices(BigQueryServices bigQu
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
elementCoder,
toTableRow,
toFailsafeTableRow,
@@ -229,6 +257,7 @@ public WriteResult expand(PCollection<KV<DestinationT, ElementT>> input) {
.withSkipInvalidRows(skipInvalidRows)
.withIgnoreUnknownValues(ignoreUnknownValues)
.withIgnoreInsertIds(ignoreInsertIds)
.withAutoSharding(autoSharding)
.withElementCoder(elementCoder)
.withToTableRow(toTableRow)
.withToFailsafeTableRow(toFailsafeTableRow));