From 3d2154f81f163691795313cb6331468c74ba9600 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Tue, 16 Apr 2024 18:39:22 +0000 Subject: [PATCH 1/8] [#30941]fix upgrade test due to missed config ConsumerPollingTimeout in KafkaIOTranslation --- .../org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index 4cbed36c697f..1376ef46a314 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -102,6 +102,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl .addNullableByteArrayField("key_deserializer_provider") .addNullableByteArrayField("value_deserializer_provider") .addNullableByteArrayField("check_stop_reading_fn") + .addNullableInt64Field("consumer_polling_timeout") .build(); @Override From 7907c781b10d16dbef0e34b8a7ffccf302ab7795 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Tue, 16 Apr 2024 19:25:07 +0000 Subject: [PATCH 2/8] [#30941]fix upgrade test due to missed config ConsumerPollingTimeout in KafkaIOTranslation --- .../apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index 1376ef46a314..a43c5a4a36f3 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ 
-102,7 +102,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl .addNullableByteArrayField("key_deserializer_provider") .addNullableByteArrayField("value_deserializer_provider") .addNullableByteArrayField("check_stop_reading_fn") - .addNullableInt64Field("consumer_polling_timeout") + .addNullableLogicalTypeField("consumer_polling_timeout", new NanosDuration()) .build(); @Override From 95e46c0731543a64652db57f6489c8dee3c33c6e Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Tue, 16 Apr 2024 20:18:43 +0000 Subject: [PATCH 3/8] [#30941]fix upgrade test due to missed config ConsumerPollingTimeout in KafkaIOTranslation --- .../beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index a43c5a4a36f3..a3f248cce089 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -174,6 +174,9 @@ public Row toConfigRow(Read transform) { if (transform.getStopReadTime() != null) { fieldValues.put("stop_read_time", transform.getStopReadTime()); } + if (transform.getConsumerPollingTimeout() != null) { + fieldValues.put("consumer_polling_timeout", transform.getConsumerPollingTimeout()); + } fieldValues.put( "is_commit_offset_finalize_enabled", transform.isCommitOffsetsInFinalizeEnabled()); @@ -321,6 +324,12 @@ public Row toConfigRow(Read transform) { transform = transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis())); } + Duration consumerPollingTimeout = configRow.getValue("consumer_polling_timeout"); + if (consumerPollingTimeout != null) { + transform = + transform.withConsumerPollingTimeout( + 
org.joda.time.Duration.millis(consumerPollingTimeout.toMillis())); + } Instant startReadTime = configRow.getValue("start_read_time"); if (startReadTime != null) { transform = transform.withStartReadTime(startReadTime); From d2dfdc3cb0dd76cb8563d12aefe8771ca782b881 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Tue, 16 Apr 2024 20:45:50 +0000 Subject: [PATCH 4/8] [#30941]fix upgrade test due to missed config ConsumerPollingTimeout in KafkaIOTranslation --- .../sdk/io/kafka/upgrade/KafkaIOTranslation.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index a3f248cce089..7fe615486171 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -324,11 +324,16 @@ public Row toConfigRow(Read transform) { transform = transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis())); } - Duration consumerPollingTimeout = configRow.getValue("consumer_polling_timeout"); - if (consumerPollingTimeout != null) { + if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.56.0") < 0) { transform = - transform.withConsumerPollingTimeout( - org.joda.time.Duration.millis(consumerPollingTimeout.toMillis())); + transform.withConsumerPollingTimeout(Duration.standardSeconds(2L)); // Current default. 
+ } else { + Duration consumerPollingTimeout = configRow.getValue("consumer_polling_timeout"); + if (consumerPollingTimeout != null) { + transform = + transform.withConsumerPollingTimeout( + org.joda.time.Duration.millis(consumerPollingTimeout.toMillis())); + } } Instant startReadTime = configRow.getValue("start_read_time"); if (startReadTime != null) { From adbd83eefef9da4d064e5af255c3594ea6737347 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Tue, 16 Apr 2024 21:01:31 +0000 Subject: [PATCH 5/8] [#30941]fix upgrade test due to missed config ConsumerPollingTimeout in KafkaIOTranslation --- .../sdk/io/kafka/upgrade/KafkaIOTranslation.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index 7fe615486171..8fb355c3d138 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -40,6 +40,7 @@ import org.apache.beam.sdk.io.kafka.KafkaIOUtils; import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.Schema.FieldType; @@ -51,6 +52,7 @@ import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.util.construction.SdkComponents; import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar; +import org.apache.beam.sdk.util.construction.TransformUpgrader; import org.apache.beam.sdk.values.Row; import 
org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; @@ -221,6 +223,13 @@ public Row toConfigRow(Read transform) { @Override public Read fromConfigRow(Row configRow, PipelineOptions options) { + String updateCompatibilityBeamVersion = + options.as(StreamingOptions.class).getUpdateCompatibilityVersion(); + // We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption + // is not correctly passed in for pipelines that use Beam 2.55.0. + // This is fixed for Beam 2.56.0 and later. + updateCompatibilityBeamVersion = + (updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.55.0"; try { Read transform = KafkaIO.read(); @@ -325,8 +334,9 @@ public Row toConfigRow(Read transform) { transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis())); } if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.56.0") < 0) { + // set to current default transform = - transform.withConsumerPollingTimeout(Duration.standardSeconds(2L)); // Current default. 
+ transform.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(2L)); } else { Duration consumerPollingTimeout = configRow.getValue("consumer_polling_timeout"); if (consumerPollingTimeout != null) { From 3c561104a969f466cc2c72a28d596d1e2995257a Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Wed, 17 Apr 2024 00:52:55 +0000 Subject: [PATCH 6/8] fixed upgrade test and changed consumer timeout to long --- .../org/apache/beam/sdk/io/kafka/KafkaIO.java | 38 +++++++++---------- .../beam/sdk/io/kafka/ReadFromKafkaDoFn.java | 21 +++++----- .../apache/beam/sdk/io/kafka/KafkaIOTest.java | 6 +-- .../sdk/io/kafka/ReadFromKafkaDoFnTest.java | 7 ++-- .../io/kafka/upgrade/KafkaIOTranslation.java | 14 +++---- sdks/python/apache_beam/io/kafka.py | 2 +- 6 files changed, 38 insertions(+), 50 deletions(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 6a0df5897063..e897ed439cd1 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -587,7 +587,7 @@ public static Read read() { .setCommitOffsetsInFinalizeEnabled(false) .setDynamicRead(false) .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime()) - .setConsumerPollingTimeout(Duration.standardSeconds(2L)) + .setConsumerPollingTimeout(2L) .build(); } @@ -708,7 +708,7 @@ public abstract static class Read public abstract @Nullable ErrorHandler getBadRecordErrorHandler(); @Pure - public abstract @Nullable Duration getConsumerPollingTimeout(); + public abstract long getConsumerPollingTimeout(); abstract Builder toBuilder(); @@ -766,7 +766,7 @@ Builder setCheckStopReadingFn( return setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn)); } - abstract Builder setConsumerPollingTimeout(Duration consumerPollingTimeout); + abstract Builder 
setConsumerPollingTimeout(long consumerPollingTimeout); abstract Read build(); @@ -836,10 +836,9 @@ static void setupExternalBuilder( if (config.consumerPollingTimeout <= 0) { throw new IllegalArgumentException("consumerPollingTimeout should be > 0."); } - builder.setConsumerPollingTimeout( - Duration.standardSeconds(config.consumerPollingTimeout)); + builder.setConsumerPollingTimeout(config.consumerPollingTimeout); } else { - builder.setConsumerPollingTimeout(Duration.standardSeconds(2L)); + builder.setConsumerPollingTimeout(2L); } } @@ -1356,14 +1355,12 @@ public Read withBadRecordErrorHandler(ErrorHandler badRecord } /** - * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A - * lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching - * enough (or any) records. The default is 2 seconds. + * Sets the timeout time in seconds for Kafka consumer polling request in the {@link + * ReadFromKafkaDoFn}. A lower timeout optimizes for latency. Increase the timeout if the + * consumer is not fetching any records. The default is 2 seconds. 
*/ - public Read withConsumerPollingTimeout(Duration duration) { - checkState( - duration == null || duration.compareTo(Duration.ZERO) > 0, - "Consumer polling timeout must be greater than 0."); + public Read withConsumerPollingTimeout(long duration) { + checkState(duration > 0, "Consumer polling timeout must be greater than 0."); return toBuilder().setConsumerPollingTimeout(duration).build(); } @@ -2071,7 +2068,7 @@ public abstract static class ReadSourceDescriptors abstract ErrorHandler getBadRecordErrorHandler(); @Pure - abstract @Nullable Duration getConsumerPollingTimeout(); + abstract long getConsumerPollingTimeout(); abstract boolean isBounded(); @@ -2123,8 +2120,7 @@ abstract ReadSourceDescriptors.Builder setBadRecordRouter( abstract ReadSourceDescriptors.Builder setBadRecordErrorHandler( ErrorHandler badRecordErrorHandler); - abstract ReadSourceDescriptors.Builder setConsumerPollingTimeout( - @Nullable Duration duration); + abstract ReadSourceDescriptors.Builder setConsumerPollingTimeout(long duration); abstract ReadSourceDescriptors.Builder setBounded(boolean bounded); @@ -2139,7 +2135,7 @@ public static ReadSourceDescriptors read() { .setBounded(false) .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER) .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>()) - .setConsumerPollingTimeout(Duration.standardSeconds(2L)) + .setConsumerPollingTimeout(2L) .build() .withProcessingTime() .withMonotonicallyIncreasingWatermarkEstimator(); @@ -2402,11 +2398,11 @@ public ReadSourceDescriptors withBadRecordErrorHandler( } /** - * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A - * lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching - * enough (or any) records. The default is 2 seconds. + * Sets the timeout time in seconds for Kafka consumer polling request in the {@link + * ReadFromKafkaDoFn}. A lower timeout optimizes for latency. 
Increase the timeout if the + * consumer is not fetching any records. The default is 2 seconds. */ - public ReadSourceDescriptors withConsumerPollingTimeout(@Nullable Duration duration) { + public ReadSourceDescriptors withConsumerPollingTimeout(long duration) { return toBuilder().setConsumerPollingTimeout(duration).build(); } diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java index 3a821ef9519e..b7bbc904da81 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java @@ -191,11 +191,10 @@ private ReadFromKafkaDoFn( this.checkStopReadingFn = transform.getCheckStopReadingFn(); this.badRecordRouter = transform.getBadRecordRouter(); this.recordTag = recordTag; - if (transform.getConsumerPollingTimeout() != null) { - this.consumerPollingTimeout = - java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis()); + if (transform.getConsumerPollingTimeout() > 0) { + this.consumerPollingTimeout = transform.getConsumerPollingTimeout(); } else { - this.consumerPollingTimeout = KAFKA_POLL_TIMEOUT; + this.consumerPollingTimeout = DEFAULT_KAFKA_POLL_TIMEOUT; } } @@ -222,10 +221,8 @@ private ReadFromKafkaDoFn( private transient @Nullable Map offsetEstimatorCache; private transient @Nullable LoadingCache avgRecordSize; - - private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(2); - - @VisibleForTesting final java.time.Duration consumerPollingTimeout; + private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L; + @VisibleForTesting final long consumerPollingTimeout; @VisibleForTesting final DeserializerProvider keyDeserializerProvider; @VisibleForTesting final DeserializerProvider valueDeserializerProvider; @VisibleForTesting final Map consumerConfig; @@ -513,9 +510,9 @@ 
private ConsumerRecords poll( final Stopwatch sw = Stopwatch.createStarted(); long previousPosition = -1; java.time.Duration elapsed = java.time.Duration.ZERO; + java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout); while (true) { - final ConsumerRecords rawRecords = - consumer.poll(consumerPollingTimeout.minus(elapsed)); + final ConsumerRecords rawRecords = consumer.poll(timeout.minus(elapsed)); if (!rawRecords.isEmpty()) { // return as we have found some entries return rawRecords; @@ -525,11 +522,11 @@ private ConsumerRecords poll( return rawRecords; } elapsed = sw.elapsed(); - if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) { + if (elapsed.toMillis() >= timeout.toMillis()) { // timeout is over LOG.warn( "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.", - consumerPollingTimeout.getSeconds()); + consumerPollingTimeout); return rawRecords; } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 44c028f08a27..07e5b519c013 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -2123,14 +2123,14 @@ public void testSinkMetrics() throws Exception { @Test(expected = IllegalStateException.class) public void testWithInvalidConsumerPollingTimeout() { - KafkaIO.read().withConsumerPollingTimeout(Duration.standardSeconds(-5)); + KafkaIO.read().withConsumerPollingTimeout(-5L); } @Test public void testWithValidConsumerPollingTimeout() { KafkaIO.Read reader = - KafkaIO.read().withConsumerPollingTimeout(Duration.standardSeconds(15)); - assertEquals(15, reader.getConsumerPollingTimeout().getStandardSeconds()); + KafkaIO.read().withConsumerPollingTimeout(15L); + assertEquals(15, 
reader.getConsumerPollingTimeout()); } private static void verifyProducerRecords( diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java index 8902f22164bc..b8ff08485c3b 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java @@ -646,13 +646,12 @@ public void testConstructorWithPollTimeout() { ReadSourceDescriptors descriptors = makeReadSourceDescriptor(consumer); // default poll timeout = 1 scond ReadFromKafkaDoFn dofnInstance = ReadFromKafkaDoFn.create(descriptors, RECORDS); - Assert.assertEquals(Duration.ofSeconds(2L), dofnInstance.consumerPollingTimeout); + Assert.assertEquals(2L, dofnInstance.consumerPollingTimeout); // updated timeout = 5 seconds - descriptors = - descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5L)); + descriptors = descriptors.withConsumerPollingTimeout(5L); ReadFromKafkaDoFn dofnInstanceNew = ReadFromKafkaDoFn.create(descriptors, RECORDS); - Assert.assertEquals(Duration.ofSeconds(5L), dofnInstanceNew.consumerPollingTimeout); + Assert.assertEquals(5L, dofnInstanceNew.consumerPollingTimeout); } private BoundednessVisitor testBoundedness( diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index 8fb355c3d138..22dea97305a2 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -104,7 +104,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl 
.addNullableByteArrayField("key_deserializer_provider") .addNullableByteArrayField("value_deserializer_provider") .addNullableByteArrayField("check_stop_reading_fn") - .addNullableLogicalTypeField("consumer_polling_timeout", new NanosDuration()) + .addNullableInt64Field("consumer_polling_timeout") .build(); @Override @@ -176,10 +176,7 @@ public Row toConfigRow(Read transform) { if (transform.getStopReadTime() != null) { fieldValues.put("stop_read_time", transform.getStopReadTime()); } - if (transform.getConsumerPollingTimeout() != null) { - fieldValues.put("consumer_polling_timeout", transform.getConsumerPollingTimeout()); - } - + fieldValues.put("consumer_polling_timeout", transform.getConsumerPollingTimeout()); fieldValues.put( "is_commit_offset_finalize_enabled", transform.isCommitOffsetsInFinalizeEnabled()); fieldValues.put("is_dynamic_read", transform.isDynamicRead()); @@ -336,13 +333,12 @@ public Row toConfigRow(Read transform) { if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.56.0") < 0) { // set to current default transform = - transform.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(2L)); + transform.withConsumerPollingTimeout(2L); } else { - Duration consumerPollingTimeout = configRow.getValue("consumer_polling_timeout"); + Long consumerPollingTimeout = configRow.getInt64("consumer_polling_timeout"); if (consumerPollingTimeout != null) { transform = - transform.withConsumerPollingTimeout( - org.joda.time.Duration.millis(consumerPollingTimeout.toMillis())); + transform.withConsumerPollingTimeout(consumerPollingTimeout); } } Instant startReadTime = configRow.getValue("start_read_time"); diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py index e1aeab8d3a8c..b4fd7d86e688 100644 --- a/sdks/python/apache_beam/io/kafka.py +++ b/sdks/python/apache_beam/io/kafka.py @@ -135,7 +135,7 @@ def __init__( max_read_time=None, commit_offset_in_finalize=False, 
timestamp_policy=processing_time_policy, - consumer_polling_timeout=None, + consumer_polling_timeout=2, with_metadata=False, expansion_service=None, ): From c8a5e8f4af21956d236171621513e9d72c866373 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Wed, 17 Apr 2024 01:02:03 +0000 Subject: [PATCH 7/8] fixed spotless issues --- .../beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java index 22dea97305a2..9848e429e215 100644 --- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java +++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java @@ -332,13 +332,11 @@ public Row toConfigRow(Read transform) { } if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.56.0") < 0) { // set to current default - transform = - transform.withConsumerPollingTimeout(2L); + transform = transform.withConsumerPollingTimeout(2L); } else { Long consumerPollingTimeout = configRow.getInt64("consumer_polling_timeout"); if (consumerPollingTimeout != null) { - transform = - transform.withConsumerPollingTimeout(consumerPollingTimeout); + transform = transform.withConsumerPollingTimeout(consumerPollingTimeout); } } Instant startReadTime = configRow.getValue("start_read_time"); From 1cabedfe05b4283fbe7d1fc65f7e5ec4c812d719 Mon Sep 17 00:00:00 2001 From: Xianhua Liu Date: Wed, 17 Apr 2024 02:34:23 +0000 Subject: [PATCH 8/8] fixed test --- .../io/kafka/KafkaIOReadImplementationCompatibility.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java 
b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java index 7e54407300d4..89e2b80a8c6a 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java @@ -112,7 +112,12 @@ Object getDefaultValue() { VALUE_DESERIALIZER_PROVIDER, CHECK_STOP_READING_FN(SDF), BAD_RECORD_ERROR_HANDLER(SDF), - CONSUMER_POLLING_TIMEOUT, + CONSUMER_POLLING_TIMEOUT(SDF) { + @Override + Object getDefaultValue() { + return Long.valueOf(2); + } + }, ; @Nonnull private final ImmutableSet supportedImplementations;