diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 5bd6b935b3972..e109f88f2a318 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1611,6 +1611,7 @@ public long position(TopicPartition partition, final Duration timeout) { final long timeoutMs = timeout.toMillis(); acquireAndEnsureOpen(); try { + coordinator.poll(timeout.toMillis()); if (!this.subscriptions.isAssigned(partition)) throw new IllegalStateException("You can only check the position for partitions assigned to this consumer."); Long offset = this.subscriptions.position(partition); diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index 372cc3ffed694..55234b17e5922 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -181,7 +181,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumers += consumer0 var commitCompleted = false - var committedPosition: Long = -1 val listener = new TestConsumerReassignmentListener { override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { @@ -190,8 +189,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { // than session timeout and then try a commit. We should still be in the group, // so the commit should succeed Utils.sleep(1500) - committedPosition = consumer0.position(tp) - consumer0.commitSync(Map(tp -> new OffsetAndMetadata(committedPosition)).asJava) + consumer0.commitSync(Map(tp -> new OffsetAndMetadata(0)).asJava) commitCompleted = true } super.onPartitionsRevoked(partitions) @@ -207,7 +205,6 @@ class PlaintextConsumerTest extends BaseConsumerTest { consumer0.subscribe(List("otherTopic").asJava, listener) consumer0.poll(0) - assertEquals(0, committedPosition) assertTrue(commitCompleted) }