diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java
index 372e77c54c67..d0f6427a262e 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaTestTable.java
@@ -36,7 +36,6 @@
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -138,10 +137,6 @@ public synchronized void assign(final Collection<TopicPartition> assigned) {
                     .collect(Collectors.toList());
             super.assign(realPartitions);
             assignedPartitions.set(ImmutableList.copyOf(realPartitions));
-            for (TopicPartition tp : realPartitions) {
-              updateBeginningOffsets(ImmutableMap.of(tp, 0L));
-              updateEndOffsets(ImmutableMap.of(tp, (long) kafkaRecords.get(tp).size()));
-            }
           }
           // Override offsetsForTimes() in order to look up the offsets by timestamp.
           @Override
@@ -163,9 +158,12 @@ public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
           }
         };

-    for (String topic : getTopics()) {
-      consumer.updatePartitions(topic, partitionInfoMap.get(topic));
-    }
+    partitionInfoMap.forEach(consumer::updatePartitions);
+    consumer.updateBeginningOffsets(
+        kafkaRecords.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> 0L)));
+    consumer.updateEndOffsets(
+        kafkaRecords.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> (long) e.getValue().size())));

     Runnable recordEnqueueTask =
         new Runnable() {
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index 6ce6c7d5d233..d86a5d0ce686 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -138,7 +138,6 @@ public boolean start() throws IOException {
               name, spec.getOffsetConsumerConfig(), spec.getConsumerConfig());

       offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
-      ConsumerSpEL.evaluateAssign(offsetConsumer, topicPartitions);

       // Fetch offsets once before running periodically.
       updateLatestOffsets();
@@ -711,23 +710,28 @@ private void setupInitialOffset(PartitionState<K, V> pState) {
   // Called from setupInitialOffset() at the start and then periodically from offsetFetcher thread.
   private void updateLatestOffsets() {
     Consumer<byte[], byte[]> offsetConsumer = Preconditions.checkStateNotNull(this.offsetConsumer);
-    for (PartitionState<K, V> p : partitionStates) {
-      try {
-        Instant fetchTime = Instant.now();
-        ConsumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition);
-        long offset = offsetConsumer.position(p.topicPartition);
-        p.setLatestOffset(offset, fetchTime);
-      } catch (Exception e) {
-        if (closed.get()) { // Ignore the exception if the reader is closed.
-          break;
-        }
+    List<TopicPartition> topicPartitions =
+        Preconditions.checkStateNotNull(source.getSpec().getTopicPartitions());
+    Instant fetchTime = Instant.now();
+    try {
+      Map<TopicPartition, Long> endOffsets = offsetConsumer.endOffsets(topicPartitions);
+      for (PartitionState<K, V> p : partitionStates) {
+        p.setLatestOffset(
+            Preconditions.checkStateNotNull(
+                endOffsets.get(p.topicPartition),
+                "No end offset found for partition %s.",
+                p.topicPartition),
+            fetchTime);
+      }
+    } catch (Exception e) {
+      if (!closed.get()) { // Ignore the exception if the reader is closed.
         LOG.warn(
-            "{}: exception while fetching latest offset for partition {}. will be retried.",
+            "{}: exception while fetching latest offset for partitions {}. will be retried.",
             this,
-            p.topicPartition,
+            topicPartitions,
             e);
-        // Don't update the latest offset.
       }
+      // Don't update the latest offset.
     }

     LOG.debug("{}: backlog {}", this, getSplitBacklogBytes());
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
index 4bda8cf28d4e..fe2d7a64a37f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
@@ -19,6 +19,7 @@

 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -253,13 +254,16 @@ private static class KafkaLatestOffsetEstimator
         Consumer<byte[], byte[]> offsetConsumer, TopicPartition topicPartition) {
       this.offsetConsumer = offsetConsumer;
       this.topicPartition = topicPartition;
-      ConsumerSpEL.evaluateAssign(this.offsetConsumer, ImmutableList.of(this.topicPartition));
       memoizedBacklog =
           Suppliers.memoizeWithExpiration(
               () -> {
                 synchronized (offsetConsumer) {
-                  ConsumerSpEL.evaluateSeek2End(offsetConsumer, topicPartition);
-                  return offsetConsumer.position(topicPartition);
+                  return Preconditions.checkStateNotNull(
+                      offsetConsumer
+                          .endOffsets(Collections.singleton(topicPartition))
+                          .get(topicPartition),
+                      "No end offset found for partition %s.",
+                      topicPartition);
                 }
               },
               1,
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 764e406f71cb..e614320db150 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -77,7 +77,7 @@
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaIO.Read.FakeFlinkPipelineOptions;
-import org.apache.beam.sdk.io.kafka.KafkaMocks.PositionErrorConsumerFactory;
+import org.apache.beam.sdk.io.kafka.KafkaMocks.EndOffsetErrorConsumerFactory;
 import org.apache.beam.sdk.io.kafka.KafkaMocks.SendErrorProducerFactory;
 import org.apache.beam.sdk.metrics.DistributionResult;
 import org.apache.beam.sdk.metrics.Lineage;
@@ -267,10 +267,6 @@ private static MockConsumer<byte[], byte[]> mkMockConsumer(
           public synchronized void assign(final Collection<TopicPartition> assigned) {
             super.assign(assigned);
             assignedPartitions.set(ImmutableList.copyOf(assigned));
-            for (TopicPartition tp : assigned) {
-              updateBeginningOffsets(ImmutableMap.of(tp, 0L));
-              updateEndOffsets(ImmutableMap.of(tp, (long) records.get(tp).size()));
-            }
           }
           // Override offsetsForTimes() in order to look up the offsets by timestamp.
           @Override
@@ -290,9 +286,12 @@ public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
           }
         };

-    for (String topic : topics) {
-      consumer.updatePartitions(topic, partitionMap.get(topic));
-    }
+    partitionMap.forEach(consumer::updatePartitions);
+    consumer.updateBeginningOffsets(
+        records.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> 0L)));
+    consumer.updateEndOffsets(
+        records.entrySet().stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, e -> (long) e.getValue().size())));

     // MockConsumer does not maintain any relationship between partition seek position and the
     // records added. e.g. if we add 10 records to a partition and then seek to end of the
