@@ -825,7 +825,7 @@ private boolean populateClientStatesMap(final Map<UUID, ClientState> clientState
}

if (fetchEndOffsetsSuccessful) {
-                state.computeTaskLags(allTaskEndOffsetSums);
+                state.computeTaskLags(uuid, allTaskEndOffsetSums);
}
clientStates.put(uuid, state);
}
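For orientation, a paraphrased sketch of the calling loop this hunk sits in, so it is clear where the new uuid argument comes from. The loop and type names below are approximated from the context lines above (StreamsPartitionAssignor's client-metadata walk), not copied verbatim from the source:

// Paraphrased sketch, not verbatim source: the surrounding loop walks each
// member's metadata, so the member UUID is in scope to pass along, letting
// the warning added below name the member that reported inconsistent offsets.
for (final Map.Entry<UUID, ClientMetadata> entry : clientMetadataMap.entrySet()) {
    final UUID uuid = entry.getKey();
    final ClientState state = entry.getValue().state;

    // ... fetch the end offsets of all changelog partitions ...

    if (fetchEndOffsetsSuccessful) {
        state.computeTaskLags(uuid, allTaskEndOffsetSums);
    }
    clientStates.put(uuid, state);
}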
@@ -16,19 +16,24 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;

-import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.kafka.streams.processor.internals.Task;
import java.util.UUID;

+import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM;

public class ClientState {
+    private static final Logger LOG = LoggerFactory.getLogger(ClientState.class);

private final Set<TaskId> activeTasks;
private final Set<TaskId> standbyTasks;
private final Set<TaskId> assignedTasks;
@@ -56,7 +61,7 @@ public ClientState() {
new HashMap<>(),
new HashMap<>(),
new HashMap<>(),
-            capacity);
+             capacity);
}

private ClientState(final Set<TaskId> activeTasks,
@@ -183,7 +188,7 @@ public void addPreviousTasksAndOffsetSums(final Map<TaskId, Long> taskOffsetSums
/**
* Compute the lag for each stateful task, including tasks this client did not previously have.
*/
-    public void computeTaskLags(final Map<TaskId, Long> allTaskEndOffsetSums) {
+    public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndOffsetSums) {
if (!taskLagTotals.isEmpty()) {
throw new IllegalStateException("Already computed task lags for this client.");
}
@@ -194,11 +199,13 @@ public void computeTaskLags(final Map<TaskId, Long> allTaskEndOffsetSums) {
final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);

if (endOffsetSum < offsetSum) {
-                throw new IllegalStateException("Task " + task + " had endOffsetSum=" + endOffsetSum +
-                    " smaller than offsetSum=" + offsetSum);
-            }
-
-            if (offsetSum == Task.LATEST_OFFSET) {
+                LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" +
+                    offsetSum + " on member " + uuid + ". This probably means the task is corrupted," +
+                    " which in turn indicates that it will need to restore from scratch if it gets assigned." +
+                    " The assignor will de-prioritize returning this task to this member in the hopes that" +
+                    " some other member may be able to re-use its state.");
+                taskLagTotals.put(task, endOffsetSum);
Contributor Author:

This is the fix: return an accurate estimate instead of throwing an exception. I felt a warning was appropriate, given that this does indicate task corruption or some other unexpected situation.

Contributor:

Should we report the lag as the whole log in this case? Even if the log is truncated, it is not guaranteed to throw the invalid offset exception, and hence the task-corruption logic would not necessarily be triggered.

Member (@cadonna, Mar 23, 2020):

Do we know the exact reason why the offset sum on the client is larger than the end offset sum on the broker? Before we change this invariant, we should make sure we understand why it is not satisfied. Is it our code that breaks it, or some external influence that we cannot control?

Member (@ableegoldman, Mar 23, 2020):

@guozhangwang if the end offset is less than the checkpointed offset, how is it possible not to throw a TaskCorruptedException? I thought that was thrown after checking this exact condition?

edit: what I mean is, do we think this is a possible state? If so, we should explicitly check for it and throw TaskCorrupted if detected. (If not, it's an illegal state, and thus the check here is appropriate.)

Member (@ableegoldman, Mar 23, 2020):

Also, @vvcephei, if a single task is corrupted and we do hit the TaskCorruptedException, don't we close, wipe, and recreate the stores of all tasks in the thread? Seems we would need to keep track of the thread ownership, and if we detect this condition then set the lag for all tasks owned by that thread to the endOffset.

edit: never mind, I checked the TaskCorruptedException PR; it looks like we ended up just setting the invalid offsets to 0 instead of wiping everything. So, disregard this comment 🙂

Member:

Whoops, edited my previous comment before refreshing and seeing your response. Makes sense, although I still believe we should be checking for this and throwing TaskCorrupted if we detect this case (in the owning StreamThread, not in the leader during assignment).

Member:

I think it could easily happen that any past, present, or future versioned member reports a task offset sum that's beyond the end-offset sum of what the leader thinks the task contains.

If the corruption comes from our code, we should throw. The leader fetches the end offsets shortly before that code runs, so they are most probably up to date. If the corruption is caused by a failure, we should handle it gracefully.

Member:

Let's think about cases where this could possibly go wrong. We know it must reach, or already be in, the standby/restoring phase. I will use logEndOffset for the actual end offset of the partition, and endOffset for the offset a standby/restoring task is allowed to restore up to.

Newly assigned: we read the invalid offset from the checkpoint and determine the endOffset as either logEndOffset (normal) or min(logEndOffset, LCO) (optimized source changelog). In both cases endOffset <= logEndOffset, so the invalid offset should also satisfy offset > endOffset, which we can detect and throw as TaskCorrupted.

Already assigned: in this case, we must have been the ones who wrote the invalid offset and/or have it in our checkpointableOffsets map. But we still need to fetch the endOffset to know when to stop, so the above applies here as well, and we can detect the corruption.

In either case, I'd agree the leader should not throw and should just treat that task's offset sum as 0. But we should make sure that on the other end we actually do detect this case and handle it the way the assignor expects.

Contributor:

@ableegoldman If the log is truncated and then immediately more records are appended to go beyond the original log-end-offset, in old versions we would not throw the exception.

After some thought, I think it makes sense to report the lag as the whole log.

Contributor Author:

Thanks for the conversation, all. I think where it nets out for me is that we don't really expect this to happen for non-buggy code, but there are a few edge cases where it could. Thus, logging a warning actually seems reasonable.

Since the graceful-handling option is still safe in any case, and since there do exist edge cases where this condition doesn't indicate a bug in Streams, we'll just keep the proposed graceful handling of reporting the lag as the whole log. Either the member in question won't get assigned the task (because it's de-prioritized relative to other members who report valid positions) and it winds up cleaning up its state dir on its own, or it gets assigned the task, detects the corruption on its side, and recovers appropriately.

+            } else if (offsetSum == Task.LATEST_OFFSET) {
taskLagTotals.put(task, Task.LATEST_OFFSET);
} else if (offsetSum == UNKNOWN_OFFSET_SUM) {
taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
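Pieced together from the hunks above, computeTaskLags now reads roughly as follows. This is a reconstruction for readability: the loop header and the final else branch come from unchanged context outside the hunks, and the warning message is abbreviated, so treat the details as approximate:

    public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndOffsetSums) {
        if (!taskLagTotals.isEmpty()) {
            throw new IllegalStateException("Already computed task lags for this client.");
        }

        for (final Map.Entry<TaskId, Long> taskEntry : allTaskEndOffsetSums.entrySet()) {
            final TaskId task = taskEntry.getKey();
            final Long endOffsetSum = taskEntry.getValue();
            final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);

            if (endOffsetSum < offsetSum) {
                // New behavior: warn and report the lag as the whole log, so the
                // member is de-prioritized for this task instead of the assignor crashing.
                LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum
                    + " smaller than offsetSum=" + offsetSum + " on member " + uuid + ".");  // message abbreviated
                taskLagTotals.put(task, endOffsetSum);
            } else if (offsetSum == Task.LATEST_OFFSET) {
                taskLagTotals.put(task, Task.LATEST_OFFSET);       // was an active running task on this client
            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);       // member is on an older version
            } else {
                taskLagTotals.put(task, endOffsetSum - offsetSum); // the normal case
            }
        }
    }

For example, with offsetSum=5 and endOffsetSum=1 (the case exercised by the renamed test below), the member now reports a lag of 1, i.e. the whole log, rather than failing the rebalance.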
@@ -212,7 +219,7 @@ public void computeTaskLags(final Map<TaskId, Long> allTaskEndOffsetSums) {
* Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client
* did not have any state for this task on disk.
*
-* @return end offset sum - offset sum
+* @return end offset sum - offset sum
* Task.LATEST_OFFSET if this was previously an active running task on this client
*/
public long lagFor(final TaskId task) {
@@ -270,15 +277,15 @@ boolean hasAssignedTask(final TaskId taskId) {
@Override
public String toString() {
        return "[activeTasks: (" + activeTasks +
-            ") standbyTasks: (" + standbyTasks +
-            ") assignedTasks: (" + assignedTasks +
-            ") prevActiveTasks: (" + prevActiveTasks +
-            ") prevStandbyTasks: (" + prevStandbyTasks +
-            ") prevAssignedTasks: (" + prevAssignedTasks +
-            ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() +
-            ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() +
-            ") capacity: " + capacity +
-            "]";
+               ") standbyTasks: (" + standbyTasks +
+               ") assignedTasks: (" + assignedTasks +
+               ") prevActiveTasks: (" + prevActiveTasks +
+               ") prevStandbyTasks: (" + prevStandbyTasks +
+               ") prevAssignedTasks: (" + prevAssignedTasks +
+               ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() +
+               ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() +
+               ") capacity: " + capacity +
+               "]";
}

// Visible for testing
@@ -82,29 +82,27 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted

final CountDownLatch instanceLatch = new CountDownLatch(1);

-        final String stateDirPathOne = stateDirPath + "/" + appId + "-1/";
-        final KafkaStreams streamInstanceOne =
-            buildStreamWithDirtyStateDir(appId, stateDirPathOne, instanceLatch);
+        try (
Contributor Author:

This test isn't logically changed, but I noticed a couple of problems with it that I fixed on the side.

+            final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-1/", instanceLatch);
+            final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-2/", instanceLatch);
+        ) {
Contributor Author (on lines +85 to +88):

I moved this into a try-with-resources so that the instances would get closed even if the assertions fail.


-        final String stateDirPathTwo = stateDirPath + "/" + appId + "-2/";
-        final KafkaStreams streamInstanceTwo =
-            buildStreamWithDirtyStateDir(appId, stateDirPathTwo, instanceLatch);
-
-        streamInstanceOne.start();
+            streamInstanceOne.start();

-        streamInstanceTwo.start();
+            streamInstanceTwo.start();

-        // Wait for the record to be processed
-        assertTrue(instanceLatch.await(15, TimeUnit.SECONDS));
+            // Wait for the record to be processed
+            assertTrue(instanceLatch.await(15, TimeUnit.SECONDS));

-        waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING),
-            "Stream instance one should be up and running by now");
-        waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING),
-            "Stream instance one should be up and running by now");
-
-        streamInstanceOne.close(Duration.ofSeconds(30));
-        streamInstanceTwo.close(Duration.ofSeconds(30));
+            waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING),
+                "Stream instance one should be up and running by now");
+            waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING),
+                "Stream instance two should be up and running by now");

+            streamInstanceOne.close(Duration.ZERO);
+            streamInstanceTwo.close(Duration.ZERO);
Contributor Author (on lines +103 to +104):

Close them both asynchronously so we can tear them down in parallel. The try-with-resources will block until they are really closed.

+        }
    }

private KafkaStreams buildStreamWithDirtyStateDir(final String appId,
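The shutdown pattern the author describes is worth spelling out: close(Duration.ZERO) only initiates shutdown and returns immediately, while the implicit close() at the end of the try-with-resources block then blocks until each instance has fully stopped. A stripped-down sketch of the shape, where the directory and latch variables are stand-ins for the ones in the test above:

try (
    final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(appId, dirOne, instanceLatch);
    final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(appId, dirTwo, instanceLatch)
) {
    streamInstanceOne.start();
    streamInstanceTwo.start();

    // ... assertions; if one fails, the try-with-resources still closes both instances ...

    streamInstanceOne.close(Duration.ZERO); // begin shutdown, return immediately
    streamInstanceTwo.close(Duration.ZERO); // now both tear down in parallel
} // implicit close() on each instance blocks until shutdown completes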
@@ -123,14 +121,14 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String appId,
.write(Collections.singletonMap(new TopicPartition("unknown-topic", 0), 5L));

        assertTrue(new File(stateDirectory.directoryForTask(taskId),
-            "rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000001").mkdirs());
+                       "rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000001").mkdirs());

        builder.stream(inputTopic,
-            Consumed.with(Serdes.Integer(), Serdes.Integer()))
-            .groupByKey()
-            .count()
-            .toStream()
-            .peek((key, value) -> recordProcessLatch.countDown());
+                       Consumed.with(Serdes.Integer(), Serdes.Integer()))
+               .groupByKey()
+               .count()
+               .toStream()
+               .peek((key, value) -> recordProcessLatch.countDown());

return new KafkaStreams(builder.build(), props);
}
@@ -199,7 +199,7 @@ public void shouldComputeTaskLags() {
mkEntry(taskId02, 100L)
);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
-        client.computeTaskLags(allTaskEndOffsetSums);
+        client.computeTaskLags(null, allTaskEndOffsetSums);

assertThat(client.lagFor(taskId01), equalTo(500L));
assertThat(client.lagFor(taskId02), equalTo(0L));
@@ -210,7 +210,7 @@ public void shouldReturnEndOffsetSumForLagOfTaskWeDidNotPreviouslyOwn() {
final Map<TaskId, Long> taskOffsetSums = Collections.emptyMap();
final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
-        client.computeTaskLags(allTaskEndOffsetSums);
+        client.computeTaskLags(null, allTaskEndOffsetSums);
assertThat(client.lagFor(taskId01), equalTo(500L));
}

@@ -219,7 +219,7 @@ public void shouldReturnLatestOffsetForLagOfPreviousActiveRunningTask() {
final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, Task.LATEST_OFFSET);
final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
-        client.computeTaskLags(allTaskEndOffsetSums);
+        client.computeTaskLags(null, allTaskEndOffsetSums);
assertThat(client.lagFor(taskId01), equalTo(Task.LATEST_OFFSET));
}

@@ -228,32 +228,33 @@ public void shouldReturnUnknownOffsetSumForLagOfTaskWithUnknownOffset() {
final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, UNKNOWN_OFFSET_SUM);
final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
-        client.computeTaskLags(allTaskEndOffsetSums);
+        client.computeTaskLags(null, allTaskEndOffsetSums);
assertThat(client.lagFor(taskId01), equalTo(UNKNOWN_OFFSET_SUM));
}

@Test
-    public void shouldThrowIllegalStateExceptionIfOffsetSumIsGreaterThanEndOffsetSum() {
+    public void shouldReturnEndOffsetSumIfOffsetSumIsGreaterThanEndOffsetSum() {
final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, 5L);
final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
-        assertThrows(IllegalStateException.class, () -> client.computeTaskLags(allTaskEndOffsetSums));
+        client.computeTaskLags(null, allTaskEndOffsetSums);
+        assertThat(client.lagFor(taskId01), equalTo(1L));
}

@Test
public void shouldThrowIllegalStateExceptionIfTaskLagsMapIsNotEmpty() {
final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, 5L);
final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L);
-        client.computeTaskLags(taskOffsetSums);
-        assertThrows(IllegalStateException.class, () -> client.computeTaskLags(allTaskEndOffsetSums));
+        client.computeTaskLags(null, taskOffsetSums);
+        assertThrows(IllegalStateException.class, () -> client.computeTaskLags(null, allTaskEndOffsetSums));
}

@Test
public void shouldThrowIllegalStateExceptionOnLagForUnknownTask() {
final Map<TaskId, Long> taskOffsetSums = Collections.singletonMap(taskId01, 0L);
final Map<TaskId, Long> allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L);
client.addPreviousTasksAndOffsetSums(taskOffsetSums);
-        client.computeTaskLags(allTaskEndOffsetSums);
+        client.computeTaskLags(null, allTaskEndOffsetSums);
assertThrows(IllegalStateException.class, () -> client.lagFor(taskId02));
}
