diff --git a/CHANGES.md b/CHANGES.md
index cf78044ad104..ad9d83094311 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -74,7 +74,6 @@
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
-* Default consumer polling timeout for KafkaIO.Read was increased from 1 second to 2 seconds. Use KafkaIO.read().withConsumerPollingTimeout(Duration duration) to configure this timeout value when necessary ([#30870](https://github.com/apache/beam/issues/30870)).
 * Python Dataflow users no longer need to manually specify --streaming for pipelines using unbounded sources such as ReadFromPubSub.
 
 ## Deprecations
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 6a0df5897063..231a1b9e49e1 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -587,7 +587,6 @@ public static <K, V> Read<K, V> read() {
         .setCommitOffsetsInFinalizeEnabled(false)
         .setDynamicRead(false)
         .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
-        .setConsumerPollingTimeout(Duration.standardSeconds(2L))
         .build();
   }
 
@@ -707,9 +706,6 @@ public abstract static class Read<K, V>
     @Pure
     public abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
 
-    @Pure
-    public abstract @Nullable Duration getConsumerPollingTimeout();
-
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder
@@ -766,8 +762,6 @@ Builder<K, V> setCheckStopReadingFn(
         return setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
       }
 
-      abstract Builder<K, V> setConsumerPollingTimeout(Duration consumerPollingTimeout);
-
       abstract Read<K, V> build();
 
       static void setupExternalBuilder(
@@ -831,16 +825,6 @@ static void setupExternalBuilder(
         // We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
         // implementation.
         builder.setDynamicRead(false);
-
-        if (config.consumerPollingTimeout != null) {
-          if (config.consumerPollingTimeout <= 0) {
-            throw new IllegalArgumentException("consumerPollingTimeout should be > 0.");
-          }
-          builder.setConsumerPollingTimeout(
-              Duration.standardSeconds(config.consumerPollingTimeout));
-        } else {
-          builder.setConsumerPollingTimeout(Duration.standardSeconds(2L));
-        }
       }
 
       private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -903,7 +887,6 @@ public static class Configuration {
         private Long maxNumRecords;
         private Long maxReadTime;
         private Boolean commitOffsetInFinalize;
-        private Long consumerPollingTimeout;
         private String timestampPolicy;
 
         public void setConsumerConfig(Map<String, String> consumerConfig) {
@@ -945,10 +928,6 @@ public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
         public void setTimestampPolicy(String timestampPolicy) {
           this.timestampPolicy = timestampPolicy;
         }
-
-        public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
-          this.consumerPollingTimeout = consumerPollingTimeout;
-        }
       }
     }
 
@@ -1355,18 +1334,6 @@ public Read<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecord
       return toBuilder().setBadRecordErrorHandler(badRecordErrorHandler).build();
     }
 
-    /**
-     * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A
-     * lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
-     * enough (or any) records. The default is 2 seconds.
-     */
-    public Read<K, V> withConsumerPollingTimeout(Duration duration) {
-      checkState(
-          duration == null || duration.compareTo(Duration.ZERO) > 0,
-          "Consumer polling timeout must be greater than 0.");
-      return toBuilder().setConsumerPollingTimeout(duration).build();
-    }
-
     /** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. */
     public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
       return new TypedWithoutMetadata<>(this);
@@ -1629,8 +1596,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
               .withManualWatermarkEstimator()
               .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
-              .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
-              .withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
+              .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn());
       if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
         readTransform = readTransform.commitOffsets();
       }
@@ -2070,9 +2036,6 @@ public abstract static class ReadSourceDescriptors<K, V>
     @Pure
     abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
 
-    @Pure
-    abstract @Nullable Duration getConsumerPollingTimeout();
-
     abstract boolean isBounded();
 
     abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
@@ -2123,9 +2086,6 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordRouter(
       abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
           ErrorHandler<BadRecord, ?> badRecordErrorHandler);
 
-      abstract ReadSourceDescriptors.Builder<K, V> setConsumerPollingTimeout(
-          @Nullable Duration duration);
-
       abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);
 
       abstract ReadSourceDescriptors<K, V> build();
@@ -2139,7 +2099,6 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
           .setBounded(false)
           .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
           .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
-          .setConsumerPollingTimeout(Duration.standardSeconds(2L))
           .build()
           .withProcessingTime()
           .withMonotonicallyIncreasingWatermarkEstimator();
@@ -2401,15 +2360,6 @@ public ReadSourceDescriptors<K, V> withBadRecordErrorHandler(
           .build();
     }
 
-    /**
-     * Sets the timeout time for Kafka consumer polling request in the {@link ReadFromKafkaDoFn}. A
-     * lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
-     * enough (or any) records. The default is 2 seconds.
-     */
-    public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable Duration duration) {
-      return toBuilder().setConsumerPollingTimeout(duration).build();
-    }
-
     ReadAllFromRow<K, V> forExternalBuild() {
       return new ReadAllFromRow<>(this);
     }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
index 7e54407300d4..a2cc9aaeb4d9 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIOReadImplementationCompatibility.java
@@ -112,7 +112,6 @@ Object getDefaultValue() {
     VALUE_DESERIALIZER_PROVIDER,
     CHECK_STOP_READING_FN(SDF),
     BAD_RECORD_ERROR_HANDLER(SDF),
-    CONSUMER_POLLING_TIMEOUT,
     ;
 
     @Nonnull private final ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 3a821ef9519e..924833290f13 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -191,12 +191,6 @@ private ReadFromKafkaDoFn(
     this.checkStopReadingFn = transform.getCheckStopReadingFn();
     this.badRecordRouter = transform.getBadRecordRouter();
     this.recordTag = recordTag;
-    if (transform.getConsumerPollingTimeout() != null) {
-      this.consumerPollingTimeout =
-          java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis());
-    } else {
-      this.consumerPollingTimeout = KAFKA_POLL_TIMEOUT;
-    }
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
@@ -223,9 +217,8 @@ private ReadFromKafkaDoFn(
 
   private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
 
-  private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(2);
+  private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1);
 
-  @VisibleForTesting final java.time.Duration consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
   @VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
   @VisibleForTesting final Map<String, Object> consumerConfig;
@@ -515,7 +508,7 @@ private ConsumerRecords<byte[], byte[]> poll(
     java.time.Duration elapsed = java.time.Duration.ZERO;
     while (true) {
       final ConsumerRecords<byte[], byte[]> rawRecords =
-          consumer.poll(consumerPollingTimeout.minus(elapsed));
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(elapsed));
       if (!rawRecords.isEmpty()) {
         // return as we have found some entries
         return rawRecords;
       }
@@ -525,11 +518,8 @@
         return rawRecords;
       }
       elapsed = sw.elapsed();
-      if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
+      if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
         // timeout is over
-        LOG.warn(
-            "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.",
-            consumerPollingTimeout.getSeconds());
         return rawRecords;
       }
     }
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index 246fdd80d739..dd859af50864 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -107,8 +107,7 @@ public void testConstructKafkaRead() throws Exception {
                   Field.of("value_deserializer", FieldType.STRING),
                   Field.of("start_read_time", FieldType.INT64),
                   Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
-                  Field.of("timestamp_policy", FieldType.STRING),
-                  Field.of("consumer_polling_timeout", FieldType.INT64)))
+                  Field.of("timestamp_policy", FieldType.STRING)))
             .withFieldValue("topics", topics)
             .withFieldValue("consumer_config", consumerConfig)
             .withFieldValue("key_deserializer", keyDeserializer)
@@ -116,7 +115,6 @@
             .withFieldValue("start_read_time", startReadTime)
             .withFieldValue("commit_offset_in_finalize", false)
            .withFieldValue("timestamp_policy", "ProcessingTime")
-            .withFieldValue("consumer_polling_timeout", 5L)
             .build());
 
     RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -267,7 +265,6 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
     expansionService.expand(request, observer);
     ExpansionApi.ExpansionResponse result = observer.result;
     RunnerApi.PTransform transform = result.getTransform();
-
     assertThat(
         transform.getSubtransformsList(),
         Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 44c028f08a27..9b15b86051f5 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -2121,18 +2121,6 @@ public void testSinkMetrics() throws Exception {
     }
   }
 
-  @Test(expected = IllegalStateException.class)
-  public void testWithInvalidConsumerPollingTimeout() {
-    KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(Duration.standardSeconds(-5));
-  }
-
-  @Test
-  public void testWithValidConsumerPollingTimeout() {
-    KafkaIO.Read<Integer, Long> reader =
-        KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(Duration.standardSeconds(15));
-    assertEquals(15, reader.getConsumerPollingTimeout().getStandardSeconds());
-  }
-
   private static void verifyProducerRecords(
       MockProducer<Integer, Long> mockProducer,
       String topic,
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 8902f22164bc..48b5b060a295 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -641,20 +641,6 @@ public void testUnbounded() {
     Assert.assertNotEquals(0, visitor.unboundedPCollections.size());
   }
 
-  @Test
-  public void testConstructorWithPollTimeout() {
-    ReadSourceDescriptors<String, String> descriptors = makeReadSourceDescriptor(consumer);
-    // default poll timeout = 1 scond
-    ReadFromKafkaDoFn<String, String> dofnInstance =
-        ReadFromKafkaDoFn.create(descriptors, RECORDS);
-    Assert.assertEquals(Duration.ofSeconds(2L), dofnInstance.consumerPollingTimeout);
-    // updated timeout = 5 seconds
-    descriptors =
-        descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5L));
-    ReadFromKafkaDoFn<String, String> dofnInstanceNew =
-        ReadFromKafkaDoFn.create(descriptors, RECORDS);
-    Assert.assertEquals(Duration.ofSeconds(5L), dofnInstanceNew.consumerPollingTimeout);
-  }
-
   private BoundednessVisitor testBoundedness(
       Function<ReadSourceDescriptors<String, String>, ReadSourceDescriptors<String, String>>
          readSourceDescriptorsDecorator) {
diff --git a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
index f69b9c3649b4..18708c560018 100644
--- a/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
+++ b/sdks/java/io/kafka/upgrade/src/test/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslationTest.java
@@ -75,7 +75,6 @@ public class KafkaIOTranslationTest {
     READ_TRANSFORM_SCHEMA_MAPPING.put(
         "getValueDeserializerProvider", "value_deserializer_provider");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getCheckStopReadingFn", "check_stop_reading_fn");
-    READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerPollingTimeout", "consumer_polling_timeout");
   }
 
   // A mapping from Write transform builder methods to the corresponding schema fields in
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index e1aeab8d3a8c..b96576b4efb3 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -93,8 +93,7 @@
     ('value_deserializer', str), ('start_read_time', typing.Optional[int]),
     ('max_num_records', typing.Optional[int]),
     ('max_read_time', typing.Optional[int]),
-    ('commit_offset_in_finalize', bool), ('timestamp_policy', str),
-    ('consumer_polling_timeout', typing.Optional[int])])
+    ('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
 
 
 def default_io_expansion_service(append_args=None):
@@ -135,7 +134,6 @@ def __init__(
       max_read_time=None,
      commit_offset_in_finalize=False,
      timestamp_policy=processing_time_policy,
-      consumer_polling_timeout=None,
      with_metadata=False,
      expansion_service=None,
  ):
@@ -161,10 +159,6 @@ def __init__(
     :param commit_offset_in_finalize: Whether to commit offsets when finalizing.
     :param timestamp_policy: The built-in timestamp policy which is used for
         extracting timestamp from KafkaRecord.
-    :param consumer_polling_timeout: Kafka client polling request
-        timeout time in seconds. A lower timeout optimizes for latency. Increase
-        the timeout if the consumer is not fetching any records. Default is 2
-        seconds.
     :param with_metadata: whether the returned PCollection should contain
         Kafka related metadata or not. If False (default), elements of the
         returned PCollection will be of type 'bytes' if True, elements of the
@@ -192,8 +186,7 @@ def __init__(
             max_read_time=max_read_time,
             start_read_time=start_read_time,
             commit_offset_in_finalize=commit_offset_in_finalize,
-            timestamp_policy=timestamp_policy,
-            consumer_polling_timeout=consumer_polling_timeout)),
+            timestamp_policy=timestamp_policy)),
         expansion_service or default_io_expansion_service())
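
Reviewer note: with the configurable timeout removed, ReadFromKafkaDoFn is back to budgeting a fixed 1-second timeout across repeated polls, as shown in the poll() hunks above. Below is a minimal, self-contained sketch of that pattern; the class and method names here are illustrative only, and the real poll() also has an end-of-stream early return (visible as context in the second hunk) that is omitted for brevity.

import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

final class FixedTimeoutPollSketch {
  // Fixed budget restored by this change (the removed API defaulted to 2 seconds).
  private static final Duration KAFKA_POLL_TIMEOUT = Duration.ofSeconds(1);

  /** Polls until records arrive or the 1-second budget is spent, whichever comes first. */
  static ConsumerRecords<byte[], byte[]> poll(Consumer<byte[], byte[]> consumer) {
    final Instant start = Instant.now();
    Duration elapsed = Duration.ZERO;
    while (true) {
      // Hand each poll only the budget that is still left, so a series of
      // empty polls cannot stretch the total wait past KAFKA_POLL_TIMEOUT.
      final ConsumerRecords<byte[], byte[]> rawRecords =
          consumer.poll(KAFKA_POLL_TIMEOUT.minus(elapsed));
      if (!rawRecords.isEmpty()) {
        return rawRecords; // found some entries
      }
      elapsed = Duration.between(start, Instant.now());
      if (elapsed.compareTo(KAFKA_POLL_TIMEOUT) >= 0) {
        return rawRecords; // timeout is over; hand back the empty batch
      }
    }
  }
}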
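Downstream note: pipelines that adopted the removed setter only need to delete that one call when picking up this change; the rest of the read is unaffected. A hedged sketch of a read after this patch (broker address and topic are placeholders, not values from this diff):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadAfterRevert {
  public static PCollection<KV<String, Long>> readRecords(Pipeline p) {
    return p.apply(
        KafkaIO.<String, Long>read()
            .withBootstrapServers("broker_1:9092") // placeholder address
            .withTopic("my_topic") // placeholder topic
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(LongDeserializer.class)
            // Previously a .withConsumerPollingTimeout(...) call could appear here;
            // after this patch the method no longer exists and the timeout is fixed.
            .withoutMetadata());
  }
}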