@@ -1525,13 +1524,14 @@ public void testUnboundedReaderLogsCommitFailure() throws Exception {

     List<String> topics = ImmutableList.of("topic_a");

-    PositionErrorConsumerFactory positionErrorConsumerFactory = new PositionErrorConsumerFactory();
+    EndOffsetErrorConsumerFactory endOffsetErrorConsumerFactory =
+        new EndOffsetErrorConsumerFactory();

     UnboundedSource<KafkaRecord<Integer, Long>, KafkaCheckpointMark> source =
         KafkaIO.<Integer, Long>read()
             .withBootstrapServers("myServer1:9092,myServer2:9092")
             .withTopics(topics)
-            .withConsumerFactoryFn(positionErrorConsumerFactory)
+            .withConsumerFactoryFn(endOffsetErrorConsumerFactory)
             .withKeyDeserializer(IntegerDeserializer.class)
             .withValueDeserializer(LongDeserializer.class)
             .makeSource();
@@ -1540,7 +1540,7 @@ public void testUnboundedReaderLogsCommitFailure() throws Exception {

     reader.start();

-    unboundedReaderExpectedLogs.verifyWarn("exception while fetching latest offset for partition");
+    unboundedReaderExpectedLogs.verifyWarn("exception while fetching latest offset for partitions");

     reader.close();
   }
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMocks.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMocks.java
index 0844d71e7105..1303f1da3bcd 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMocks.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMocks.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.kafka;

 import java.io.Serializable;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -27,8 +28,8 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -66,51 +67,33 @@ public Producer<Integer, Long> apply(Map<String, Object> input) {
     }
   }

-  public static final class PositionErrorConsumer extends MockConsumer<byte[], byte[]> {
-
-    public PositionErrorConsumer() {
-      super(null);
-    }
-
-    @Override
-    public synchronized long position(TopicPartition partition) {
-      throw new KafkaException("fakeException");
-    }
-
-    @Override
-    public synchronized List<PartitionInfo> partitionsFor(String topic) {
-      return Collections.singletonList(
-          new PartitionInfo("topic_a", 1, new Node(1, "myServer1", 9092), null, null));
-    }
-  }
-
-  public static final class PositionErrorConsumerFactory
+  public static final class EndOffsetErrorConsumerFactory
       implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {
-    public PositionErrorConsumerFactory() {}
+    public EndOffsetErrorConsumerFactory() {}

     @Override
     public MockConsumer<byte[], byte[]> apply(Map<String, Object> input) {
+      final MockConsumer<byte[], byte[]> consumer;
       if (input.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        return new PositionErrorConsumer();
-      } else {
-        MockConsumer<byte[], byte[]> consumer =
-            new MockConsumer<byte[], byte[]>(null) {
+        consumer =
+            new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
               @Override
-              public synchronized long position(TopicPartition partition) {
-                return 1L;
-              }
-
-              @Override
-              public synchronized ConsumerRecords<byte[], byte[]> poll(long timeout) {
-                return ConsumerRecords.empty();
+              public synchronized Map<TopicPartition, Long> endOffsets(
+                  Collection<TopicPartition> partitions) {
+                throw new KafkaException("fakeException");
               }
             };
-        consumer.updatePartitions(
-            "topic_a",
-            Collections.singletonList(
-                new PartitionInfo("topic_a", 1, new Node(1, "myServer1", 9092), null, null)));
-        return consumer;
+      } else {
+        consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST);
       }
+      consumer.updatePartitions(
+          "topic_a",
+          Collections.singletonList(
+              new PartitionInfo("topic_a", 1, new Node(1, "myServer1", 9092), null, null)));
+      consumer.updateBeginningOffsets(
+          Collections.singletonMap(new TopicPartition("topic_a", 1), 0L));
+      consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition("topic_a", 1), 0L));
+      return consumer;
     }
   }

diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 3189bbb140f0..52c141685760 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -205,6 +205,8 @@ public SimpleMockKafkaConsumer(
         OffsetResetStrategy offsetResetStrategy, TopicPartition topicPartition) {
       super(offsetResetStrategy);
       this.topicPartition = topicPartition;
+      updateBeginningOffsets(ImmutableMap.of(topicPartition, 0L));
+      updateEndOffsets(ImmutableMap.of(topicPartition, Long.MAX_VALUE));
     }

     public void reset() {
@@ -214,6 +216,8 @@ public void reset() {
       this.startOffsetForTime = KV.of(0L, Instant.now());
       this.stopOffsetForTime = KV.of(Long.MAX_VALUE, null);
       this.numOfRecordsPerPoll = 0L;
+      updateBeginningOffsets(ImmutableMap.of(topicPartition, 0L));
+      updateEndOffsets(ImmutableMap.of(topicPartition, Long.MAX_VALUE));
     }

     public void setRemoved() {