From 80562ef117bd01ce269c609f7ff6a1484792e29a Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 20 Feb 2020 14:39:15 -0800 Subject: [PATCH 1/4] do not abort transaction in unclean close --- .../processor/internals/StreamTask.java | 40 +++++-------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 54da00d251a4f..ddcacd8e84f92 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -94,7 +94,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator private long idleStartTime; private Producer producer; private boolean commitRequested = false; - private boolean transactionInFlight = false; private final String threadId; @@ -294,7 +293,6 @@ public void initializeTopology() { } catch (final ProducerFencedException | UnknownProducerIdException e) { throw new TaskMigratedException(this, e); } - transactionInFlight = true; } processorContext.initialize(); @@ -522,10 +520,8 @@ void commit(final boolean startNewTransaction, final Map p if (eosEnabled) { producer.sendOffsetsToTransaction(consumedOffsetsAndMetadata, applicationId); producer.commitTransaction(); - transactionInFlight = false; if (startNewTransaction) { producer.beginTransaction(); - transactionInFlight = true; } } else { consumer.commitSync(consumedOffsetsAndMetadata); @@ -640,14 +636,7 @@ void suspend(final boolean clean, if (eosEnabled) { stateMgr.checkpoint(activeTaskCheckpointableOffsets()); - - try { - recordCollector.close(); - } catch (final RecoverableClientException e) { - taskMigratedException = new TaskMigratedException(this, e); - } finally { - producer = null; - } + taskMigratedException = closeRecordCollector(); } } if (taskMigratedException != null) { @@ -662,37 +651,26 @@ void suspend(final boolean clean, } if (eosEnabled) { - maybeAbortTransactionAndCloseRecordCollector(isZombie); + // Ignore any exceptions whilee closing the record collector, i.e task producer. + closeRecordCollector(); } } } - private void maybeAbortTransactionAndCloseRecordCollector(final boolean isZombie) { - if (!isZombie) { - try { - if (transactionInFlight) { - producer.abortTransaction(); - } - transactionInFlight = false; - } catch (final ProducerFencedException ignore) { - /* TODO - * this should actually never happen atm as we guard the call to #abortTransaction - * -> the reason for the guard is a "bug" in the Producer -- it throws IllegalStateException - * instead of ProducerFencedException atm. We can remove the isZombie flag after KAFKA-5604 got - * fixed and fall-back to this catch-and-swallow code - */ - - // can be ignored: transaction got already aborted by brokers/transactional-coordinator if this happens - } - } + private TaskMigratedException closeRecordCollector() { + TaskMigratedException taskMigratedException = null; try { recordCollector.close(); + } catch (final RecoverableClientException e) { + taskMigratedException = new TaskMigratedException(this, e); } catch (final Throwable e) { log.error("Failed to close producer due to the following error:", e); } finally { producer = null; } + + return taskMigratedException; } private void closeTopology() { From f4f23e527a8e9dc3fcaccc0bb1083edc7cfdaa5f Mon Sep 17 00:00:00 2001 From: abbccdda Date: Thu, 20 Feb 2020 15:18:03 -0800 Subject: [PATCH 2/4] deprecate isZombie --- .../streams/processor/internals/StreamTask.java | 9 ++++----- .../streams/processor/internals/StreamTaskTest.java | 12 ++++++------ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index ddcacd8e84f92..9aa8e793571e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -598,7 +598,7 @@ private void initTopology() { */ public void suspend() { log.debug("Suspending"); - suspend(true, false); + suspend(true); } /** @@ -614,8 +614,7 @@ public void suspend() { * or if the task producer got fenced (EOS) */ // visible for testing - void suspend(final boolean clean, - final boolean isZombie) { + void suspend(final boolean clean) { // this is necessary because all partition times are reset to -1 during close // we need to preserve the original partitions times before calling commit final Map partitionTimes = extractPartitionTimes(); @@ -720,7 +719,7 @@ void closeSuspended(final boolean clean, RuntimeException firstException) { /** *
-     * - {@link #suspend(boolean, boolean) suspend(clean)}
+     * - {@link #suspend(boolean) suspend(clean)}
      *   - close topology
      *   - if (clean) {@link #commit()}
      *     - flush state and producer
@@ -743,7 +742,7 @@ public void close(boolean clean,
 
         RuntimeException firstException = null;
         try {
-            suspend(clean, isZombie);
+            suspend(clean);
         } catch (final RuntimeException e) {
             clean = false;
             firstException = e;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1d0ca4f69d0a6..4f87b39f3a442 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1317,26 +1317,26 @@ public void shouldNotCloseProducerIfFencedOnCloseDuringCleanCloseWithEosEnabled(
     }
 
     @Test
-    public void shouldAbortTransactionAndCloseProducerOnUncleanCloseWithEosEnabled() {
+    public void shouldNotAbortTransactionAndCloseProducerOnUncleanCloseWithEosEnabled() {
         task = createStatelessTask(createConfig(true), StreamsConfig.METRICS_LATEST);
         task.initializeTopology();
 
         task.close(false, false);
         task = null;
 
-        assertTrue(producer.transactionAborted());
-        assertFalse(producer.transactionInFlight());
+        assertFalse(producer.transactionAborted());
+        assertTrue(producer.transactionInFlight());
         assertTrue(producer.closed());
     }
 
     @Test
-    public void shouldAbortTransactionAndCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
+    public void shouldNotAbortTransactionAndCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
         task = createTaskThatThrowsException(true);
         task.initializeTopology();
 
         task.close(false, false);
 
-        assertTrue(producer.transactionAborted());
+        assertFalse(producer.transactionAborted());
         assertTrue(producer.closed());
     }
 
@@ -1561,7 +1561,7 @@ public void shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUnclea
         task.close(false, false);
         task = null;
 
-        assertTrue(producer.transactionAborted());
+        assertFalse(producer.transactionAborted());
         assertFalse(producer.closed());
     }
 

From 4f315cac4a96db56ec72d3d5524a7e9042537234 Mon Sep 17 00:00:00 2001
From: abbccdda 
Date: Thu, 20 Feb 2020 16:42:42 -0800
Subject: [PATCH 3/4] do not test on txn abort

---
 .../streams/processor/internals/StreamTaskTest.java      | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 4f87b39f3a442..2e91d3d589b98 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1317,26 +1317,24 @@ public void shouldNotCloseProducerIfFencedOnCloseDuringCleanCloseWithEosEnabled(
     }
 
     @Test
-    public void shouldNotAbortTransactionAndCloseProducerOnUncleanCloseWithEosEnabled() {
+    public void shouldCloseProducerOnUncleanCloseWithEosEnabled() {
         task = createStatelessTask(createConfig(true), StreamsConfig.METRICS_LATEST);
         task.initializeTopology();
 
         task.close(false, false);
         task = null;
 
-        assertFalse(producer.transactionAborted());
         assertTrue(producer.transactionInFlight());
         assertTrue(producer.closed());
     }
 
     @Test
-    public void shouldNotAbortTransactionAndCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
+    public void shouldCloseProducerOnErrorDuringUncleanCloseWithEosEnabled() {
         task = createTaskThatThrowsException(true);
         task.initializeTopology();
 
         task.close(false, false);
 
-        assertFalse(producer.transactionAborted());
         assertTrue(producer.closed());
     }
 
@@ -1553,7 +1551,7 @@ public void shouldOnlyCloseFencedProducerOnUncleanClosedWithEosEnabled() {
     }
 
     @Test
-    public void shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
+    public void shouldNotCloseProducerIfFencedOnCloseDuringUncleanCloseWithEosEnabled() {
         task = createStatelessTask(createConfig(true), StreamsConfig.METRICS_LATEST);
         task.initializeTopology();
         producer.fenceProducerOnClose();
@@ -1561,7 +1559,6 @@ public void shouldAbortTransactionButNotCloseProducerIfFencedOnCloseDuringUnclea
         task.close(false, false);
         task = null;
 
-        assertFalse(producer.transactionAborted());
         assertFalse(producer.closed());
     }
 

From 79860652913ae6f48141dbea92fefb06f65292ea Mon Sep 17 00:00:00 2001
From: abbccdda 
Date: Thu, 20 Feb 2020 19:02:30 -0800
Subject: [PATCH 4/4] add comment

---
 .../apache/kafka/streams/processor/internals/StreamTaskTest.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 2e91d3d589b98..2832291951f94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1324,6 +1324,7 @@ public void shouldCloseProducerOnUncleanCloseWithEosEnabled() {
         task.close(false, false);
         task = null;
 
+        // Make sure no method call on the producer during an unclean close (such as abort).
         assertTrue(producer.transactionInFlight());
         assertTrue(producer.closed());
     }