diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 6a0df5897063..e897ed439cd1 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -587,7 +587,7 @@ public static Read read() {
         .setCommitOffsetsInFinalizeEnabled(false)
         .setDynamicRead(false)
         .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
-        .setConsumerPollingTimeout(Duration.standardSeconds(2L))
+        .setConsumerPollingTimeout(2L)
         .build();
   }
 
@@ -708,7 +708,7 @@ public abstract static class Read
     public abstract @Nullable ErrorHandler getBadRecordErrorHandler();
 
     @Pure
-    public abstract @Nullable Duration getConsumerPollingTimeout();
+    public abstract long getConsumerPollingTimeout();
 
     abstract Builder toBuilder();
 
@@ -766,7 +766,7 @@ Builder setCheckStopReadingFn(
         return setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
       }
 
-      abstract Builder setConsumerPollingTimeout(Duration consumerPollingTimeout);
+      abstract Builder setConsumerPollingTimeout(long consumerPollingTimeout);
 
       abstract Read build();
 
@@ -836,10 +836,9 @@ static void setupExternalBuilder(
         if (config.consumerPollingTimeout <= 0) {
           throw new IllegalArgumentException("consumerPollingTimeout should be > 0.");
         }
-        builder.setConsumerPollingTimeout(
-            Duration.standardSeconds(config.consumerPollingTimeout));
+        builder.setConsumerPollingTimeout(config.consumerPollingTimeout);
       } else {
-        builder.setConsumerPollingTimeout(Duration.standardSeconds(2L));
+        builder.setConsumerPollingTimeout(2L);
       }
     }
 
@@ -1356,14 +1355,12 @@ public Read withBadRecordErrorHandler(ErrorHandler badRecord
     }
 
     /**
-     * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A
-     * lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
-     * enough (or any) records. The default is 2 seconds.
+     * Sets the timeout time in seconds for Kafka consumer polling request in the {@link
+     * ReadFromKafkaDoFn}. A lower timeout optimizes for latency. Increase the timeout if the
+     * consumer is not fetching any records. The default is 2 seconds.
      */
-    public Read withConsumerPollingTimeout(Duration duration) {
-      checkState(
-          duration == null || duration.compareTo(Duration.ZERO) > 0,
-          "Consumer polling timeout must be greater than 0.");
+    public Read withConsumerPollingTimeout(long duration) {
+      checkState(duration > 0, "Consumer polling timeout must be greater than 0.");
       return toBuilder().setConsumerPollingTimeout(duration).build();
     }
 
@@ -2071,7 +2068,7 @@ public abstract static class ReadSourceDescriptors
     abstract ErrorHandler getBadRecordErrorHandler();
 
     @Pure
-    abstract @Nullable Duration getConsumerPollingTimeout();
+    abstract long getConsumerPollingTimeout();
 
     abstract boolean isBounded();
 
@@ -2123,8 +2120,7 @@ abstract ReadSourceDescriptors.Builder setBadRecordRouter(
       abstract ReadSourceDescriptors.Builder setBadRecordErrorHandler(
          ErrorHandler badRecordErrorHandler);
 
-      abstract ReadSourceDescriptors.Builder setConsumerPollingTimeout(
-          @Nullable Duration duration);
+      abstract ReadSourceDescriptors.Builder setConsumerPollingTimeout(long duration);
 
       abstract ReadSourceDescriptors.Builder setBounded(boolean bounded);
 
@@ -2139,7 +2135,7 @@ public static ReadSourceDescriptors read() {
           .setBounded(false)
           .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
           .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
-          .setConsumerPollingTimeout(Duration.standardSeconds(2L))
+          .setConsumerPollingTimeout(2L)
           .build()
           .withProcessingTime()
           .withMonotonicallyIncreasingWatermarkEstimator();
 
@@ -2402,11 +2398,11 @@ public ReadSourceDescriptors withBadRecordErrorHandler(
     }
 
     /**
-     * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A
-     * lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
-     * enough (or any) records. The default is 2 seconds.
+     * Sets the timeout time in seconds for Kafka consumer polling request in the {@link
+     * ReadFromKafkaDoFn}. A lower timeout optimizes for latency. Increase the timeout if the
+     * consumer is not fetching any records. The default is 2 seconds.
      */
-    public ReadSourceDescriptors withConsumerPollingTimeout(@Nullable Duration duration) {
+    public ReadSourceDescriptors withConsumerPollingTimeout(long duration) {
       return toBuilder().setConsumerPollingTimeout(duration).build();
     }
 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index 7e54407300d4..89e2b80a8c6a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -112,7 +112,12 @@ Object getDefaultValue() {
     VALUE_DESERIALIZER_PROVIDER,
     CHECK_STOP_READING_FN(SDF),
     BAD_RECORD_ERROR_HANDLER(SDF),
-    CONSUMER_POLLING_TIMEOUT,
+    CONSUMER_POLLING_TIMEOUT(SDF) {
+      @Override
+      Object getDefaultValue() {
+        return Long.valueOf(2);
+      }
+    },
     ;
 
     @Nonnull private final ImmutableSet supportedImplementations;
 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 3a821ef9519e..b7bbc904da81 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -191,11 +191,10 @@ private ReadFromKafkaDoFn(
     this.checkStopReadingFn = transform.getCheckStopReadingFn();
     this.badRecordRouter = transform.getBadRecordRouter();
     this.recordTag = recordTag;
-    if (transform.getConsumerPollingTimeout() != null) {
-      this.consumerPollingTimeout =
-          java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis());
+    if (transform.getConsumerPollingTimeout() > 0) {
+      this.consumerPollingTimeout = transform.getConsumerPollingTimeout();
     } else {
-      this.consumerPollingTimeout = KAFKA_POLL_TIMEOUT;
+      this.consumerPollingTimeout = DEFAULT_KAFKA_POLL_TIMEOUT;
     }
   }
 
@@ -222,10 +221,8 @@ private ReadFromKafkaDoFn(
   private transient @Nullable Map offsetEstimatorCache;
 
   private transient @Nullable LoadingCache avgRecordSize;
-
-  private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(2);
-
-  @VisibleForTesting final java.time.Duration consumerPollingTimeout;
+  private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;
+  @VisibleForTesting final long consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider keyDeserializerProvider;
   @VisibleForTesting final DeserializerProvider valueDeserializerProvider;
   @VisibleForTesting final Map consumerConfig;
 
@@ -513,9 +510,9 @@ private ConsumerRecords poll(
     final Stopwatch sw = Stopwatch.createStarted();
     long previousPosition = -1;
     java.time.Duration elapsed = java.time.Duration.ZERO;
+    java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
     while (true) {
-      final ConsumerRecords rawRecords =
-          consumer.poll(consumerPollingTimeout.minus(elapsed));
+      final ConsumerRecords rawRecords = consumer.poll(timeout.minus(elapsed));
       if (!rawRecords.isEmpty()) {
         // return as we have found some entries
         return rawRecords;
@@ -525,11 +522,11 @@ private ConsumerRecords poll(
         return rawRecords;
       }
       elapsed = sw.elapsed();
-      if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
+      if (elapsed.toMillis() >= timeout.toMillis()) {
         // timeout is over
         LOG.warn(
             "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.",
-            consumerPollingTimeout.getSeconds());
+            consumerPollingTimeout);
         return rawRecords;
       }
     }
 
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 44c028f08a27..07e5b519c013 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -2123,14 +2123,14 @@ public void testSinkMetrics() throws Exception {
 
   @Test(expected = IllegalStateException.class)
   public void testWithInvalidConsumerPollingTimeout() {
-    KafkaIO.read().withConsumerPollingTimeout(Duration.standardSeconds(-5));
+    KafkaIO.read().withConsumerPollingTimeout(-5L);
   }
 
   @Test
   public void testWithValidConsumerPollingTimeout() {
     KafkaIO.Read reader =
-        KafkaIO.read().withConsumerPollingTimeout(Duration.standardSeconds(15));
-    assertEquals(15, reader.getConsumerPollingTimeout().getStandardSeconds());
+        KafkaIO.read().withConsumerPollingTimeout(15L);
+    assertEquals(15, reader.getConsumerPollingTimeout());
   }
 
   private static void verifyProducerRecords(
 
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 8902f22164bc..b8ff08485c3b 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -646,13 +646,12 @@ public void testConstructorWithPollTimeout() {
     ReadSourceDescriptors descriptors = makeReadSourceDescriptor(consumer);
     // default poll timeout = 1 scond
     ReadFromKafkaDoFn dofnInstance = ReadFromKafkaDoFn.create(descriptors, RECORDS);
-    Assert.assertEquals(Duration.ofSeconds(2L), dofnInstance.consumerPollingTimeout);
+    Assert.assertEquals(2L, dofnInstance.consumerPollingTimeout);
     // updated timeout = 5 seconds
-    descriptors =
-        descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5L));
+    descriptors = descriptors.withConsumerPollingTimeout(5L);
     ReadFromKafkaDoFn dofnInstanceNew = ReadFromKafkaDoFn.create(descriptors, RECORDS);
-    Assert.assertEquals(Duration.ofSeconds(5L), dofnInstanceNew.consumerPollingTimeout);
+    Assert.assertEquals(5L, dofnInstanceNew.consumerPollingTimeout);
   }
 
   private BoundednessVisitor testBoundedness(
 
diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index 4cbed36c697f..9848e429e215 100644
--- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -40,6 +40,7 @@
 import org.apache.beam.sdk.io.kafka.KafkaIOUtils;
 import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
@@ -51,6 +52,7 @@
 import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
 import org.apache.beam.sdk.util.construction.SdkComponents;
 import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.sdk.util.construction.TransformUpgrader;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -102,6 +104,7 @@ static class KafkaIOReadWithMetadataTranslator implements TransformPayloadTransl
           .addNullableByteArrayField("key_deserializer_provider")
           .addNullableByteArrayField("value_deserializer_provider")
           .addNullableByteArrayField("check_stop_reading_fn")
+          .addNullableInt64Field("consumer_polling_timeout")
           .build();
 
   @Override
@@ -173,7 +176,7 @@ public Row toConfigRow(Read transform) {
       if (transform.getStopReadTime() != null) {
         fieldValues.put("stop_read_time", transform.getStopReadTime());
       }
-
+      fieldValues.put("consumer_polling_timeout", transform.getConsumerPollingTimeout());
       fieldValues.put(
           "is_commit_offset_finalize_enabled", transform.isCommitOffsetsInFinalizeEnabled());
       fieldValues.put("is_dynamic_read", transform.isDynamicRead());
@@ -217,6 +220,13 @@ public Row toConfigRow(Read transform) {
 
     @Override
     public Read fromConfigRow(Row configRow, PipelineOptions options) {
+      String updateCompatibilityBeamVersion =
+          options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
+      // We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption
+      // is not correctly passed in for pipelines that use Beam 2.55.0.
+      // This is fixed for Beam 2.56.0 and later.
+      updateCompatibilityBeamVersion =
+          (updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.55.0";
       try {
         Read transform = KafkaIO.read();
 
@@ -320,6 +330,15 @@ public Row toConfigRow(Read transform) {
           transform =
               transform.withMaxReadTime(org.joda.time.Duration.millis(maxReadTime.toMillis()));
         }
+        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.56.0") < 0) {
+          // set to current default
+          transform = transform.withConsumerPollingTimeout(2L);
+        } else {
+          Long consumerPollingTimeout = configRow.getInt64("consumer_polling_timeout");
+          if (consumerPollingTimeout != null) {
+            transform = transform.withConsumerPollingTimeout(consumerPollingTimeout);
+          }
+        }
         Instant startReadTime = configRow.getValue("start_read_time");
         if (startReadTime != null) {
           transform = transform.withStartReadTime(startReadTime);
 
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index e1aeab8d3a8c..b4fd7d86e688 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -135,7 +135,7 @@ def __init__(
       max_read_time=None,
      commit_offset_in_finalize=False,
       timestamp_policy=processing_time_policy,
-      consumer_polling_timeout=None,
+      consumer_polling_timeout=2,
       with_metadata=False,
       expansion_service=None,
   ):
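For reference, a minimal usage sketch of the changed API. The withConsumerPollingTimeout(long) signature and its 2-second default come from this patch; the topic name, bootstrap servers, deserializers, and pipeline wiring below are illustrative placeholders and are not part of this change.

// Illustrative only: "localhost:9092" and "my_topic" are placeholders.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerPollingTimeoutExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092") // placeholder broker address
            .withTopic("my_topic") // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Previously took a joda-time Duration; now a plain long number of seconds.
            // Omitting this call keeps the 2-second default.
            .withConsumerPollingTimeout(5L));
    pipeline.run().waitUntilFinish();
  }
}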