diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 620991b491f00..dd884c6be72ea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -419,7 +420,7 @@ synchronized void incrementSequenceNumber(TopicPartition topicPartition, int inc if (currentSequenceNumber == null) throw new IllegalStateException("Attempt to increment sequence number for a partition with no current sequence."); - currentSequenceNumber += increment; + currentSequenceNumber = DefaultRecordBatch.incrementSequence(currentSequenceNumber, increment); nextSequence.put(topicPartition, currentSequenceNumber); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 5156c64b5eb42..6c9735e445315 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -522,7 +522,7 @@ static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[ return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers); } - static int incrementSequence(int baseSequence, int increment) { + public static int incrementSequence(int baseSequence, int increment) { if (baseSequence > Integer.MAX_VALUE - increment) return increment - (Integer.MAX_VALUE - baseSequence) - 1; return baseSequence + increment; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index b8f5caedcb243..8da76fb90fe68 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -576,6 +576,18 @@ public void testDefaultSequenceNumber() { assertEquals((int) transactionManager.sequenceNumber(tp0), 3); } + @Test + public void testSequenceNumberOverflow() { + TransactionManager transactionManager = new TransactionManager(); + assertEquals((int) transactionManager.sequenceNumber(tp0), 0); + transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE); + assertEquals((int) transactionManager.sequenceNumber(tp0), Integer.MAX_VALUE); + transactionManager.incrementSequenceNumber(tp0, 100); + assertEquals((int) transactionManager.sequenceNumber(tp0), 99); + transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE); + assertEquals((int) transactionManager.sequenceNumber(tp0), 98); + } + @Test public void testProducerIdReset() { TransactionManager transactionManager = new TransactionManager();