diff --git a/CHANGES.md b/CHANGES.md index 75542d23bc48..758c11c83926 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -72,7 +72,7 @@ ## New Features / Improvements -* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Respect `BatchSize` and `MaxBatchBufferingDuration` when using `JdbcIO.WriteWithResults` (Java). Previously, these settings were ignored and the defaults were always used ([#35669](https://github.com/apache/beam/pull/35669)). ## Breaking Changes diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java index 2c0ad23c5638..cc5675521225 100644 --- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java +++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java @@ -1949,6 +1949,8 @@ public WriteWithResults withWriteResults( .setStatement(inner.getStatement()) .setTable(inner.getTable()) .setAutoSharding(inner.getAutoSharding()) + .setBatchSize(inner.getBatchSize()) + .setMaxBatchBufferingDuration(inner.getMaxBatchBufferingDuration()) .build(); } @@ -2055,6 +2057,10 @@ public abstract static class WriteWithResults abstract @Nullable RowMapper getRowMapper(); + abstract @Nullable Long getBatchSize(); + + abstract @Nullable Long getMaxBatchBufferingDuration(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -2064,6 +2070,10 @@ abstract Builder setDataSourceProviderFn( abstract Builder setAutoSharding(@Nullable Boolean autoSharding); + abstract Builder setBatchSize(@Nullable Long batchSize); + + abstract Builder setMaxBatchBufferingDuration(@Nullable Long maxBatchBufferingDuration); + abstract Builder setStatement(@Nullable ValueProvider statement); abstract Builder setPreparedStatementSetter( @@ -2080,6 +2090,19 @@ abstract Builder setPreparedStatementSetter( abstract WriteWithResults build(); } + public WriteWithResults withBatchSize(long batchSize) { + checkArgument(batchSize > 0, "batchSize must be > 0, but was %s", batchSize); + 
return toBuilder().setBatchSize(batchSize).build(); + } + + public WriteWithResults withMaxBatchBufferingDuration(long maxBatchBufferingDuration) { + checkArgument( + maxBatchBufferingDuration > 0, + "maxBatchBufferingDuration must be > 0, but was %s", + maxBatchBufferingDuration); + return toBuilder().setMaxBatchBufferingDuration(maxBatchBufferingDuration).build(); + } + public WriteWithResults withDataSourceConfiguration(DataSourceConfiguration config) { return withDataSourceProviderFn(new DataSourceProviderFromDataSourceConfiguration(config)); } @@ -2173,9 +2196,16 @@ public PCollection expand(PCollection input) { autoSharding == null || (autoSharding && input.isBounded() != IsBounded.UNBOUNDED), "Autosharding is only supported for streaming pipelines."); + Long batchSizeAsLong = getBatchSize(); + long batchSize = batchSizeAsLong == null ? DEFAULT_BATCH_SIZE : batchSizeAsLong; + Long maxBufferingDurationAsLong = getMaxBatchBufferingDuration(); + long maxBufferingDuration = + maxBufferingDurationAsLong == null + ? DEFAULT_MAX_BATCH_BUFFERING_DURATION + : maxBufferingDurationAsLong; + PCollection> iterables = - JdbcIO.batchElements( - input, autoSharding, DEFAULT_BATCH_SIZE, DEFAULT_MAX_BATCH_BUFFERING_DURATION); + JdbcIO.batchElements(input, autoSharding, batchSize, maxBufferingDuration); return iterables.apply( ParDo.of( new WriteFn( @@ -2187,8 +2217,8 @@ public PCollection expand(PCollection input) { .setStatement(getStatement()) .setRetryConfiguration(getRetryConfiguration()) .setReturnResults(true) - .setBatchSize(1L) - .setMaxBatchBufferingDuration(DEFAULT_MAX_BATCH_BUFFERING_DURATION) + .setBatchSize(1L) // We are writing iterables 1 at a time. + .setMaxBatchBufferingDuration(maxBufferingDuration) .build()))); } }