diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 2d1d3de8c0899..edfbe373bb38e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -825,7 +825,7 @@ private boolean populateClientStatesMap(final Map clientState } if (fetchEndOffsetsSuccessful) { - state.computeTaskLags(allTaskEndOffsetSums); + state.computeTaskLags(uuid, allTaskEndOffsetSums); } clientStates.put(uuid, state); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 9a522edddabfa..15d00c0bff4ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -16,19 +16,24 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; - import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.kafka.streams.processor.internals.Task; +import java.util.UUID; + +import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; public class ClientState { + private static final Logger LOG = LoggerFactory.getLogger(ClientState.class); + private final Set activeTasks; private final Set standbyTasks; private final Set assignedTasks; @@ -56,7 +61,7 @@ public ClientState() { new HashMap<>(), new HashMap<>(), new HashMap<>(), - capacity); + capacity); } private ClientState(final Set activeTasks, @@ -183,7 +188,7 @@ public void addPreviousTasksAndOffsetSums(final Map taskOffsetSums /** * Compute the lag for each stateful task, including tasks this client did not previously have. */ - public void computeTaskLags(final Map allTaskEndOffsetSums) { + public void computeTaskLags(final UUID uuid, final Map allTaskEndOffsetSums) { if (!taskLagTotals.isEmpty()) { throw new IllegalStateException("Already computed task lags for this client."); } @@ -194,11 +199,13 @@ public void computeTaskLags(final Map allTaskEndOffsetSums) { final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L); if (endOffsetSum < offsetSum) { - throw new IllegalStateException("Task " + task + " had endOffsetSum=" + endOffsetSum + - " smaller than offsetSum=" + offsetSum); - } - - if (offsetSum == Task.LATEST_OFFSET) { + LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" + + offsetSum + " on member " + uuid + ". This probably means the task is corrupted," + + " which in turn indicates that it will need to restore from scratch if it gets assigned." + + " The assignor will de-prioritize returning this task to this member in the hopes that" + + " some other member may be able to re-use its state."); + taskLagTotals.put(task, endOffsetSum); + } else if (offsetSum == Task.LATEST_OFFSET) { taskLagTotals.put(task, Task.LATEST_OFFSET); } else if (offsetSum == UNKNOWN_OFFSET_SUM) { taskLagTotals.put(task, UNKNOWN_OFFSET_SUM); @@ -212,7 +219,7 @@ public void computeTaskLags(final Map allTaskEndOffsetSums) { * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client * did not have any state for this task on disk. * - * @return end offset sum - offset sum + * @return end offset sum - offset sum * Task.LATEST_OFFSET if this was previously an active running task on this client */ public long lagFor(final TaskId task) { @@ -270,15 +277,15 @@ boolean hasAssignedTask(final TaskId taskId) { @Override public String toString() { return "[activeTasks: (" + activeTasks + - ") standbyTasks: (" + standbyTasks + - ") assignedTasks: (" + assignedTasks + - ") prevActiveTasks: (" + prevActiveTasks + - ") prevStandbyTasks: (" + prevStandbyTasks + - ") prevAssignedTasks: (" + prevAssignedTasks + - ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() + - ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() + - ") capacity: " + capacity + - "]"; + ") standbyTasks: (" + standbyTasks + + ") assignedTasks: (" + assignedTasks + + ") prevActiveTasks: (" + prevActiveTasks + + ") prevStandbyTasks: (" + prevStandbyTasks + + ") prevAssignedTasks: (" + prevAssignedTasks + + ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() + + ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() + + ") capacity: " + capacity + + "]"; } // Visible for testing diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index bc22b5c7a5cc1..b0840188bbd8a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -82,29 +82,27 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted final CountDownLatch instanceLatch = new CountDownLatch(1); - final String stateDirPathOne = stateDirPath + "/" + appId + "-1/"; - final KafkaStreams streamInstanceOne = - buildStreamWithDirtyStateDir(appId, stateDirPathOne, instanceLatch); + try ( + final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-1/", instanceLatch); + final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-2/", instanceLatch); + ) { - final String stateDirPathTwo = stateDirPath + "/" + appId + "-2/"; - final KafkaStreams streamInstanceTwo = - buildStreamWithDirtyStateDir(appId, stateDirPathTwo, instanceLatch); - streamInstanceOne.start(); + streamInstanceOne.start(); - streamInstanceTwo.start(); + streamInstanceTwo.start(); - // Wait for the record to be processed - assertTrue(instanceLatch.await(15, TimeUnit.SECONDS)); + // Wait for the record to be processed + assertTrue(instanceLatch.await(15, TimeUnit.SECONDS)); - waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING), - "Stream instance one should be up and running by now"); - waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING), - "Stream instance one should be up and running by now"); - - streamInstanceOne.close(Duration.ofSeconds(30)); - streamInstanceTwo.close(Duration.ofSeconds(30)); + waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING), + "Stream instance one should be up and running by now"); + waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING), + "Stream instance two should be up and running by now"); + streamInstanceOne.close(Duration.ZERO); + streamInstanceTwo.close(Duration.ZERO); + } } private KafkaStreams buildStreamWithDirtyStateDir(final String appId, @@ -123,14 +121,14 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String appId, .write(Collections.singletonMap(new TopicPartition("unknown-topic", 0), 5L)); assertTrue(new File(stateDirectory.directoryForTask(taskId), - "rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000001").mkdirs()); + "rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000001").mkdirs()); builder.stream(inputTopic, - Consumed.with(Serdes.Integer(), Serdes.Integer())) - .groupByKey() - .count() - .toStream() - .peek((key, value) -> recordProcessLatch.countDown()); + Consumed.with(Serdes.Integer(), Serdes.Integer())) + .groupByKey() + .count() + .toStream() + .peek((key, value) -> recordProcessLatch.countDown()); return new KafkaStreams(builder.build(), props); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java index f08ae1ae28e95..51b28db9e1fc5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java @@ -199,7 +199,7 @@ public void shouldComputeTaskLags() { mkEntry(taskId02, 100L) ); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThat(client.lagFor(taskId01), equalTo(500L)); assertThat(client.lagFor(taskId02), equalTo(0L)); @@ -210,7 +210,7 @@ public void shouldReturnEndOffsetSumForLagOfTaskWeDidNotPreviouslyOwn() { final Map taskOffsetSums = Collections.emptyMap(); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThat(client.lagFor(taskId01), equalTo(500L)); } @@ -219,7 +219,7 @@ public void shouldReturnLatestOffsetForLagOfPreviousActiveRunningTask() { final Map taskOffsetSums = Collections.singletonMap(taskId01, Task.LATEST_OFFSET); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThat(client.lagFor(taskId01), equalTo(Task.LATEST_OFFSET)); } @@ -228,24 +228,25 @@ public void shouldReturnUnknownOffsetSumForLagOfTaskWithUnknownOffset() { final Map taskOffsetSums = Collections.singletonMap(taskId01, UNKNOWN_OFFSET_SUM); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThat(client.lagFor(taskId01), equalTo(UNKNOWN_OFFSET_SUM)); } @Test - public void shouldThrowIllegalStateExceptionIfOffsetSumIsGreaterThanEndOffsetSum() { + public void shouldReturnEndOffsetSumIfOffsetSumIsGreaterThanEndOffsetSum() { final Map taskOffsetSums = Collections.singletonMap(taskId01, 5L); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - assertThrows(IllegalStateException.class, () -> client.computeTaskLags(allTaskEndOffsetSums)); + client.computeTaskLags(null, allTaskEndOffsetSums); + assertThat(client.lagFor(taskId01), equalTo(1L)); } @Test public void shouldThrowIllegalStateExceptionIfTaskLagsMapIsNotEmpty() { final Map taskOffsetSums = Collections.singletonMap(taskId01, 5L); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L); - client.computeTaskLags(taskOffsetSums); - assertThrows(IllegalStateException.class, () -> client.computeTaskLags(allTaskEndOffsetSums)); + client.computeTaskLags(null, taskOffsetSums); + assertThrows(IllegalStateException.class, () -> client.computeTaskLags(null, allTaskEndOffsetSums)); } @Test @@ -253,7 +254,7 @@ public void shouldThrowIllegalStateExceptionOnLagForUnknownTask() { final Map taskOffsetSums = Collections.singletonMap(taskId01, 0L); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThrows(IllegalStateException.class, () -> client.lagFor(taskId02)); }