CHANGES.md (1 change: 0 additions & 1 deletion)
@@ -74,7 +74,6 @@
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
-* Default consumer polling timeout for KafkaIO.Read was increased from 1 second to 2 seconds. Use KafkaIO.read().withConsumerPollingTimeout(Duration duration) to configure this timeout value when necessary ([#30870](https://github.com/apache/beam/issues/30870)).
 * Python Dataflow users no longer need to manually specify --streaming for pipelines using unbounded sources such as ReadFromPubSub.
 
 ## Deprecations
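For context, the entry deleted above documented the knob this PR reverts. A hedged sketch of how a pipeline would have set it before the revert (broker, topic, and deserializer choices are illustrative, not taken from the PR):

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

class PollingTimeoutExample {
  // Sketch of the reverted API described in the deleted CHANGES.md entry.
  // "localhost:9092" and "my-topic" are placeholder values.
  static KafkaIO.Read<Long, String> buildRead() {
    return KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("my-topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Removed by this PR; after the revert, polling falls back to the
        // fixed 1-second KAFKA_POLL_TIMEOUT inside ReadFromKafkaDoFn.
        .withConsumerPollingTimeout(Duration.standardSeconds(5));
  }
}
```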
KafkaIO.java
@@ -587,7 +587,6 @@ public static <K, V> Read<K, V> read() {
         .setCommitOffsetsInFinalizeEnabled(false)
         .setDynamicRead(false)
         .setTimestampPolicyFactory(TimestampPolicyFactory.withProcessingTime())
-        .setConsumerPollingTimeout(Duration.standardSeconds(2L))
         .build();
   }
 
@@ -707,9 +706,6 @@ public abstract static class Read<K, V>
     @Pure
     public abstract @Nullable ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
 
-    @Pure
-    public abstract @Nullable Duration getConsumerPollingTimeout();
-
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder
@@ -766,8 +762,6 @@ Builder<K, V> setCheckStopReadingFn(
         return setCheckStopReadingFn(CheckStopReadingFnWrapper.of(checkStopReadingFn));
       }
 
-      abstract Builder<K, V> setConsumerPollingTimeout(Duration consumerPollingTimeout);
-
       abstract Read<K, V> build();
 
       static <K, V> void setupExternalBuilder(
@@ -831,16 +825,6 @@ static <K, V> void setupExternalBuilder(
         // We can expose dynamic read to external build when ReadFromKafkaDoFn is the default
         // implementation.
         builder.setDynamicRead(false);
-
-        if (config.consumerPollingTimeout != null) {
-          if (config.consumerPollingTimeout <= 0) {
-            throw new IllegalArgumentException("consumerPollingTimeout should be > 0.");
-          }
-          builder.setConsumerPollingTimeout(
-              Duration.standardSeconds(config.consumerPollingTimeout));
-        } else {
-          builder.setConsumerPollingTimeout(Duration.standardSeconds(2L));
-        }
       }
 
       private static <T> Coder<T> resolveCoder(Class<Deserializer<T>> deserializer) {
@@ -903,7 +887,6 @@ public static class Configuration {
         private Long maxNumRecords;
         private Long maxReadTime;
         private Boolean commitOffsetInFinalize;
-        private Long consumerPollingTimeout;
         private String timestampPolicy;
 
         public void setConsumerConfig(Map<String, String> consumerConfig) {
@@ -945,10 +928,6 @@ public void setCommitOffsetInFinalize(Boolean commitOffsetInFinalize) {
         public void setTimestampPolicy(String timestampPolicy) {
           this.timestampPolicy = timestampPolicy;
         }
-
-        public void setConsumerPollingTimeout(Long consumerPollingTimeout) {
-          this.consumerPollingTimeout = consumerPollingTimeout;
-        }
       }
     }
 
@@ -1355,18 +1334,6 @@ public Read<K, V> withBadRecordErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
       return toBuilder().setBadRecordErrorHandler(badRecordErrorHandler).build();
     }
 
-    /**
-     * Sets the timeout for Kafka consumer polling requests in the {@link ReadFromKafkaDoFn}. A
-     * lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
-     * enough (or any) records. The default is 2 seconds.
-     */
-    public Read<K, V> withConsumerPollingTimeout(Duration duration) {
-      checkState(
-          duration == null || duration.compareTo(Duration.ZERO) > 0,
-          "Consumer polling timeout must be greater than 0.");
-      return toBuilder().setConsumerPollingTimeout(duration).build();
-    }
-
     /** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metadata. */
     public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
       return new TypedWithoutMetadata<>(this);
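The checkState guard removed above tolerated null (falling back to the default) but rejected non-positive durations with an IllegalStateException, as the deleted KafkaIOTest cases further down confirm. A hedged sketch of the removed call-site behavior (the null case assumes the @Nullable AutoValue property accepted it, which the guard implies):

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.joda.time.Duration;

class PollingTimeoutGuardExample {
  static void sketch() {
    // Valid: a strictly positive timeout.
    KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(Duration.standardSeconds(15));

    // Tolerated by the removed checkState: null fell through, and the
    // 2-second default applied downstream in ReadFromKafkaDoFn.
    KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(null);

    // Rejected: checkState threw IllegalStateException for non-positive values
    // (see the deleted testWithInvalidConsumerPollingTimeout below).
    KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(Duration.standardSeconds(-5));
  }
}
```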
@@ -1629,8 +1596,7 @@ public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
               .withValueDeserializerProvider(kafkaRead.getValueDeserializerProvider())
               .withManualWatermarkEstimator()
               .withTimestampPolicyFactory(kafkaRead.getTimestampPolicyFactory())
-              .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn())
-              .withConsumerPollingTimeout(kafkaRead.getConsumerPollingTimeout());
+              .withCheckStopReadingFn(kafkaRead.getCheckStopReadingFn());
       if (kafkaRead.isCommitOffsetsInFinalizeEnabled()) {
         readTransform = readTransform.commitOffsets();
       }
@@ -2070,9 +2036,6 @@ public abstract static class ReadSourceDescriptors<K, V>
     @Pure
     abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
 
-    @Pure
-    abstract @Nullable Duration getConsumerPollingTimeout();
-
     abstract boolean isBounded();
 
     abstract ReadSourceDescriptors.Builder<K, V> toBuilder();
@@ -2123,9 +2086,6 @@ abstract ReadSourceDescriptors.Builder<K, V> setBadRecordRouter(
       abstract ReadSourceDescriptors.Builder<K, V> setBadRecordErrorHandler(
           ErrorHandler<BadRecord, ?> badRecordErrorHandler);
 
-      abstract ReadSourceDescriptors.Builder<K, V> setConsumerPollingTimeout(
-          @Nullable Duration duration);
-
       abstract ReadSourceDescriptors.Builder<K, V> setBounded(boolean bounded);
 
       abstract ReadSourceDescriptors<K, V> build();
@@ -2139,7 +2099,6 @@ public static <K, V> ReadSourceDescriptors<K, V> read() {
           .setBounded(false)
           .setBadRecordRouter(BadRecordRouter.THROWING_ROUTER)
           .setBadRecordErrorHandler(new ErrorHandler.DefaultErrorHandler<>())
-          .setConsumerPollingTimeout(Duration.standardSeconds(2L))
           .build()
           .withProcessingTime()
           .withMonotonicallyIncreasingWatermarkEstimator();
@@ -2401,15 +2360,6 @@ public ReadSourceDescriptors<K, V> withBadRecordErrorHandler(
           .build();
     }
 
-    /**
-     * Sets the timeout for Kafka consumer polling requests in the {@link ReadFromKafkaDoFn}. A
-     * lower timeout optimizes for latency. Increase the timeout if the consumer is not fetching
-     * enough (or any) records. The default is 2 seconds.
-     */
-    public ReadSourceDescriptors<K, V> withConsumerPollingTimeout(@Nullable Duration duration) {
-      return toBuilder().setConsumerPollingTimeout(duration).build();
-    }
-
     ReadAllFromRow<K, V> forExternalBuild() {
       return new ReadAllFromRow<>(this);
     }
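The same knob was mirrored on the SDF path via ReadSourceDescriptors and is removed here as well. A hedged sketch of how it composed before this PR (bootstrap server and deserializers are illustrative; only withConsumerPollingTimeout is the call removed by this PR):

```java
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

class ReadSourceDescriptorsExample {
  static KafkaIO.ReadSourceDescriptors<Long, String> buildDescriptors() {
    return KafkaIO.<Long, String>readSourceDescriptors()
        .withBootstrapServers("localhost:9092") // placeholder value
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // Removed by this PR; note the @Nullable parameter here, unlike the
        // Read variant, and the absence of a positivity check.
        .withConsumerPollingTimeout(Duration.standardSeconds(5L));
  }
}
```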
KafkaIOReadImplementationCompatibility.java
@@ -112,7 +112,6 @@ Object getDefaultValue() {
     VALUE_DESERIALIZER_PROVIDER,
     CHECK_STOP_READING_FN(SDF),
     BAD_RECORD_ERROR_HANDLER(SDF),
-    CONSUMER_POLLING_TIMEOUT,
     ;
 
     @Nonnull private final ImmutableSet<KafkaIOReadImplementation> supportedImplementations;
ReadFromKafkaDoFn.java
@@ -191,12 +191,6 @@ private ReadFromKafkaDoFn(
     this.checkStopReadingFn = transform.getCheckStopReadingFn();
     this.badRecordRouter = transform.getBadRecordRouter();
     this.recordTag = recordTag;
-    if (transform.getConsumerPollingTimeout() != null) {
-      this.consumerPollingTimeout =
-          java.time.Duration.ofMillis(transform.getConsumerPollingTimeout().getMillis());
-    } else {
-      this.consumerPollingTimeout = KAFKA_POLL_TIMEOUT;
-    }
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(ReadFromKafkaDoFn.class);
@@ -223,9 +217,8 @@
 
   private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
 
-  private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(2);
+  private static final java.time.Duration KAFKA_POLL_TIMEOUT = java.time.Duration.ofSeconds(1);
 
-  @VisibleForTesting final java.time.Duration consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
   @VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
   @VisibleForTesting final Map<String, Object> consumerConfig;
@@ -515,7 +508,7 @@ private ConsumerRecords<byte[], byte[]> poll(
     java.time.Duration elapsed = java.time.Duration.ZERO;
     while (true) {
       final ConsumerRecords<byte[], byte[]> rawRecords =
-          consumer.poll(consumerPollingTimeout.minus(elapsed));
+          consumer.poll(KAFKA_POLL_TIMEOUT.minus(elapsed));
       if (!rawRecords.isEmpty()) {
         // return as we have found some entries
         return rawRecords;
@@ -525,11 +518,8 @@
         return rawRecords;
       }
       elapsed = sw.elapsed();
-      if (elapsed.toMillis() >= consumerPollingTimeout.toMillis()) {
+      if (elapsed.toMillis() >= KAFKA_POLL_TIMEOUT.toMillis()) {
         // timeout is over
-        LOG.warn(
-            "No messages retrieved with polling timeout {} seconds. Consider increasing the consumer polling timeout using withConsumerPollingTimeout method.",
-            consumerPollingTimeout.getSeconds());
        return rawRecords;
      }
    }
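The two hunks above revert the poll loop to the fixed one-second KAFKA_POLL_TIMEOUT: the DoFn polls repeatedly, asking the client only for the time remaining, until records arrive or the deadline elapses. A minimal, self-contained sketch of that pattern (the helper name and consumer wiring are illustrative, not Beam API; the real method has an additional early-return branch collapsed out of this diff):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

class PollUntilDeadlineSketch {
  // Illustrative helper mirroring ReadFromKafkaDoFn#poll's structure.
  static ConsumerRecords<byte[], byte[]> pollWithDeadline(
      Consumer<byte[], byte[]> consumer, Duration deadline) {
    long startNanos = System.nanoTime();
    Duration elapsed = Duration.ZERO;
    while (true) {
      // Ask the client only for the time still left in the overall deadline.
      ConsumerRecords<byte[], byte[]> records = consumer.poll(deadline.minus(elapsed));
      if (!records.isEmpty()) {
        return records; // data arrived before the deadline
      }
      elapsed = Duration.ofNanos(System.nanoTime() - startNanos);
      if (elapsed.compareTo(deadline) >= 0) {
        return records; // deadline exhausted; hand back the empty batch
      }
    }
  }
}
```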
KafkaIOExternalTest.java
@@ -107,16 +107,14 @@ public void testConstructKafkaRead() throws Exception {
                 Field.of("value_deserializer", FieldType.STRING),
                 Field.of("start_read_time", FieldType.INT64),
                 Field.of("commit_offset_in_finalize", FieldType.BOOLEAN),
-                Field.of("timestamp_policy", FieldType.STRING),
-                Field.of("consumer_polling_timeout", FieldType.INT64)))
+                Field.of("timestamp_policy", FieldType.STRING)))
             .withFieldValue("topics", topics)
             .withFieldValue("consumer_config", consumerConfig)
             .withFieldValue("key_deserializer", keyDeserializer)
             .withFieldValue("value_deserializer", valueDeserializer)
             .withFieldValue("start_read_time", startReadTime)
             .withFieldValue("commit_offset_in_finalize", false)
             .withFieldValue("timestamp_policy", "ProcessingTime")
-            .withFieldValue("consumer_polling_timeout", 5L)
             .build());
 
     RunnerApi.Components defaultInstance = RunnerApi.Components.getDefaultInstance();
@@ -267,7 +265,6 @@ public void testConstructKafkaReadWithoutMetadata() throws Exception {
     expansionService.expand(request, observer);
     ExpansionApi.ExpansionResponse result = observer.result;
     RunnerApi.PTransform transform = result.getTransform();
-
     assertThat(
         transform.getSubtransformsList(),
         Matchers.hasItem(MatchesPattern.matchesPattern(".*KafkaIO-Read.*")));
KafkaIOTest.java
@@ -2121,18 +2121,6 @@ public void testSinkMetrics() throws Exception {
     }
   }
 
-  @Test(expected = IllegalStateException.class)
-  public void testWithInvalidConsumerPollingTimeout() {
-    KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(Duration.standardSeconds(-5));
-  }
-
-  @Test
-  public void testWithValidConsumerPollingTimeout() {
-    KafkaIO.Read<Integer, Long> reader =
-        KafkaIO.<Integer, Long>read().withConsumerPollingTimeout(Duration.standardSeconds(15));
-    assertEquals(15, reader.getConsumerPollingTimeout().getStandardSeconds());
-  }
-
   private static void verifyProducerRecords(
       MockProducer<Integer, Long> mockProducer,
       String topic,
ReadFromKafkaDoFnTest.java
@@ -641,20 +641,6 @@ public void testUnbounded() {
     Assert.assertNotEquals(0, visitor.unboundedPCollections.size());
   }
 
-  @Test
-  public void testConstructorWithPollTimeout() {
-    ReadSourceDescriptors<String, String> descriptors = makeReadSourceDescriptor(consumer);
-    // default poll timeout = 2 seconds
-    ReadFromKafkaDoFn<String, String> dofnInstance = ReadFromKafkaDoFn.create(descriptors, RECORDS);
-    Assert.assertEquals(Duration.ofSeconds(2L), dofnInstance.consumerPollingTimeout);
-    // updated timeout = 5 seconds
-    descriptors =
-        descriptors.withConsumerPollingTimeout(org.joda.time.Duration.standardSeconds(5L));
-    ReadFromKafkaDoFn<String, String> dofnInstanceNew =
-        ReadFromKafkaDoFn.create(descriptors, RECORDS);
-    Assert.assertEquals(Duration.ofSeconds(5L), dofnInstanceNew.consumerPollingTimeout);
-  }
-
   private BoundednessVisitor testBoundedness(
       Function<ReadSourceDescriptors<String, String>, ReadSourceDescriptors<String, String>>
          readSourceDescriptorsDecorator) {
KafkaIOTranslationTest.java
@@ -75,7 +75,6 @@ public class KafkaIOTranslationTest {
     READ_TRANSFORM_SCHEMA_MAPPING.put(
         "getValueDeserializerProvider", "value_deserializer_provider");
     READ_TRANSFORM_SCHEMA_MAPPING.put("getCheckStopReadingFn", "check_stop_reading_fn");
-    READ_TRANSFORM_SCHEMA_MAPPING.put("getConsumerPollingTimeout", "consumer_polling_timeout");
   }
 
   // A mapping from Write transform builder methods to the corresponding schema fields in
sdks/python/apache_beam/io/kafka.py (11 changes: 2 additions & 9 deletions)
@@ -93,8 +93,7 @@
     ('value_deserializer', str), ('start_read_time', typing.Optional[int]),
     ('max_num_records', typing.Optional[int]),
     ('max_read_time', typing.Optional[int]),
-    ('commit_offset_in_finalize', bool), ('timestamp_policy', str),
-    ('consumer_polling_timeout', typing.Optional[int])])
+    ('commit_offset_in_finalize', bool), ('timestamp_policy', str)])
 
 
 def default_io_expansion_service(append_args=None):
@@ -135,7 +134,6 @@ def __init__(
       max_read_time=None,
       commit_offset_in_finalize=False,
       timestamp_policy=processing_time_policy,
-      consumer_polling_timeout=None,
       with_metadata=False,
       expansion_service=None,
   ):
@@ -161,10 +159,6 @@
     :param commit_offset_in_finalize: Whether to commit offsets when finalizing.
     :param timestamp_policy: The built-in timestamp policy which is used for
       extracting timestamp from KafkaRecord.
-    :param consumer_polling_timeout: Kafka client polling request
-      timeout time in seconds. A lower timeout optimizes for latency. Increase
-      the timeout if the consumer is not fetching any records. Default is 2
-      seconds.
     :param with_metadata: whether the returned PCollection should contain
       Kafka related metadata or not. If False (default), elements of the
       returned PCollection will be of type 'bytes' if True, elements of the
@@ -192,8 +186,7 @@ def __init__(
             max_read_time=max_read_time,
             start_read_time=start_read_time,
             commit_offset_in_finalize=commit_offset_in_finalize,
-            timestamp_policy=timestamp_policy,
-            consumer_polling_timeout=consumer_polling_timeout)),
+            timestamp_policy=timestamp_policy)),
         expansion_service or default_io_expansion_service())
 
 
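After this change the Python wrapper no longer accepts or forwards a polling timeout. A hedged usage sketch of ReadFromKafka post-revert, in the file's own language (broker and topic values are placeholders):

```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# Sketch: ReadFromKafka after the revert. consumer_polling_timeout is gone;
# polling uses the fixed 1-second KAFKA_POLL_TIMEOUT inside ReadFromKafkaDoFn.
# 'localhost:9092' and 'my-topic' are placeholder values.
with beam.Pipeline() as pipeline:
    records = (
        pipeline
        | 'ReadKafka' >> ReadFromKafka(
            consumer_config={'bootstrap.servers': 'localhost:9092'},
            topics=['my-topic'],
            commit_offset_in_finalize=False))
```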