Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@

## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Respect `withBatchSize` and `withMaxBatchBufferingDuration` when using `JdbcIO.WriteWithResults`. Previously, these settings were ignored ([#35669](https://github.com/apache/beam/pull/35669)).

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1949,6 +1949,8 @@ public <V extends JdbcWriteResult> WriteWithResults<T, V> withWriteResults(
.setStatement(inner.getStatement())
.setTable(inner.getTable())
.setAutoSharding(inner.getAutoSharding())
.setBatchSize(inner.getBatchSize())
.setMaxBatchBufferingDuration(inner.getMaxBatchBufferingDuration())
.build();
}

Expand Down Expand Up @@ -2055,6 +2057,10 @@ public abstract static class WriteWithResults<T, V extends JdbcWriteResult>

abstract @Nullable RowMapper<V> getRowMapper();

abstract @Nullable Long getBatchSize();

abstract @Nullable Long getMaxBatchBufferingDuration();

abstract Builder<T, V> toBuilder();

@AutoValue.Builder
Expand All @@ -2064,6 +2070,10 @@ abstract Builder<T, V> setDataSourceProviderFn(

abstract Builder<T, V> setAutoSharding(@Nullable Boolean autoSharding);

abstract Builder<T, V> setBatchSize(@Nullable Long batchSize);

abstract Builder<T, V> setMaxBatchBufferingDuration(@Nullable Long maxBatchBufferingDuration);

abstract Builder<T, V> setStatement(@Nullable ValueProvider<String> statement);

abstract Builder<T, V> setPreparedStatementSetter(
Expand All @@ -2080,6 +2090,19 @@ abstract Builder<T, V> setPreparedStatementSetter(
abstract WriteWithResults<T, V> build();
}

/**
 * Returns a copy of this transform configured to buffer up to {@code batchSize} records per
 * batched write.
 *
 * @param batchSize the number of records per batch; must be strictly positive
 * @throws IllegalArgumentException if {@code batchSize} is not positive
 */
public WriteWithResults<T, V> withBatchSize(long batchSize) {
  checkArgument(batchSize > 0, "batchSize must be > 0, but was %s", batchSize);
  Builder<T, V> updated = toBuilder().setBatchSize(batchSize);
  return updated.build();
}

/**
 * Returns a copy of this transform configured with the given maximum buffering duration for a
 * batch (presumably milliseconds, matching the corresponding {@code Write} setting — confirm
 * against {@code batchElements}).
 *
 * @param maxBatchBufferingDuration the maximum buffering duration; must be strictly positive
 * @throws IllegalArgumentException if {@code maxBatchBufferingDuration} is not positive
 */
public WriteWithResults<T, V> withMaxBatchBufferingDuration(long maxBatchBufferingDuration) {
  checkArgument(
      maxBatchBufferingDuration > 0,
      "maxBatchBufferingDuration must be > 0, but was %s",
      maxBatchBufferingDuration);
  Builder<T, V> updated = toBuilder().setMaxBatchBufferingDuration(maxBatchBufferingDuration);
  return updated.build();
}

/**
 * Returns a copy of this transform that obtains its {@code DataSource} from the given {@link
 * DataSourceConfiguration}, by wrapping it in a {@code
 * DataSourceProviderFromDataSourceConfiguration} and delegating to {@code
 * withDataSourceProviderFn}.
 */
public WriteWithResults<T, V> withDataSourceConfiguration(DataSourceConfiguration config) {
  DataSourceProviderFromDataSourceConfiguration providerFn =
      new DataSourceProviderFromDataSourceConfiguration(config);
  return withDataSourceProviderFn(providerFn);
}
Expand Down Expand Up @@ -2173,9 +2196,16 @@ public PCollection<V> expand(PCollection<T> input) {
autoSharding == null || (autoSharding && input.isBounded() != IsBounded.UNBOUNDED),
"Autosharding is only supported for streaming pipelines.");

Long batchSizeAsLong = getBatchSize();
long batchSize = batchSizeAsLong == null ? DEFAULT_BATCH_SIZE : batchSizeAsLong;
Long maxBufferingDurationAsLong = getMaxBatchBufferingDuration();
long maxBufferingDuration =
maxBufferingDurationAsLong == null
? DEFAULT_MAX_BATCH_BUFFERING_DURATION
: maxBufferingDurationAsLong;

PCollection<Iterable<T>> iterables =
JdbcIO.<T>batchElements(
input, autoSharding, DEFAULT_BATCH_SIZE, DEFAULT_MAX_BATCH_BUFFERING_DURATION);
JdbcIO.<T>batchElements(input, autoSharding, batchSize, maxBufferingDuration);
return iterables.apply(
ParDo.of(
new WriteFn<T, V>(
Expand All @@ -2187,8 +2217,8 @@ public PCollection<V> expand(PCollection<T> input) {
.setStatement(getStatement())
.setRetryConfiguration(getRetryConfiguration())
.setReturnResults(true)
.setBatchSize(1L)
.setMaxBatchBufferingDuration(DEFAULT_MAX_BATCH_BUFFERING_DURATION)
.setBatchSize(1L) // We are writing iterables 1 at a time.
.setMaxBatchBufferingDuration(maxBufferingDuration)
.build())));
}
}
Expand Down
Loading