From 88a336156987c7d04e28c4cd186f6b7eb2687352 Mon Sep 17 00:00:00 2001 From: Ming Liu Date: Sun, 2 Dec 2018 21:23:22 -0800 Subject: [PATCH 1/4] KAFKA-7693: Fix SequenceNumber overflow in client TransactionManager. --- .../kafka/clients/producer/internals/TransactionManager.java | 3 ++- .../org/apache/kafka/common/record/DefaultRecordBatch.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 620991b491f00..dd884c6be72ea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -419,7 +420,7 @@ synchronized void incrementSequenceNumber(TopicPartition topicPartition, int inc if (currentSequenceNumber == null) throw new IllegalStateException("Attempt to increment sequence number for a partition with no current sequence."); - currentSequenceNumber += increment; + currentSequenceNumber = DefaultRecordBatch.incrementSequence(currentSequenceNumber, increment); nextSequence.put(topicPartition, currentSequenceNumber); } diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 71e668e45da01..4ec2b1b5cfe20 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -523,7 +523,7 @@ static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[ return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers); } - static int incrementSequence(int baseSequence, int increment) { + public static int incrementSequence(int baseSequence, int increment) { if (baseSequence > Integer.MAX_VALUE - increment) return increment - (Integer.MAX_VALUE - baseSequence) - 1; return baseSequence + increment; From 1334ae5c6aa8aeac801ee984069dab7de6fe07fe Mon Sep 17 00:00:00 2001 From: Ming Liu Date: Fri, 11 Jan 2019 10:56:09 -0800 Subject: [PATCH 2/4] Add unittest for client transactionManager test. --- .../producer/internals/TransactionManagerTest.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 72c0a0b55a0d3..f3257f296d34a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -577,6 +577,18 @@ public void testDefaultSequenceNumber() { assertEquals((int) transactionManager.sequenceNumber(tp0), 3); } + @Test + public void testSequenceNumberOverflow() { + TransactionManager transactionManager = new TransactionManager(); + assertEquals((int) transactionManager.sequenceNumber(tp0), 0); + transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE); + assertEquals((int) transactionManager.sequenceNumber(tp0), Integer.MAX_VALUE); + transactionManager.incrementSequenceNumber(tp0, 100); + assertEquals((int) transactionManager.sequenceNumber(tp0), 99); + transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE); + assertEquals((int) transactionManager.sequenceNumber(tp0), 98); + } + @Test public void testProducerIdReset() { TransactionManager transactionManager = new TransactionManager(); From df71d4d88472866530f6ef8f97d22242291a69dc Mon Sep 17 00:00:00 2001 From: Ming Liu Date: Sun, 13 Jan 2019 19:45:16 -0800 Subject: [PATCH 3/4] Move into one test function. --- .../producer/internals/TransactionManagerTest.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index f3257f296d34a..3b857c1daa19b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -575,18 +575,12 @@ public void testDefaultSequenceNumber() { assertEquals((int) transactionManager.sequenceNumber(tp0), 0); transactionManager.incrementSequenceNumber(tp0, 3); assertEquals((int) transactionManager.sequenceNumber(tp0), 3); - } - @Test - public void testSequenceNumberOverflow() { - TransactionManager transactionManager = new TransactionManager(); - assertEquals((int) transactionManager.sequenceNumber(tp0), 0); + // test overflow transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE); - assertEquals((int) transactionManager.sequenceNumber(tp0), Integer.MAX_VALUE); - transactionManager.incrementSequenceNumber(tp0, 100); - assertEquals((int) transactionManager.sequenceNumber(tp0), 99); + assertEquals((int) transactionManager.sequenceNumber(tp0), 2); transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE); - assertEquals((int) transactionManager.sequenceNumber(tp0), 98); + assertEquals((int) transactionManager.sequenceNumber(tp0), 1); } @Test From 659350c5cbb070d5834eeaed0fc50dfc0a1011d1 Mon Sep 17 00:00:00 2001 From: Ming Liu Date: Mon, 14 Jan 2019 22:29:47 -0800 Subject: [PATCH 4/4] Add a separate unitest. --- .../producer/internals/TransactionManagerTest.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 6247eb7045dd4..8da76fb90fe68 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -574,12 +574,18 @@ public void testDefaultSequenceNumber() { assertEquals((int) transactionManager.sequenceNumber(tp0), 0); transactionManager.incrementSequenceNumber(tp0, 3); assertEquals((int) transactionManager.sequenceNumber(tp0), 3); + } - // test overflow + @Test + public void testSequenceNumberOverflow() { + TransactionManager transactionManager = new TransactionManager(); + assertEquals((int) transactionManager.sequenceNumber(tp0), 0); transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE); - assertEquals((int) transactionManager.sequenceNumber(tp0), 2); + assertEquals((int) transactionManager.sequenceNumber(tp0), Integer.MAX_VALUE); + transactionManager.incrementSequenceNumber(tp0, 100); + assertEquals((int) transactionManager.sequenceNumber(tp0), 99); transactionManager.incrementSequenceNumber(tp0, Integer.MAX_VALUE); - assertEquals((int) transactionManager.sequenceNumber(tp0), 1); + assertEquals((int) transactionManager.sequenceNumber(tp0), 98); } @Test