diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index a0a238c87d9ae..22222cdfca089 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -232,24 +232,26 @@ public void handleAssignment(final Map> activeTasks, } if (!consumedOffsetsAndMetadataPerTask.isEmpty()) { - for (final Task task : additionalTasksForCommitting) { - task.prepareCommit(); - final Map committableOffsets = task.committableOffsetsAndMetadata(); - if (!committableOffsets.isEmpty()) { - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + try { + for (final Task task : additionalTasksForCommitting) { + task.prepareCommit(); + final Map committableOffsets = task.committableOffsetsAndMetadata(); + if (!committableOffsets.isEmpty()) { + consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + } } - } - try { commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); for (final Task task : additionalTasksForCommitting) { task.postCommit(); } } catch (final RuntimeException e) { - log.error("Failed to commit tasks that are " + - "prepared to close clean, will close them as dirty instead", e); + log.error("Failed to batch commit tasks, " + + "will close all tasks involved in this commit as dirty by the end", e); + dirtyTasks.addAll(additionalTasksForCommitting); dirtyTasks.addAll(checkpointPerTask.keySet()); + checkpointPerTask.clear(); // Just add first taskId to re-throw by the end. taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 67b2ec8ebff58..d4769d4dc5ac1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -154,7 +154,12 @@ public class TaskManagerTest { @Before public void setUp() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST); + setUpTaskManager(StreamThread.ProcessingMode.AT_LEAST_ONCE); + } + + private void setUpTaskManager(final StreamThread.ProcessingMode processingMode) { + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST); taskManager = new TaskManager( changeLogReader, UUID.randomUUID(), @@ -165,7 +170,7 @@ public void setUp() { topologyBuilder, adminClient, stateDirectory, - StreamThread.ProcessingMode.AT_LEAST_ONCE + processingMode ); taskManager.setMainConsumer(consumer); } @@ -1228,23 +1233,57 @@ public Collection changelogPartitions() { verify(activeTaskCreator, changeLogReader); } + @Test + public void shouldCloseActiveTasksDirtyAndPropagatePrepareCommitException() { + setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA); + + final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); + + final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true) { + @Override + public void prepareCommit() { + throw new RuntimeException("task 0_1 prepare commit boom!"); + } + }; + + task01.setCommittableOffsetsAndMetadata(singletonMap(t1p1, new OffsetAndMetadata(0L, null))); + task01.setCommitNeeded(); + + final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true); + final Map offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null)); + + task02.setCommittableOffsetsAndMetadata(offsetsT02); + task02.setCommitNeeded(); + + taskManager.tasks().put(taskId00, task00); + taskManager.tasks().put(taskId01, task01); + taskManager.tasks().put(taskId02, task02); + + checkOrder(activeTaskCreator, false); + + activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01); + expectLastCall(); + + activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId02); + expectLastCall(); + + replay(activeTaskCreator); + + final RuntimeException thrown = assertThrows(RuntimeException.class, + () -> taskManager.handleAssignment(mkMap(mkEntry(taskId00, taskId00Partitions), + mkEntry(taskId01, taskId01Partitions)), Collections.emptyMap())); + assertThat(thrown.getCause().getMessage(), is("task 0_1 prepare commit boom!")); + + assertThat(task00.state(), is(Task.State.CREATED)); + assertThat(task01.state(), is(Task.State.CLOSED)); + assertThat(task02.state(), is(Task.State.CLOSED)); + + verify(activeTaskCreator); + } + @Test public void shouldCloseActiveTasksDirtyAndPropagateCommitException() { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( - new Metrics(), "clientId", StreamsConfig.METRICS_LATEST); - taskManager = new TaskManager( - changeLogReader, - UUID.randomUUID(), - "taskManagerTest", - streamsMetrics, - activeTaskCreator, - standbyTaskCreator, - topologyBuilder, - adminClient, - stateDirectory, - StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA - ); - taskManager.setMainConsumer(consumer); + setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA); final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); @@ -1252,7 +1291,6 @@ public void shouldCloseActiveTasksDirtyAndPropagateCommitException() { task01.setCommittableOffsetsAndMetadata(singletonMap(t1p1, new OffsetAndMetadata(0L, null))); task01.setCommitNeeded(); - final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true); final Map offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null)); @@ -1266,6 +1304,7 @@ public void shouldCloseActiveTasksDirtyAndPropagateCommitException() { expect(activeTaskCreator.streamsProducerForTask(taskId01)).andThrow(new RuntimeException("task 0_1 producer boom!")); checkOrder(activeTaskCreator, false); + activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01); expectLastCall(); @@ -1575,20 +1614,7 @@ private void shouldCommitViaProducerIfEosEnabled(final StreamThread.ProcessingMo final StreamsProducer producer, final Map offsetsT01, final Map offsetsT02) { - final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST); - taskManager = new TaskManager( - changeLogReader, - UUID.randomUUID(), - "taskManagerTest", - streamsMetrics, - activeTaskCreator, - standbyTaskCreator, - topologyBuilder, - adminClient, - stateDirectory, - processingMode - ); - taskManager.setMainConsumer(consumer); + setUpTaskManager(processingMode); final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true); task01.setCommittableOffsetsAndMetadata(offsetsT01);