Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ void runLoop() {
taskManager.tasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING)
.filter(t -> !e.corruptedTaskWithChangelogs().containsKey(t.id()))
.collect(Collectors.toSet())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1920,7 +1920,9 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
mkEntry(taskId1, emptySet())
);

expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(task1.id()).andReturn(taskId1).anyTimes();
expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(task2.id()).andReturn(taskId2).anyTimes();

expect(taskManager.tasks()).andReturn(mkMap(
Expand Down Expand Up @@ -1961,6 +1963,63 @@ void runOnce() {
verify(taskManager);
}

@Test
public void shouldNotCommitNonRunningNonCorruptedTasksOnTaskCorruptedException() {
final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
final Task task1 = mock(Task.class);
final Task task2 = mock(Task.class);
final TaskId taskId1 = new TaskId(0, 0);
final TaskId taskId2 = new TaskId(0, 2);

final Map<TaskId, Collection<TopicPartition>> corruptedTasksWithChangelogs = mkMap(
mkEntry(taskId1, emptySet())
);

expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
expect(task1.id()).andReturn(taskId1).anyTimes();
expect(task2.state()).andReturn(Task.State.CREATED).anyTimes();
expect(task2.id()).andReturn(taskId2).anyTimes();

expect(taskManager.tasks()).andReturn(mkMap(
mkEntry(taskId1, task1),
mkEntry(taskId2, task2)
)).anyTimes();
// expect not to try and commit task2, even though it's not corrupted, because it's not running.
expect(taskManager.commit(emptySet())).andReturn(0);

EasyMock.replay(task1, task2, taskManager);

final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
final StreamThread thread = new StreamThread(
mockTime,
config,
null,
consumer,
consumer,
null,
null,
taskManager,
streamsMetrics,
internalTopologyBuilder,
CLIENT_ID,
new LogContext(""),
new AtomicInteger(),
new AtomicLong(Long.MAX_VALUE)
) {
@Override
void runOnce() {
setState(State.PENDING_SHUTDOWN);
throw new TaskCorruptedException(corruptedTasksWithChangelogs);
}
}.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));

thread.setState(StreamThread.State.STARTING);
thread.runLoop();

verify(taskManager);
}

@Test
public void shouldLogAndRecordSkippedRecordsForInvalidTimestampsWithBuiltInMetricsVersion0100To24() {
shouldLogAndRecordSkippedRecordsForInvalidTimestamps(StreamsConfig.METRICS_0100_TO_24);
Expand Down