Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class RecordCollectorImpl implements RecordCollector {
private final Sensor droppedRecordsSensor;
private final Map<String, Sensor> producedSensorByTopic = new HashMap<>();

private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);
private final AtomicReference<KafkaException> sendException;

/**
* @throws StreamsException fatal error that should cause the thread to die (from producer.initTxn)
Expand All @@ -88,6 +88,7 @@ public RecordCollectorImpl(final LogContext logContext,
this.log = logContext.logger(getClass());
this.taskId = taskId;
this.streamsProducer = streamsProducer;
this.sendException = streamsProducer.sendException();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is basically the fix.

Notice that now an exception caused by one task can be thrown by a different task. For example:

2024-05-30 10:20:35,916] ERROR [i-0af25f5c2bd9bba31-StreamThread-1] stream-thread [i-0af25f5c2bd9bba31-StreamThread-1] Error flushing caches of dirty task 0_0 (org.apache.kafka.streams.processor.internals.TaskManager)
org.apache.kafka.streams.errors.TaskMigratedException: Error encountered sending record to topic stream-soak-test-network-id-repartition for task 1_1 due to:
org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.
Written offsets would not be recorded and no more records would be sent since the producer is fenced, indicating the task may be migrated out; it means all tasks belonging to this thread should be migrated.
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:305)

Task 0_0 throws an error that was caused by task 1_1.

this.productionExceptionHandler = productionExceptionHandler;
this.eosEnabled = streamsProducer.eosEnabled();
this.streamsMetrics = streamsMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;

import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class StreamsProducer {
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
private double oldProducerTotalBlockedTime = 0;
private final AtomicReference<KafkaException> sendException = new AtomicReference<>(null);

public StreamsProducer(final StreamsConfig config,
final String threadId,
Expand Down Expand Up @@ -254,6 +256,10 @@ private void maybeBeginTransaction() {
}
}

/**
 * Returns the shared holder for the first {@link KafkaException} captured by an
 * asynchronous send callback. Package-private: per this diff, each
 * {@code RecordCollectorImpl} reads this reference in its constructor, so every
 * collector backed by this producer observes the same pending send error.
 * The reference holds {@code null} until a send fails.
 */
AtomicReference<KafkaException> sendException() {
return sendException;
}

Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record,
final Callback callback) {
maybeBeginTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -754,7 +755,7 @@ public void shouldForwardFlushToStreamsProducer() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(false);
doNothing().when(streamsProducer).flush();

when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to add this stub to a couple of tests so that they do not throw a NullPointerException, because the mock returns null for sendException().

final ProcessorTopology topology = mock(ProcessorTopology.class);
when(topology.sinkTopics()).thenReturn(Collections.emptySet());

Expand All @@ -774,6 +775,7 @@ public void shouldForwardFlushToStreamsProducer() {
public void shouldForwardFlushToStreamsProducerEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);
when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
doNothing().when(streamsProducer).flush();
final ProcessorTopology topology = mock(ProcessorTopology.class);

Expand Down Expand Up @@ -802,6 +804,7 @@ public void shouldClearOffsetsOnCloseDirty() {
private void shouldClearOffsetsOnClose(final boolean clean) {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);
when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
final long offset = 1234L;
final RecordMetadata metadata = new RecordMetadata(
new TopicPartition(topic, 0),
Expand Down Expand Up @@ -853,7 +856,7 @@ private void shouldClearOffsetsOnClose(final boolean clean) {
public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);

when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
final ProcessorTopology topology = mock(ProcessorTopology.class);

final RecordCollector collector = new RecordCollectorImpl(
Expand All @@ -872,8 +875,8 @@ public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
public void shouldAbortTxOnCloseDirtyIfEosEnabled() {
final StreamsProducer streamsProducer = mock(StreamsProducer.class);
when(streamsProducer.eosEnabled()).thenReturn(true);
when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
doNothing().when(streamsProducer).abortTransaction();

final ProcessorTopology topology = mock(ProcessorTopology.class);

final RecordCollector collector = new RecordCollectorImpl(
Expand Down Expand Up @@ -1514,6 +1517,64 @@ public void shouldNotCallProductionExceptionHandlerOnClassCastException() {
}
}

@Test
public void shouldNotSendIfSendOfOtherTaskFailedInCallback() {
    final TaskId firstTaskId = new TaskId(0, 0);
    final TaskId secondTaskId = new TaskId(0, 1);
    final StreamsProducer streamsProducer = mock(StreamsProducer.class);
    when(streamsProducer.eosEnabled()).thenReturn(true);
    // One shared holder for both collectors: the point of the test is that a failure
    // recorded by one task is visible to the other task through this reference.
    when(streamsProducer.sendException()).thenReturn(new AtomicReference<>(null));
    // Every send completes its callback immediately with a fencing error.
    when(streamsProducer.send(any(), any())).thenAnswer(
        invocation -> {
            final Callback producerCallback = invocation.getArgument(1);
            producerCallback.onCompletion(null, new ProducerFencedException("KABOOM!"));
            return null;
        }
    );
    final RecordCollector failingCollector = new RecordCollectorImpl(
        logContext,
        firstTaskId,
        streamsProducer,
        productionExceptionHandler,
        streamsMetrics,
        topology
    );
    failingCollector.initialize();
    final RecordCollector otherCollector = new RecordCollectorImpl(
        logContext,
        secondTaskId,
        streamsProducer,
        productionExceptionHandler,
        streamsMetrics,
        topology
    );
    otherCollector.initialize();
    // The first task's send fails asynchronously, storing the exception in the shared holder.
    failingCollector.send(
        topic,
        "key",
        "val",
        null,
        0,
        null,
        stringSerializer,
        stringSerializer,
        sinkNodeName,
        context
    );
    // A different task using the same producer must now refuse to send.
    assertThrows(StreamsException.class, () -> otherCollector.send(
        topic,
        "key",
        "val",
        null,
        1,
        null,
        stringSerializer,
        stringSerializer,
        sinkNodeName,
        context
    ));
}

private RecordCollector newRecordCollector(final ProductionExceptionHandler productionExceptionHandler) {
return new RecordCollectorImpl(
logContext,
Expand Down