diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 6b559180484aa..0fad890b2f9e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -74,7 +74,7 @@ public class RecordCollectorImpl implements RecordCollector { private final Sensor droppedRecordsSensor; private final Map producedSensorByTopic = new HashMap<>(); - private final AtomicReference sendException = new AtomicReference<>(null); + private final AtomicReference sendException; /** * @throws StreamsException fatal error that should cause the thread to die (from producer.initTxn) @@ -88,6 +88,7 @@ public RecordCollectorImpl(final LogContext logContext, this.log = logContext.logger(getClass()); this.taskId = taskId; this.streamsProducer = streamsProducer; + this.sendException = streamsProducer.sendException(); this.productionExceptionHandler = productionExceptionHandler; this.eosEnabled = streamsProducer.eosEnabled(); this.streamsMetrics = streamsMetrics; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java index 21acb407afafd..40123fed30a33 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; @@ -78,6 +79,7 @@ public class StreamsProducer { private boolean transactionInFlight = false; private boolean transactionInitialized = false; private double oldProducerTotalBlockedTime = 0; + private final AtomicReference sendException = new AtomicReference<>(null); public StreamsProducer(final StreamsConfig config, final String threadId, @@ -254,6 +256,10 @@ private void maybeBeginTransaction() { } } + AtomicReference sendException() { + return sendException; + } + Future send(final ProducerRecord record, final Callback callback) { maybeBeginTransaction(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index 253b44f5d9ed7..d9e4782d73e41 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -74,6 +74,7 @@ import java.util.UUID; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -754,7 +755,7 @@ public void shouldForwardFlushToStreamsProducer() { final StreamsProducer streamsProducer = mock(StreamsProducer.class); when(streamsProducer.eosEnabled()).thenReturn(false); doNothing().when(streamsProducer).flush(); - + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); final ProcessorTopology topology = mock(ProcessorTopology.class); when(topology.sinkTopics()).thenReturn(Collections.emptySet()); @@ -774,6 +775,7 @@ public void shouldForwardFlushToStreamsProducer() { public void shouldForwardFlushToStreamsProducerEosEnabled() { final StreamsProducer streamsProducer = mock(StreamsProducer.class); when(streamsProducer.eosEnabled()).thenReturn(true); + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); doNothing().when(streamsProducer).flush(); final ProcessorTopology topology = mock(ProcessorTopology.class); @@ -802,6 +804,7 @@ public void shouldClearOffsetsOnCloseDirty() { private void shouldClearOffsetsOnClose(final boolean clean) { final StreamsProducer streamsProducer = mock(StreamsProducer.class); when(streamsProducer.eosEnabled()).thenReturn(true); + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); final long offset = 1234L; final RecordMetadata metadata = new RecordMetadata( new TopicPartition(topic, 0), @@ -853,7 +856,7 @@ private void shouldClearOffsetsOnClose(final boolean clean) { public void shouldNotAbortTxOnCloseCleanIfEosEnabled() { final StreamsProducer streamsProducer = mock(StreamsProducer.class); when(streamsProducer.eosEnabled()).thenReturn(true); - + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); final ProcessorTopology topology = mock(ProcessorTopology.class); final RecordCollector collector = new RecordCollectorImpl( @@ -872,8 +875,8 @@ public void shouldNotAbortTxOnCloseCleanIfEosEnabled() { public void shouldAbortTxOnCloseDirtyIfEosEnabled() { final StreamsProducer streamsProducer = mock(StreamsProducer.class); when(streamsProducer.eosEnabled()).thenReturn(true); + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); doNothing().when(streamsProducer).abortTransaction(); - final ProcessorTopology topology = mock(ProcessorTopology.class); final RecordCollector collector = new RecordCollectorImpl( @@ -1514,6 +1517,64 @@ public void shouldNotCallProductionExceptionHandlerOnClassCastException() { } } + @Test + public void shouldNotSendIfSendOfOtherTaskFailedInCallback() { + final TaskId taskId1 = new TaskId(0, 0); + final TaskId taskId2 = new TaskId(0, 1); + final StreamsProducer streamsProducer = mock(StreamsProducer.class); + when(streamsProducer.eosEnabled()).thenReturn(true); + when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null)); + when(streamsProducer.send(any(), any())).thenAnswer( + invocation -> { + final Callback callback = invocation.getArgument(1); + callback.onCompletion(null, new ProducerFencedException("KABOOM!")); + return null; + } + ); + final RecordCollector collector1 = new RecordCollectorImpl( + logContext, + taskId1, + streamsProducer, + productionExceptionHandler, + streamsMetrics, + topology + ); + collector1.initialize(); + final RecordCollector collector2 = new RecordCollectorImpl( + logContext, + taskId2, + streamsProducer, + productionExceptionHandler, + streamsMetrics, + topology + ); + collector2.initialize(); + collector1.send( + topic, + "key", + "val", + null, + 0, + null, + stringSerializer, + stringSerializer, + sinkNodeName, + context + ); + assertThrows(StreamsException.class, () -> collector2.send( + topic, + "key", + "val", + null, + 1, + null, + stringSerializer, + stringSerializer, + sinkNodeName, + context + )); + } + private RecordCollector newRecordCollector(final ProductionExceptionHandler productionExceptionHandler) { return new RecordCollectorImpl( logContext,