Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,24 +232,26 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
}

if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
for (final Task task : additionalTasksForCommitting) {
task.prepareCommit();
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.committableOffsetsAndMetadata();
if (!committableOffsets.isEmpty()) {
consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
try {
for (final Task task : additionalTasksForCommitting) {
task.prepareCommit();
final Map<TopicPartition, OffsetAndMetadata> committableOffsets = task.committableOffsetsAndMetadata();
if (!committableOffsets.isEmpty()) {
consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets);
}
}
}

try {
commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask);

for (final Task task : additionalTasksForCommitting) {
task.postCommit();
}
} catch (final RuntimeException e) {
log.error("Failed to commit tasks that are " +
"prepared to close clean, will close them as dirty instead", e);
log.error("Failed to batch commit tasks, " +
"will close all tasks involved in this commit as dirty by the end", e);
dirtyTasks.addAll(additionalTasksForCommitting);
dirtyTasks.addAll(checkpointPerTask.keySet());

checkpointPerTask.clear();
// Just add first taskId to re-throw by the end.
taskCloseExceptions.put(consumedOffsetsAndMetadataPerTask.keySet().iterator().next(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ public class TaskManagerTest {

@Before
public void setUp() {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
setUpTaskManager(StreamThread.ProcessingMode.AT_LEAST_ONCE);
}

private void setUpTaskManager(final StreamThread.ProcessingMode processingMode) {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
taskManager = new TaskManager(
changeLogReader,
UUID.randomUUID(),
Expand All @@ -165,7 +170,7 @@ public void setUp() {
topologyBuilder,
adminClient,
stateDirectory,
StreamThread.ProcessingMode.AT_LEAST_ONCE
processingMode
);
taskManager.setMainConsumer(consumer);
}
Expand Down Expand Up @@ -1228,31 +1233,64 @@ public Collection<TopicPartition> changelogPartitions() {
verify(activeTaskCreator, changeLogReader);
}

@Test
public void shouldCloseActiveTasksDirtyAndPropagatePrepareCommitException() {
setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);

final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true);

final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true) {
@Override
public void prepareCommit() {
throw new RuntimeException("task 0_1 prepare commit boom!");
}
};

task01.setCommittableOffsetsAndMetadata(singletonMap(t1p1, new OffsetAndMetadata(0L, null)));
task01.setCommitNeeded();

final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true);
final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null));

task02.setCommittableOffsetsAndMetadata(offsetsT02);
task02.setCommitNeeded();

taskManager.tasks().put(taskId00, task00);
taskManager.tasks().put(taskId01, task01);
taskManager.tasks().put(taskId02, task02);

checkOrder(activeTaskCreator, false);

activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01);
expectLastCall();

activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId02);
expectLastCall();

replay(activeTaskCreator);

final RuntimeException thrown = assertThrows(RuntimeException.class,
() -> taskManager.handleAssignment(mkMap(mkEntry(taskId00, taskId00Partitions),
mkEntry(taskId01, taskId01Partitions)), Collections.emptyMap()));
assertThat(thrown.getCause().getMessage(), is("task 0_1 prepare commit boom!"));

assertThat(task00.state(), is(Task.State.CREATED));
assertThat(task01.state(), is(Task.State.CLOSED));
assertThat(task02.state(), is(Task.State.CLOSED));

verify(activeTaskCreator);
}

@Test
public void shouldCloseActiveTasksDirtyAndPropagateCommitException() {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
new Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
taskManager = new TaskManager(
changeLogReader,
UUID.randomUUID(),
"taskManagerTest",
streamsMetrics,
activeTaskCreator,
standbyTaskCreator,
topologyBuilder,
adminClient,
stateDirectory,
StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA
);
taskManager.setMainConsumer(consumer);
setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA);

final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true);

final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true);
task01.setCommittableOffsetsAndMetadata(singletonMap(t1p1, new OffsetAndMetadata(0L, null)));
task01.setCommitNeeded();


final StateMachineTask task02 = new StateMachineTask(taskId02, taskId02Partitions, true);
final Map<TopicPartition, OffsetAndMetadata> offsetsT02 = singletonMap(t1p2, new OffsetAndMetadata(1L, null));

Expand All @@ -1266,6 +1304,7 @@ public void shouldCloseActiveTasksDirtyAndPropagateCommitException() {
expect(activeTaskCreator.streamsProducerForTask(taskId01)).andThrow(new RuntimeException("task 0_1 producer boom!"));

checkOrder(activeTaskCreator, false);

activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01);
expectLastCall();

Expand Down Expand Up @@ -1575,20 +1614,7 @@ private void shouldCommitViaProducerIfEosEnabled(final StreamThread.ProcessingMo
final StreamsProducer producer,
final Map<TopicPartition, OffsetAndMetadata> offsetsT01,
final Map<TopicPartition, OffsetAndMetadata> offsetsT02) {
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", StreamsConfig.METRICS_LATEST);
taskManager = new TaskManager(
changeLogReader,
UUID.randomUUID(),
"taskManagerTest",
streamsMetrics,
activeTaskCreator,
standbyTaskCreator,
topologyBuilder,
adminClient,
stateDirectory,
processingMode
);
taskManager.setMainConsumer(consumer);
setUpTaskManager(processingMode);

final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true);
task01.setCommittableOffsetsAndMetadata(offsetsT01);
Expand Down