From c2bf2ff27e09d3ab817898ebc6d4f2dd28248eb7 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 5 Sep 2019 16:45:41 -0700 Subject: [PATCH 1/7] add overloaded function --- .../kafka/clients/consumer/Consumer.java | 12 ++++ .../kafka/clients/consumer/KafkaConsumer.java | 67 +++++++++++++++++-- 2 files changed, 74 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java index 7fdf4398b9add..5136d6886dd2d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java @@ -154,13 +154,25 @@ public interface Consumer extends Closeable { /** * @see KafkaConsumer#committed(TopicPartition) */ + @Deprecated OffsetAndMetadata committed(TopicPartition partition); /** * @see KafkaConsumer#committed(TopicPartition, Duration) */ + @Deprecated OffsetAndMetadata committed(TopicPartition partition, final Duration timeout); + /** + * @see KafkaConsumer#committed(Set) + */ + Map committed(Set partitions); + + /** + * @see KafkaConsumer#committed(Set, Duration) + */ + Map committed(Set partitions, final Duration timeout); + /** * @see KafkaConsumer#metrics() */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index be3e176c2eeb9..65029a974c3b6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1735,7 +1735,10 @@ public long position(TopicPartition partition, final Duration timeout) { * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before * the timeout specified by {@code default.api.timeout.ms} expires. + * + * @deprecated since 2.4 Use {@link #committed(Set)} instead */ + @Deprecated @Override public OffsetAndMetadata committed(TopicPartition partition) { return committed(partition, Duration.ofMillis(defaultApiTimeoutMs)); @@ -1760,21 +1763,75 @@ public OffsetAndMetadata committed(TopicPartition partition) { * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before * expiration of the timeout + * + * @deprecated since 2.4 Use {@link #committed(Set, Duration)} instead */ + @Deprecated @Override public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) { + return committed(Collections.singleton(partition), timeout).get(partition); + } + + /** + * Get the last committed offsets for the given partitions (whether the commit happened by this process or + * another). The returned offsets will be used as the position for the consumer in the event of a failure. + *

+ * This call will do a remote call to get the latest committed offset from the server, and will block until the + * committed offset is gotten successfully, an unrecoverable error is encountered (in which case it is thrown to + * the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a + * {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller). + * + * @param partitions The partition to check + * @return The last committed offset and metadata or null if there was no prior commit + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before + * the timeout specified by {@code default.api.timeout.ms} expires. + */ + @Override + public Map committed(Set partitions) { + return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs)); + } + + /** + * Get the last committed offsets for the given partitions (whether the commit happened by this process or + * another). The returned offsets will be used as the position for the consumer in the event of a failure. + *

+ * This call will block to do a remote call to get the latest committed offsets from the server. + * + * @param partitions The partitions to check + * @param timeout The maximum amount of time to await the current committed offset + * @return The last committed offset and metadata or null if there was no prior commit + * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this + * function is called + * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while + * this function is called + * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details + * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the + * configured groupId. See the exception for more details + * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors + * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before + * expiration of the timeout + */ + @Override + public Map committed(Set partitions, final Duration timeout) { acquireAndEnsureOpen(); try { maybeThrowInvalidGroupIdException(); - Map offsets = coordinator.fetchCommittedOffsets( - Collections.singleton(partition), time.timer(timeout)); + Map offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout)); if (offsets == null) { throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " + - "committed offset for partition " + partition + " could be determined. Try tuning default.api.timeout.ms " + - "larger to relax the threshold."); + "committed offset for partition " + partitions + " could be determined. Try tuning default.api.timeout.ms " + + "larger to relax the threshold."); } else { offsets.forEach(this::updateLastSeenEpochIfNewer); - return offsets.get(partition); + return offsets; } } finally { release(); From 649fc4f15207437fa7e2c3acbca5073fd15c8212 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sun, 15 Sep 2019 16:15:45 -0700 Subject: [PATCH 2/7] refactor all unit tests --- .../kafka/clients/consumer/KafkaConsumer.java | 12 ++++- .../kafka/clients/consumer/MockConsumer.java | 26 ++++++++--- .../clients/consumer/KafkaConsumerTest.java | 20 ++++---- .../clients/consumer/MockConsumerTest.java | 10 ++-- .../kafka/api/ConsumerBounceTest.scala | 6 +-- .../kafka/api/PlaintextConsumerTest.scala | 46 +++++++++---------- .../kafka/api/TransactionsTest.scala | 2 +- .../admin/ConsumerGroupCommandTest.scala | 14 ++---- .../scala/unit/kafka/utils/TestUtils.scala | 9 ++-- .../processor/internals/AbstractTask.java | 20 +++++--- .../internals/ProcessorStateManager.java | 9 ++-- .../processor/internals/StandbyTask.java | 12 +++-- .../processor/internals/StreamTask.java | 13 +++--- .../processor/internals/StandbyTaskTest.java | 8 ++-- .../processor/internals/StreamTaskTest.java | 2 +- .../tools/TransactionalMessageCopier.java | 11 +++-- 16 files changed, 125 insertions(+), 95 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 65029a974c3b6..d54074573bcaf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1776,6 +1776,10 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time * Get the last committed offsets for the given partitions (whether the commit happened by this process or * another). The returned offsets will be used as the position for the consumer in the event of a failure. *

+ * For partitions that do not have a committed offset, they would not be included in the returned map. + *

+ * If any of the partitions requested do not exist, an exception would be thrown. + *

* This call will do a remote call to get the latest committed offset from the server, and will block until the * committed offset is gotten successfully, an unrecoverable error is encountered (in which case it is thrown to * the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a @@ -1795,7 +1799,7 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time * the timeout specified by {@code default.api.timeout.ms} expires. */ @Override - public Map committed(Set partitions) { + public Map committed(final Set partitions) { return committed(partitions, Duration.ofMillis(defaultApiTimeoutMs)); } @@ -1803,6 +1807,10 @@ public Map committed(Set part * Get the last committed offsets for the given partitions (whether the commit happened by this process or * another). The returned offsets will be used as the position for the consumer in the event of a failure. *

+ * For partitions that do not have a committed offset, they would not be included in the returned map. + *

+ * If any of the partitions requested do not exist, an exception would be thrown. + *

* This call will block to do a remote call to get the latest committed offsets from the server. * * @param partitions The partitions to check @@ -1820,7 +1828,7 @@ public Map committed(Set part * expiration of the timeout */ @Override - public Map committed(Set partitions, final Duration timeout) { + public Map committed(final Set partitions, final Duration timeout) { acquireAndEnsureOpen(); try { maybeThrowInvalidGroupIdException(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 947746f3ce1af..5d7e829637fb0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** @@ -290,20 +291,31 @@ public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) subscriptions.seek(partition, offsetAndMetadata.offset()); } + @Deprecated @Override - public synchronized OffsetAndMetadata committed(TopicPartition partition) { - ensureNotClosed(); - if (subscriptions.isAssigned(partition)) { - return committed.get(partition); - } - return new OffsetAndMetadata(0); + public synchronized OffsetAndMetadata committed(final TopicPartition partition) { + return committed(Collections.singleton(partition)).get(partition); } + @Deprecated @Override - public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) { + public OffsetAndMetadata committed(final TopicPartition partition, final Duration timeout) { return committed(partition); } + @Override + public synchronized Map committed(final Set partitions) { + ensureNotClosed(); + + return partitions.stream().collect( + Collectors.toMap(tp -> tp, tp -> subscriptions.isAssigned(tp) ? committed.get(tp) : new OffsetAndMetadata(0))); + } + + @Override + public synchronized Map committed(final Set partitions, final Duration timeout) { + return committed(partitions); + } + @Override public synchronized long position(TopicPartition partition) { ensureNotClosed(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 7892d6d1c92db..a9c4d259aa4ea 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -696,7 +696,7 @@ public void testCommitsFetchedDuringAssign() { // fetch offset for one topic client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, offset1), Errors.NONE), coordinator); - assertEquals(offset1, consumer.committed(tp0).offset()); + assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); consumer.assign(Arrays.asList(tp0, tp1)); @@ -704,12 +704,12 @@ public void testCommitsFetchedDuringAssign() { Map offsets = new HashMap<>(); offsets.put(tp0, offset1); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); - assertEquals(offset1, consumer.committed(tp0).offset()); + assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); offsets.remove(tp0); offsets.put(tp1, offset2); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); - assertEquals(offset2, consumer.committed(tp1).offset()); + assertEquals(offset2, consumer.committed(Collections.singleton(tp1)).get(tp1).offset()); consumer.close(Duration.ofMillis(0)); } @@ -1137,7 +1137,7 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() { // fetch offset for one topic client.prepareResponseFrom(offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE), coordinator); - assertEquals(0, consumer.committed(tp0).offset()); + assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); // verify that assignment immediately changes assertTrue(consumer.assignment().equals(singleton(tp0))); @@ -1195,7 +1195,7 @@ public void testManualAssignmentChangeWithAutoCommitDisabled() { client.prepareResponseFrom( offsetResponse(Collections.singletonMap(tp0, 0L), Errors.NONE), coordinator); - assertEquals(0, consumer.committed(tp0).offset()); + assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); // verify that assignment immediately changes assertTrue(consumer.assignment().equals(singleton(tp0))); @@ -1256,12 +1256,12 @@ public void testOffsetOfPausedPartitions() { offsets.put(tp1, 0L); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); - assertEquals(0, consumer.committed(tp0).offset()); + assertEquals(0, consumer.committed(Collections.singleton(tp0)).get(tp0).offset()); offsets.remove(tp0); offsets.put(tp1, 0L); client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator); - assertEquals(0, consumer.committed(tp1).offset()); + assertEquals(0, consumer.committed(Collections.singleton(tp1)).get(tp1).offset()); // fetch and verify consumer's position in the two partitions final Map offsetResponse = new HashMap<>(); @@ -1356,7 +1356,7 @@ public void testOperationsBySubscribingConsumerWithDefaultGroupId() { } try { - newConsumer((String) null).committed(tp0); + newConsumer((String) null).committed(Collections.singleton(tp0)).get(tp0); fail("Expected an InvalidGroupIdException"); } catch (InvalidGroupIdException e) { // OK, expected @@ -1383,7 +1383,7 @@ public void testOperationsByAssigningConsumerWithDefaultGroupId() { consumer.assign(singleton(tp0)); try { - consumer.committed(tp0); + consumer.committed(Collections.singleton(tp0)).get(tp0); fail("Expected an InvalidGroupIdException"); } catch (InvalidGroupIdException e) { // OK, expected @@ -1636,7 +1636,7 @@ public void testCommitSyncAuthenticationFailure() { @Test(expected = AuthenticationException.class) public void testCommittedAuthenticationFaiure() { final KafkaConsumer consumer = consumerWithPendingAuthenticationError(); - consumer.committed(tp0); + consumer.committed(Collections.singleton(tp0)).get(tp0); } @Test diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java index aad4d2973a032..5a012b2cf67a9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java @@ -55,9 +55,10 @@ public void testSimpleMock() { assertEquals(rec1, iter.next()); assertEquals(rec2, iter.next()); assertFalse(iter.hasNext()); - assertEquals(2L, consumer.position(new TopicPartition("test", 0))); + final TopicPartition tp = new TopicPartition("test", 0); + assertEquals(2L, consumer.position(tp)); consumer.commitSync(); - assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset()); + assertEquals(2L, consumer.committed(Collections.singleton(tp)).get(tp).offset()); } @SuppressWarnings("deprecation") @@ -81,9 +82,10 @@ public void testSimpleMockDeprecated() { assertEquals(rec1, iter.next()); assertEquals(rec2, iter.next()); assertFalse(iter.hasNext()); - assertEquals(2L, consumer.position(new TopicPartition("test", 0))); + final TopicPartition tp = new TopicPartition("test", 0); + assertEquals(2L, consumer.position(tp)); consumer.commitSync(); - assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset()); + assertEquals(2L, consumer.committed(Collections.singleton(tp)).get(tp).offset()); } @Test diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index 209eac0f506be..e355400fccc58 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -108,7 +108,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { if (records.nonEmpty) { consumer.commitSync() - assertEquals(consumer.position(tp), consumer.committed(tp).offset) + assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset) if (consumer.position(tp) == numRecords) { consumer.seekToBeginning(Collections.emptyList()) @@ -153,7 +153,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { } else if (coin == 2) { info("Committing offset.") consumer.commitSync() - assertEquals(consumer.position(tp), consumer.committed(tp).offset) + assertEquals(consumer.position(tp), consumer.committed(Set(tp).asJava).get(tp).offset) } } } @@ -485,7 +485,7 @@ class ConsumerBounceTest extends AbstractConsumerTest with Logging { consumer.poll(time.Duration.ofSeconds(3L)) assertTrue("Assignment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS)) if (committedRecords > 0) - assertEquals(committedRecords, consumer.committed(tp).offset) + assertEquals(committedRecords, consumer.committed(Set(tp).asJava).get(tp).offset) consumer.close() } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index acb0d6b17ba76..62b358e6fa24d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -278,8 +278,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { // now we should see the committed positions from another consumer val anotherConsumer = createConsumer() - assertEquals(300, anotherConsumer.committed(tp).offset) - assertEquals(500, anotherConsumer.committed(tp2).offset) + assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset) } @Test @@ -305,8 +305,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { // now we should see the committed positions from another consumer val anotherConsumer = createConsumer() - assertEquals(300, anotherConsumer.committed(tp).offset) - assertEquals(500, anotherConsumer.committed(tp2).offset) + assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset) } @Test @@ -480,17 +480,17 @@ class PlaintextConsumerTest extends BaseConsumerTest { // sync commit val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo") consumer.commitSync(Map((tp, syncMetadata)).asJava) - assertEquals(syncMetadata, consumer.committed(tp)) + assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp)) // async commit val asyncMetadata = new OffsetAndMetadata(10, "bar") sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata))) - assertEquals(asyncMetadata, consumer.committed(tp)) + assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp)) // handle null metadata val nullMetadata = new OffsetAndMetadata(5, null) consumer.commitSync(Map(tp -> nullMetadata).asJava) - assertEquals(nullMetadata, consumer.committed(tp)) + assertEquals(nullMetadata, consumer.committed(Set(tp).asJava).get(tp)) } @Test @@ -509,7 +509,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals(None, callback.lastError) assertEquals(count, callback.successCount) - assertEquals(new OffsetAndMetadata(count), consumer.committed(tp)) + assertEquals(new OffsetAndMetadata(count), consumer.committed(Set(tp).asJava).get(tp)) } @Test @@ -623,7 +623,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { sendRecords(producer, numRecords = 5, tp) val consumer = createConsumer() - assertNull(consumer.committed(new TopicPartition(topic, 15))) + assertTrue(consumer.committed(Set(new TopicPartition(topic, 15)).asJava).isEmpty) // position() on a partition that we aren't subscribed to throws an exception intercept[IllegalStateException] { @@ -634,12 +634,12 @@ class PlaintextConsumerTest extends BaseConsumerTest { assertEquals("position() on a partition that we are subscribed to should reset the offset", 0L, consumer.position(tp)) consumer.commitSync() - assertEquals(0L, consumer.committed(tp).offset) + assertEquals(0L, consumer.committed(Set(tp).asJava).get(tp).offset) consumeAndVerifyRecords(consumer = consumer, numRecords = 5, startingOffset = 0) assertEquals("After consuming 5 records, position should be 5", 5L, consumer.position(tp)) consumer.commitSync() - assertEquals("Committed offset should be returned", 5L, consumer.committed(tp).offset) + assertEquals("Committed offset should be returned", 5L, consumer.committed(Set(tp).asJava).get(tp).offset) sendRecords(producer, numRecords = 1, tp) @@ -1024,12 +1024,12 @@ class PlaintextConsumerTest extends BaseConsumerTest { // commit sync and verify onCommit is called val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue testConsumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava) - assertEquals(2, testConsumer.committed(tp).offset) + assertEquals(2, testConsumer.committed(Set(tp).asJava).get(tp).offset) assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue) // commit async and verify onCommit is called sendAndAwaitAsyncCommit(testConsumer, Some(Map(tp -> new OffsetAndMetadata(5L)))) - assertEquals(5, testConsumer.committed(tp).offset) + assertEquals(5, testConsumer.committed(Set(tp).asJava).get(tp).offset) assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue) testConsumer.close() @@ -1076,8 +1076,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { rebalanceListener) // after rebalancing, we should have reset to the committed positions - assertEquals(10, testConsumer.committed(tp).offset) - assertEquals(20, testConsumer.committed(tp2).offset) + assertEquals(10, testConsumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(20, testConsumer.committed(Set(tp2).asJava).get(tp2).offset) assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance) // verify commits are intercepted on close @@ -1321,19 +1321,19 @@ class PlaintextConsumerTest extends BaseConsumerTest { val pos1 = consumer.position(tp) val pos2 = consumer.position(tp2) consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava) - assertEquals(3, consumer.committed(tp).offset) - assertNull(consumer.committed(tp2)) + assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset) + assertNull(consumer.committed(Set(tp2).asJava).get(tp2)) // Positions should not change assertEquals(pos1, consumer.position(tp)) assertEquals(pos2, consumer.position(tp2)) consumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava) - assertEquals(3, consumer.committed(tp).offset) - assertEquals(5, consumer.committed(tp2).offset) + assertEquals(3, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(5, consumer.committed(Set(tp2).asJava).get(tp2).offset) // Using async should pick up the committed changes after commit completes sendAndAwaitAsyncCommit(consumer, Some(Map(tp2 -> new OffsetAndMetadata(7L)))) - assertEquals(7, consumer.committed(tp2).offset) + assertEquals(7, consumer.committed(Set(tp2).asJava).get(tp2).offset) } @Test @@ -1371,8 +1371,8 @@ class PlaintextConsumerTest extends BaseConsumerTest { awaitAssignment(consumer, newAssignment) // after rebalancing, we should have reset to the committed positions - assertEquals(300, consumer.committed(tp).offset) - assertEquals(500, consumer.committed(tp2).offset) + assertEquals(300, consumer.committed(Set(tp).asJava).get(tp).offset) + assertEquals(500, consumer.committed(Set(tp2).asJava).get(tp2).offset) } @Test @@ -1808,7 +1808,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { } try { - consumer2.committed(tp) + consumer2.committed(Set(tp).asJava) fail("Expected committed offset fetch to fail due to null group id") } catch { case e: InvalidGroupIdException => // OK diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index 254ccadc89096..e1bf3db4ff9ef 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -389,7 +389,7 @@ class TransactionsTest extends KafkaServerTestHarness { val producer2 = transactionalProducers(1) producer2.initTransactions() - assertEquals(offsetAndMetadata, consumer.committed(tp)) + assertEquals(offsetAndMetadata, consumer.committed(Set(tp).asJava).get(tp)) } @Test diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala index fdadec41e3727..3ee83e25eeab9 100644 --- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala @@ -27,7 +27,7 @@ import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.clients.admin.AdminClientConfig import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{PartitionInfo, TopicPartition} import org.apache.kafka.common.errors.WakeupException import org.apache.kafka.common.serialization.StringDeserializer import org.junit.{After, Before} @@ -70,14 +70,10 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { props.put("group.id", group) val consumer = new KafkaConsumer(props, new StringDeserializer, new StringDeserializer) try { - consumer.partitionsFor(topic).asScala.flatMap { partitionInfo => - val tp = new TopicPartition(partitionInfo.topic, partitionInfo.partition) - val committed = consumer.committed(tp) - if (committed == null) - None - else - Some(tp -> committed.offset) - }.toMap + val partitions: Set[TopicPartition] = consumer.partitionsFor(topic) + .asScala.toSet.map {partitionInfo : PartitionInfo => new TopicPartition(partitionInfo.topic, partitionInfo.partition)} + + consumer.committed(partitions.asJava).asScala.mapValues(_.offset()).toMap } finally { consumer.close() } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index d28bd99c55382..c1951c010c53f 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1432,11 +1432,12 @@ object TestUtils extends Logging { offsetsToCommit.toMap } - def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) = { + def resetToCommittedPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) { + val committed = consumer.committed(consumer.assignment).asScala.mapValues(_.offset) + consumer.assignment.asScala.foreach { topicPartition => - val offset = consumer.committed(topicPartition) - if (offset != null) - consumer.seek(topicPartition, offset.offset) + if (committed.contains(topicPartition)) + consumer.seek(topicPartition, committed(topicPartition)) else consumer.seekToBeginning(Collections.singletonList(topicPartition)) } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 798f0b089a032..c4a7a5469cd81 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthorizationException; @@ -35,7 +34,9 @@ import java.io.IOException; import java.util.Collection; import java.util.HashSet; +import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; public abstract class AbstractTask implements Task { @@ -250,16 +251,23 @@ public Collection changelogPartitions() { return stateMgr.changelogPartitions(); } - long committedOffsetForPartition(final TopicPartition partition) { + Map committedOffsetForPartition(final Set partitions) { try { - final OffsetAndMetadata metadata = consumer.committed(partition); - return metadata != null ? metadata.offset() : 0L; + final Map results = consumer.committed(partitions) + .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); + + // those do not have a committed offset would default to 0 + for (final TopicPartition tp : partitions) { + results.putIfAbsent(tp, 0L); + } + + return results; } catch (final AuthorizationException e) { - throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partition), e); + throw new ProcessorStateException(String.format("task [%s] AuthorizationException when initializing offsets for %s", id, partitions), e); } catch (final WakeupException e) { throw e; } catch (final KafkaException e) { - throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partition), e); + throw new ProcessorStateException(String.format("task [%s] Failed to initialize offsets for %s", id, partitions), e); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 2e80c94e5f570..eb69ee8fa4198 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -247,13 +247,12 @@ void updateStandbyStates(final TopicPartition storePartition, standbyRestoredOffsets.put(storePartition, lastOffset + 1); } - void putOffsetLimit(final TopicPartition partition, - final long limit) { - log.trace("Updating store offset limit for partition {} to {}", partition, limit); - offsetLimits.put(partition, limit); + void putOffsetLimit(final Map offsets) { + log.trace("Updating store offset limit with {}", offsets); + offsetLimits.putAll(offsets); } - long offsetLimit(final TopicPartition partition) { + private long offsetLimit(final TopicPartition partition) { final Long limit = offsetLimits.get(partition); return limit != null ? limit : Long.MAX_VALUE; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index c758ccd2e7b23..744e470f8bb9e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -217,14 +217,18 @@ private long updateOffsetLimits(final TopicPartition partition) { throw new IllegalArgumentException("Topic is not both a source and a changelog: " + partition); } - updateableOffsetLimits.remove(partition); + final Map newLimits = committedOffsetForPartition(updateableOffsetLimits); + final Long previousLimit = offsetLimits.get(partition); + final Long newLimit = newLimits.get(partition); - final long newLimit = committedOffsetForPartition(partition); - final long previousLimit = offsetLimits.put(partition, newLimit); - if (previousLimit > newLimit) { + if (previousLimit != null && previousLimit > newLimit) { throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. " + "New limit: " + newLimit + ". Previous limit: " + previousLimit); } + + offsetLimits.putAll(newLimits); + updateableOffsetLimits.clear(); + return newLimit; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 3dd83b5b00cd4..0e39e91247599 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -29,6 +29,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -245,13 +247,10 @@ public boolean initializeStateStores() { // partitions of topics that are both sources and changelogs and set the consumer committed // offset via stateMgr as there is not a more direct route. final Set changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values()); - partitions.stream() - .filter(tp -> changelogTopicNames.contains(tp.topic())) - .forEach(tp -> { - final long offset = committedOffsetForPartition(tp); - stateMgr.putOffsetLimit(tp, offset); - log.trace("Updating store offset limits {} for changelog {}", offset, tp); - }); + final Set sourcePartitionsAsChangelog = new HashSet<>(partitions) + .stream().filter(tp -> changelogTopicNames.contains(tp.topic())).collect(Collectors.toSet()); + final Map committedOffsets = committedOffsetForPartition(sourcePartitionsAsChangelog); + stateMgr.putOffsetLimit(committedOffsets); registerStateStores(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index df889798a86b6..949148667b0fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -562,9 +562,9 @@ public void shouldNotGetConsumerCommittedOffsetIfThereAreNoRecordUpdates() throw final Consumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override - public synchronized OffsetAndMetadata committed(final TopicPartition partition) { + public synchronized Map committed(final Set partitions) { committedCallCount.getAndIncrement(); - return super.committed(partition); + return super.committed(partitions); } }; @@ -595,9 +595,9 @@ public void shouldGetConsumerCommittedOffsetsOncePerCommit() throws IOException final Consumer consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override - public synchronized OffsetAndMetadata committed(final TopicPartition partition) { + public synchronized Map committed(final Set partitions) { committedCallCount.getAndIncrement(); - return super.committed(partition); + return super.committed(partitions); } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index cc0af23eed37d..5de83ac77e3c1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -1508,7 +1508,7 @@ public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() { private Consumer mockConsumerWithCommittedException(final RuntimeException toThrow) { return new MockConsumer(OffsetResetStrategy.EARLIEST) { @Override - public OffsetAndMetadata committed(final TopicPartition partition) { + public Map committed(final Set partitions) { throw toThrow; } }; diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index a0ac1f188f21b..c2332c4272d85 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.sun.scenario.effect.Offset; import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.Namespace; @@ -195,13 +196,13 @@ private static Map consumerPositions(KafkaCon } private static void resetToLastCommittedPositions(KafkaConsumer consumer) { - for (TopicPartition topicPartition : consumer.assignment()) { - OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition); + final Map committed = consumer.committed(consumer.assignment()); + committed.forEach((tp, offsetAndMetadata) -> { if (offsetAndMetadata != null) - consumer.seek(topicPartition, offsetAndMetadata.offset()); + consumer.seek(tp, offsetAndMetadata.offset()); else - consumer.seekToBeginning(singleton(topicPartition)); - } + consumer.seekToBeginning(singleton(tp)); + }); } private static long messagesRemaining(KafkaConsumer consumer, TopicPartition partition) { From b0fe68e67e5bd1aad12a24a5669d3bc60a5be8c0 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sun, 15 Sep 2019 16:25:14 -0700 Subject: [PATCH 3/7] github comments --- .../kafka/clients/consumer/KafkaConsumer.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d54074573bcaf..3cc4b8d1abcd6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1748,7 +1748,8 @@ public OffsetAndMetadata committed(TopicPartition partition) { * Get the last committed offset for the given partition (whether the commit happened by this process or * another). This offset will be used as the position for the consumer in the event of a failure. *

- * This call will block to do a remote call to get the latest committed offsets from the server. + * This call will block until the position can be determined, an unrecoverable error is + * encountered (in which case it is thrown to the caller), or the timeout expires. * * @param partition The partition to check * @param timeout The maximum amount of time to await the current committed offset @@ -1776,17 +1777,18 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time * Get the last committed offsets for the given partitions (whether the commit happened by this process or * another). The returned offsets will be used as the position for the consumer in the event of a failure. *

- * For partitions that do not have a committed offset, they would not be included in the returned map. + * Partitions that do not have a committed offset would not be included in the returned map. *

* If any of the partitions requested do not exist, an exception would be thrown. *

- * This call will do a remote call to get the latest committed offset from the server, and will block until the + * This call will do a remote call to get the latest committed offsets from the server, and will block until the * committed offset is gotten successfully, an unrecoverable error is encountered (in which case it is thrown to * the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a * {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller). * - * @param partitions The partition to check - * @return The last committed offset and metadata or null if there was no prior commit + * @param partitions The partitions to check + * @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets + * would not be included in the returned result * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while @@ -1807,15 +1809,16 @@ public Map committed(final Set - * For partitions that do not have a committed offset, they would not be included in the returned map. + * Partitions that do not have a committed offset would not be included in the returned map. *

* If any of the partitions requested do not exist, an exception would be thrown. *

* This call will block to do a remote call to get the latest committed offsets from the server. * * @param partitions The partitions to check - * @param timeout The maximum amount of time to await the current committed offset - * @return The last committed offset and metadata or null if there was no prior commit + * @param timeout The maximum amount of time to await the latest committed offsets + * @return The latest committed offsets for the given partitions; partitions that do not have any committed offsets + * would not be included in the returned result * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this * function is called * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while @@ -1835,7 +1838,7 @@ public Map committed(final Set offsets = coordinator.fetchCommittedOffsets(partitions, time.timer(timeout)); if (offsets == null) { throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " + - "committed offset for partition " + partitions + " could be determined. Try tuning default.api.timeout.ms " + + "committed offset for partitions " + partitions + " could be determined. Try tuning default.api.timeout.ms " + "larger to relax the threshold."); } else { offsets.forEach(this::updateLastSeenEpochIfNewer); From 603d7b2877a33d3a7ecfd415277c1272a08afc5c Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Sun, 15 Sep 2019 19:45:38 -0700 Subject: [PATCH 4/7] remove imports --- .../java/org/apache/kafka/tools/TransactionalMessageCopier.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index c2332c4272d85..cfbac1a4e4c49 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -18,7 +18,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.sun.scenario.effect.Offset; import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.Namespace; From 4573451cc136db6af6b6b553574e9e63b4e668e0 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 16 Sep 2019 16:00:25 -0700 Subject: [PATCH 5/7] github comments --- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../processor/internals/AbstractTask.java | 2 +- .../internals/ProcessorStateManager.java | 2 +- .../processor/internals/StandbyTask.java | 18 ++++++++---------- .../processor/internals/StreamTask.java | 4 ++-- 5 files changed, 13 insertions(+), 15 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3cc4b8d1abcd6..7e023415dc3dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1782,7 +1782,7 @@ public OffsetAndMetadata committed(TopicPartition partition, final Duration time * If any of the partitions requested do not exist, an exception would be thrown. *

* This call will do a remote call to get the latest committed offsets from the server, and will block until the - * committed offset is gotten successfully, an unrecoverable error is encountered (in which case it is thrown to + * committed offsets are gotten successfully, an unrecoverable error is encountered (in which case it is thrown to * the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a * {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller). * diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index c4a7a5469cd81..03a002127b7f3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -251,7 +251,7 @@ public Collection changelogPartitions() { return stateMgr.changelogPartitions(); } - Map committedOffsetForPartition(final Set partitions) { + Map committedOffsetForPartitions(final Set partitions) { try { final Map results = consumer.committed(partitions) .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset())); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index eb69ee8fa4198..9f5f29fcdfe07 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -247,7 +247,7 @@ void updateStandbyStates(final TopicPartition storePartition, standbyRestoredOffsets.put(storePartition, lastOffset + 1); } - void putOffsetLimit(final Map offsets) { + void putOffsetLimits(final Map offsets) { log.trace("Updating store offset limit with {}", offsets); offsetLimits.putAll(offsets); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 744e470f8bb9e..78fc21333cf9f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -37,10 +37,10 @@ * A StandbyTask */ public class StandbyTask extends AbstractTask { - private Map checkpointedOffsets = new HashMap<>(); + private boolean updateOffsetLimits; private final Sensor closeTaskSensor; private final Map offsetLimits = new HashMap<>(); - private final Set updateableOffsetLimits = new HashSet<>(); + private Map checkpointedOffsets = new HashMap<>(); /** * Create {@link StandbyTask} with its assigned partitions @@ -69,10 +69,8 @@ public class StandbyTask extends AbstractTask { final Set changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values()); partitions.stream() .filter(tp -> changelogTopicNames.contains(tp.topic())) - .forEach(tp -> { - offsetLimits.put(tp, 0L); - updateableOffsetLimits.add(tp); - }); + .forEach(tp -> offsetLimits.put(tp, 0L)); + updateOffsetLimits = true; } @Override @@ -188,7 +186,7 @@ public List> update(final TopicPartition partitio // Check if we're unable to process records due to an offset limit (e.g. when our // partition is both a source and a changelog). If we're limited then try to refresh // the offset limit if possible. - if (record.offset() >= limit && updateableOffsetLimits.contains(partition)) { + if (record.offset() >= limit && updateOffsetLimits) { limit = updateOffsetLimits(partition); } @@ -217,7 +215,7 @@ private long updateOffsetLimits(final TopicPartition partition) { throw new IllegalArgumentException("Topic is not both a source and a changelog: " + partition); } - final Map newLimits = committedOffsetForPartition(updateableOffsetLimits); + final Map newLimits = committedOffsetForPartitions(offsetLimits.keySet()); final Long previousLimit = offsetLimits.get(partition); final Long newLimit = newLimits.get(partition); @@ -227,12 +225,12 @@ private long updateOffsetLimits(final TopicPartition partition) { } offsetLimits.putAll(newLimits); - updateableOffsetLimits.clear(); + updateOffsetLimits = false; return newLimit; } void allowUpdateOfOffsetLimit() { - updateableOffsetLimits.addAll(offsetLimits.keySet()); + updateOffsetLimits = true; } } \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 0e39e91247599..94e6b66e2d422 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -249,8 +249,8 @@ public boolean initializeStateStores() { final Set changelogTopicNames = new HashSet<>(topology.storeToChangelogTopic().values()); final Set sourcePartitionsAsChangelog = new HashSet<>(partitions) .stream().filter(tp -> changelogTopicNames.contains(tp.topic())).collect(Collectors.toSet()); - final Map committedOffsets = committedOffsetForPartition(sourcePartitionsAsChangelog); - stateMgr.putOffsetLimit(committedOffsets); + final Map committedOffsets = committedOffsetForPartitions(sourcePartitionsAsChangelog); + stateMgr.putOffsetLimits(committedOffsets); registerStateStores(); From aefb5bfce0e4a4c0a74891e267939d23c84200a2 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 16 Sep 2019 16:34:41 -0700 Subject: [PATCH 6/7] minor refactoring --- .../streams/processor/internals/StandbyTask.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 78fc21333cf9f..e429d6c428ef5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -216,18 +216,20 @@ private long updateOffsetLimits(final TopicPartition partition) { } final Map newLimits = committedOffsetForPartitions(offsetLimits.keySet()); - final Long previousLimit = offsetLimits.get(partition); - final Long newLimit = newLimits.get(partition); - if (previousLimit != null && previousLimit > newLimit) { - throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. " + - "New limit: " + newLimit + ". Previous limit: " + previousLimit); + for (final Map.Entry newlimit : newLimits.entrySet()) { + final Long previousLimit = offsetLimits.get(newlimit.getKey()); + if (previousLimit != null && previousLimit > newlimit.getValue()) { + throw new IllegalStateException("Offset limit should monotonically increase, but was reduced. " + + "New limit: " + newlimit.getValue() + ". Previous limit: " + previousLimit); + } + } offsetLimits.putAll(newLimits); updateOffsetLimits = false; - return newLimit; + return offsetLimits.get(partition); } void allowUpdateOfOffsetLimit() { From 2bc8bf0f0703bf557bff6c8ae5c9a347599c8d4f Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 20 Sep 2019 13:03:52 -0700 Subject: [PATCH 7/7] replace deprecated API --- .../kafka/clients/consumer/MockConsumer.java | 6 ++-- .../processor/internals/StreamTask.java | 35 +++++++++++-------- .../processor/internals/StreamTaskTest.java | 2 +- 3 files changed, 25 insertions(+), 18 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 5d7e829637fb0..b20ee9783fe90 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -307,8 +307,10 @@ public OffsetAndMetadata committed(final TopicPartition partition, final Duratio public synchronized Map committed(final Set partitions) { ensureNotClosed(); - return partitions.stream().collect( - Collectors.toMap(tp -> tp, tp -> subscriptions.isAssigned(tp) ? committed.get(tp) : new OffsetAndMetadata(0))); + return partitions.stream() + .filter(committed::containsKey) + .collect(Collectors.toMap(tp -> tp, tp -> subscriptions.isAssigned(tp) ? + committed.get(tp) : new OffsetAndMetadata(0))); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index c965b83d28157..ccf522813f362 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -734,25 +734,30 @@ public void close(boolean clean, taskClosed = true; } - private void initializeCommittedTimestamp(final TopicPartition partition) { - final OffsetAndMetadata metadata = consumer.committed(partition); - - if (metadata != null) { - final long committedTimestamp = decodeTimestamp(metadata.metadata()); - partitionGroup.setPartitionTime(partition, committedTimestamp); - log.debug("A committed timestamp was detected: setting the partition time of partition {}" - + " to {} in stream task {}", partition, committedTimestamp, this); - } else { - log.debug("No committed timestamp was found in metadata for partition {}", partition); - } - } - /** * Retrieves formerly committed timestamps and updates the local queue's partition time. */ public void initializeTaskTime() { - for (final TopicPartition partition : partitionGroup.partitions()) { - initializeCommittedTimestamp(partition); + final Map committed = consumer.committed(partitionGroup.partitions()); + + for (final Map.Entry entry : committed.entrySet()) { + final TopicPartition partition = entry.getKey(); + final OffsetAndMetadata metadata = entry.getValue(); + + if (metadata != null) { + final long committedTimestamp = decodeTimestamp(metadata.metadata()); + partitionGroup.setPartitionTime(partition, committedTimestamp); + log.debug("A committed timestamp was detected: setting the partition time of partition {}" + + " to {} in stream task {}", partition, committedTimestamp, this); + } else { + log.debug("No committed timestamp was found in metadata for partition {}", partition); + } + } + + final Set nonCommitted = new HashSet<>(partitionGroup.partitions()); + nonCommitted.removeAll(committed.keySet()); + for (final TopicPartition partition : nonCommitted) { + log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", partition); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index aa4ab8d50fd70..a4f529d8795f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -657,7 +657,7 @@ public void shouldRespectCommitNeeded() { public void shouldRestorePartitionTimeAfterRestartWithEosDisabled() { createTaskWithProcessAndCommit(false); - assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(consumer.committed(partition1).metadata())); + assertEquals(DEFAULT_TIMESTAMP, task.decodeTimestamp(consumer.committed(Collections.singleton(partition1)).get(partition1).metadata())); // reset times here by creating a new task task = createStatelessTask(createConfig(false));