From 51da8fa1e09b1dcb135721b821e81e38385ed5bf Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Thu, 6 Jun 2024 10:06:24 +0200 Subject: [PATCH] KAFKA-16903: Consider produce error of different task A task does not know anything about a produce error thrown by a different task. That might lead to an InvalidTxnStateException when a task attempts to do a transactional operation on a producer that failed due to a different task. This commit stores the produce exception in the streams producer on completion of a send instead of the record collector since the record collector is on task level whereas the streams producer is on stream thread level. Since all tasks use the same streams producer the error should be correctly propagated across tasks of the same stream thread. For EOS alpha, this commit does not change anything because each task uses its own producer. The send error is still on task level but so is also the transaction. --- .../internals/RecordCollectorImpl.java | 3 +- .../processor/internals/StreamsProducer.java | 6 ++ .../internals/RecordCollectorTest.java | 67 ++++++++++++++++++- 3 files changed, 72 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 6b559180484aa..0fad890b2f9e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -74,7 +74,7 @@ public class RecordCollectorImpl implements RecordCollector { private final Sensor droppedRecordsSensor; private final Map producedSensorByTopic = new HashMap<>(); - private final AtomicReference sendException = new AtomicReference<>(null); + private final AtomicReference sendException; /** * @throws StreamsException fatal error that should cause the thread to die (from 
producer.initTxn) @@ -88,6 +88,7 @@ public RecordCollectorImpl(final LogContext logContext, this.log = logContext.logger(getClass()); this.taskId = taskId; this.streamsProducer = streamsProducer; + this.sendException = streamsProducer.sendException(); this.productionExceptionHandler = productionExceptionHandler; this.eosEnabled = streamsProducer.eosEnabled(); this.streamsMetrics = streamsMetrics; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java index 21acb407afafd..40123fed30a33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; @@ -78,6 +79,7 @@ public class StreamsProducer { private boolean transactionInFlight = false; private boolean transactionInitialized = false; private double oldProducerTotalBlockedTime = 0; + private final AtomicReference sendException = new AtomicReference<>(null); public StreamsProducer(final StreamsConfig config, final String threadId, @@ -254,6 +256,10 @@ private void maybeBeginTransaction() { } } + AtomicReference sendException() { + return sendException; + } + Future send(final ProducerRecord record, final Callback callback) { maybeBeginTransaction(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 253b44f5d9ed7..d9e4782d73e41 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -74,6 +74,7 @@ import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -754,7 +755,7 @@ public void shouldForwardFlushToStreamsProducer() { final StreamsProducer streamsProducer = mock(StreamsProducer.class); when(streamsProducer.eosEnabled()).thenReturn(false); doNothing().when(streamsProducer).flush(); - + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); final ProcessorTopology topology = mock(ProcessorTopology.class); when(topology.sinkTopics()).thenReturn(Collections.emptySet()); @@ -774,6 +775,7 @@ public void shouldForwardFlushToStreamsProducer() { public void shouldForwardFlushToStreamsProducerEosEnabled() { final StreamsProducer streamsProducer = mock(StreamsProducer.class); when(streamsProducer.eosEnabled()).thenReturn(true); + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); doNothing().when(streamsProducer).flush(); final ProcessorTopology topology = mock(ProcessorTopology.class); @@ -802,6 +804,7 @@ public void shouldClearOffsetsOnCloseDirty() { private void shouldClearOffsetsOnClose(final boolean clean) { final StreamsProducer streamsProducer = mock(StreamsProducer.class); when(streamsProducer.eosEnabled()).thenReturn(true); + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); final long offset = 1234L; final RecordMetadata metadata = new RecordMetadata( new TopicPartition(topic, 0), @@ -853,7 +856,7 @@ private void shouldClearOffsetsOnClose(final boolean clean) { public void shouldNotAbortTxOnCloseCleanIfEosEnabled() { final StreamsProducer streamsProducer = mock(StreamsProducer.class); 
when(streamsProducer.eosEnabled()).thenReturn(true); - + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); final ProcessorTopology topology = mock(ProcessorTopology.class); final RecordCollector collector = new RecordCollectorImpl( @@ -872,8 +875,8 @@ public void shouldNotAbortTxOnCloseCleanIfEosEnabled() { public void shouldAbortTxOnCloseDirtyIfEosEnabled() { final StreamsProducer streamsProducer = mock(StreamsProducer.class); when(streamsProducer.eosEnabled()).thenReturn(true); + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); doNothing().when(streamsProducer).abortTransaction(); - final ProcessorTopology topology = mock(ProcessorTopology.class); final RecordCollector collector = new RecordCollectorImpl( @@ -1514,6 +1517,64 @@ public void shouldNotCallProductionExceptionHandlerOnClassCastException() { } } + @Test + public void shouldNotSendIfSendOfOtherTaskFailedInCallback() { + final TaskId taskId1 = new TaskId(0, 0); + final TaskId taskId2 = new TaskId(0, 1); + final StreamsProducer streamsProducer = mock(StreamsProducer.class); + when(streamsProducer.eosEnabled()).thenReturn(true); + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); + when(streamsProducer.send(any(), any())).thenAnswer( + invocation -> { + final Callback callback = invocation.getArgument(1); + callback.onCompletion(null, new ProducerFencedException("KABOOM!")); + return null; + } + ); + final RecordCollector collector1 = new RecordCollectorImpl( + logContext, + taskId1, + streamsProducer, + productionExceptionHandler, + streamsMetrics, + topology + ); + collector1.initialize(); + final RecordCollector collector2 = new RecordCollectorImpl( + logContext, + taskId2, + streamsProducer, + productionExceptionHandler, + streamsMetrics, + topology + ); + collector2.initialize(); + collector1.send( + topic, + "key", + "val", + null, + 0, + null, + stringSerializer, + stringSerializer, + sinkNodeName, + context + ); 
+ assertThrows(StreamsException.class, () -> collector2.send( + topic, + "key", + "val", + null, + 1, + null, + stringSerializer, + stringSerializer, + sinkNodeName, + context + )); + } + private RecordCollector newRecordCollector(final ProductionExceptionHandler productionExceptionHandler) { return new RecordCollectorImpl( logContext,