From 269bcf66ac9fe1a404d5d5571dfd3cbcd0b2707c Mon Sep 17 00:00:00 2001 From: John Roesler Date: Sun, 22 Mar 2020 18:03:01 -0500 Subject: [PATCH 1/2] KAFKA-9742: Fix broken StandbyTaskEOSIntegrationTest --- .../internals/assignment/ClientState.java | 16 ++++--- .../StandbyTaskEOSIntegrationTest.java | 44 +++++++++---------- .../internals/assignment/ClientStateTest.java | 5 ++- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 9a522edddabfa..3f5fbe256dad4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -26,9 +26,14 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; + import org.apache.kafka.streams.processor.internals.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ClientState { + private static final Logger LOG = LoggerFactory.getLogger(ClientState.class); + private final Set activeTasks; private final Set standbyTasks; private final Set assignedTasks; @@ -194,11 +199,12 @@ public void computeTaskLags(final Map allTaskEndOffsetSums) { final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L); if (endOffsetSum < offsetSum) { - throw new IllegalStateException("Task " + task + " had endOffsetSum=" + endOffsetSum + - " smaller than offsetSum=" + offsetSum); - } - - if (offsetSum == Task.LATEST_OFFSET) { + LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + + " smaller than offsetSum=" + offsetSum + ". This probably means the task is corrupted," + + " which in turn indicates that it will need to restore from scratch, so we pin the lag" + + " to the end offset of the log."); + taskLagTotals.put(task, endOffsetSum); + } else if (offsetSum == Task.LATEST_OFFSET) { taskLagTotals.put(task, Task.LATEST_OFFSET); } else if (offsetSum == UNKNOWN_OFFSET_SUM) { taskLagTotals.put(task, UNKNOWN_OFFSET_SUM); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index bc22b5c7a5cc1..bdb842aa365b6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -82,29 +82,27 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted final CountDownLatch instanceLatch = new CountDownLatch(1); - final String stateDirPathOne = stateDirPath + "/" + appId + "-1/"; - final KafkaStreams streamInstanceOne = - buildStreamWithDirtyStateDir(appId, stateDirPathOne, instanceLatch); + try ( + final KafkaStreams streamInstanceOne = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-1/", instanceLatch); + final KafkaStreams streamInstanceTwo = buildStreamWithDirtyStateDir(appId, stateDirPath + "/" + appId + "-2/", instanceLatch); + ) { - final String stateDirPathTwo = stateDirPath + "/" + appId + "-2/"; - final KafkaStreams streamInstanceTwo = - buildStreamWithDirtyStateDir(appId, stateDirPathTwo, instanceLatch); - streamInstanceOne.start(); + streamInstanceOne.start(); - streamInstanceTwo.start(); + streamInstanceTwo.start(); - // Wait for the record to be processed - assertTrue(instanceLatch.await(15, TimeUnit.SECONDS)); + // Wait for the record to be processed + assertTrue(instanceLatch.await(15, TimeUnit.SECONDS)); - waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING), - "Stream instance one should be up and running by now"); - waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING), - "Stream instance one should be up and running by now"); - - streamInstanceOne.close(Duration.ofSeconds(30)); - streamInstanceTwo.close(Duration.ofSeconds(30)); + waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING), + "Stream instance one should be up and running by now"); + waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING), + "Stream instance one should be up and running by now"); + streamInstanceOne.close(Duration.ZERO); + streamInstanceTwo.close(Duration.ZERO); + } } private KafkaStreams buildStreamWithDirtyStateDir(final String appId, @@ -123,14 +121,14 @@ private KafkaStreams buildStreamWithDirtyStateDir(final String appId, .write(Collections.singletonMap(new TopicPartition("unknown-topic", 0), 5L)); assertTrue(new File(stateDirectory.directoryForTask(taskId), - "rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000001").mkdirs()); + "rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000001").mkdirs()); builder.stream(inputTopic, - Consumed.with(Serdes.Integer(), Serdes.Integer())) - .groupByKey() - .count() - .toStream() - .peek((key, value) -> recordProcessLatch.countDown()); + Consumed.with(Serdes.Integer(), Serdes.Integer())) + .groupByKey() + .count() + .toStream() + .peek((key, value) -> recordProcessLatch.countDown()); return new KafkaStreams(builder.build(), props); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java index f08ae1ae28e95..47e8061f4d41f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java @@ -233,11 +233,12 @@ public void shouldReturnUnknownOffsetSumForLagOfTaskWithUnknownOffset() { } @Test - public void shouldThrowIllegalStateExceptionIfOffsetSumIsGreaterThanEndOffsetSum() { + public void shouldReturnEndOffsetSumIfOffsetSumIsGreaterThanEndOffsetSum() { final Map taskOffsetSums = Collections.singletonMap(taskId01, 5L); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - assertThrows(IllegalStateException.class, () -> client.computeTaskLags(allTaskEndOffsetSums)); + client.computeTaskLags(allTaskEndOffsetSums); + assertThat(client.lagFor(taskId01), equalTo(1L)); } @Test From 06c17350dac09c8b4a35edeb1b3c8e29132dfa17 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Mon, 23 Mar 2020 17:24:44 -0500 Subject: [PATCH 2/2] CR feedback --- .../internals/StreamsPartitionAssignor.java | 2 +- .../internals/assignment/ClientState.java | 43 ++++++++++--------- .../StandbyTaskEOSIntegrationTest.java | 2 +- .../internals/assignment/ClientStateTest.java | 16 +++---- 4 files changed, 32 insertions(+), 31 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 2d1d3de8c0899..edfbe373bb38e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -825,7 +825,7 @@ private boolean populateClientStatesMap(final Map clientState } if (fetchEndOffsetsSuccessful) { - state.computeTaskLags(allTaskEndOffsetSums); + state.computeTaskLags(uuid, allTaskEndOffsetSums); } clientStates.put(uuid, state); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java index 3f5fbe256dad4..15d00c0bff4ae 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java @@ -16,20 +16,20 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; - import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.UUID; -import org.apache.kafka.streams.processor.internals.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.UNKNOWN_OFFSET_SUM; public class ClientState { private static final Logger LOG = LoggerFactory.getLogger(ClientState.class); @@ -61,7 +61,7 @@ public ClientState() { new HashMap<>(), new HashMap<>(), new HashMap<>(), - capacity); + capacity); } private ClientState(final Set activeTasks, @@ -188,7 +188,7 @@ public void addPreviousTasksAndOffsetSums(final Map taskOffsetSums /** * Compute the lag for each stateful task, including tasks this client did not previously have. */ - public void computeTaskLags(final Map allTaskEndOffsetSums) { + public void computeTaskLags(final UUID uuid, final Map allTaskEndOffsetSums) { if (!taskLagTotals.isEmpty()) { throw new IllegalStateException("Already computed task lags for this client."); } @@ -199,10 +199,11 @@ public void computeTaskLags(final Map allTaskEndOffsetSums) { final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L); if (endOffsetSum < offsetSum) { - LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + - " smaller than offsetSum=" + offsetSum + ". This probably means the task is corrupted," + - " which in turn indicates that it will need to restore from scratch, so we pin the lag" + - " to the end offset of the log."); + LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" + + offsetSum + " on member " + uuid + ". This probably means the task is corrupted," + + " which in turn indicates that it will need to restore from scratch if it gets assigned." + + " The assignor will de-prioritize returning this task to this member in the hopes that" + + " some other member may be able to re-use its state."); taskLagTotals.put(task, endOffsetSum); } else if (offsetSum == Task.LATEST_OFFSET) { taskLagTotals.put(task, Task.LATEST_OFFSET); @@ -218,7 +219,7 @@ public void computeTaskLags(final Map allTaskEndOffsetSums) { * Returns the total lag across all logged stores in the task. Equal to the end offset sum if this client * did not have any state for this task on disk. * - * @return end offset sum - offset sum + * @return end offset sum - offset sum * Task.LATEST_OFFSET if this was previously an active running task on this client */ public long lagFor(final TaskId task) { @@ -276,15 +277,15 @@ boolean hasAssignedTask(final TaskId taskId) { @Override public String toString() { return "[activeTasks: (" + activeTasks + - ") standbyTasks: (" + standbyTasks + - ") assignedTasks: (" + assignedTasks + - ") prevActiveTasks: (" + prevActiveTasks + - ") prevStandbyTasks: (" + prevStandbyTasks + - ") prevAssignedTasks: (" + prevAssignedTasks + - ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() + - ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() + - ") capacity: " + capacity + - "]"; + ") standbyTasks: (" + standbyTasks + + ") assignedTasks: (" + assignedTasks + + ") prevActiveTasks: (" + prevActiveTasks + + ") prevStandbyTasks: (" + prevStandbyTasks + + ") prevAssignedTasks: (" + prevAssignedTasks + + ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() + + ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() + + ") capacity: " + capacity + + "]"; } // Visible for testing diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index bdb842aa365b6..b0840188bbd8a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -98,7 +98,7 @@ public void surviveWithOneTaskAsStandby() throws ExecutionException, Interrupted waitForCondition(() -> streamInstanceOne.state().equals(KafkaStreams.State.RUNNING), "Stream instance one should be up and running by now"); waitForCondition(() -> streamInstanceTwo.state().equals(KafkaStreams.State.RUNNING), - "Stream instance one should be up and running by now"); + "Stream instance two should be up and running by now"); streamInstanceOne.close(Duration.ZERO); streamInstanceTwo.close(Duration.ZERO); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java index 47e8061f4d41f..51b28db9e1fc5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java @@ -199,7 +199,7 @@ public void shouldComputeTaskLags() { mkEntry(taskId02, 100L) ); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThat(client.lagFor(taskId01), equalTo(500L)); assertThat(client.lagFor(taskId02), equalTo(0L)); @@ -210,7 +210,7 @@ public void shouldReturnEndOffsetSumForLagOfTaskWeDidNotPreviouslyOwn() { final Map taskOffsetSums = Collections.emptyMap(); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThat(client.lagFor(taskId01), equalTo(500L)); } @@ -219,7 +219,7 @@ public void shouldReturnLatestOffsetForLagOfPreviousActiveRunningTask() { final Map taskOffsetSums = Collections.singletonMap(taskId01, Task.LATEST_OFFSET); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThat(client.lagFor(taskId01), equalTo(Task.LATEST_OFFSET)); } @@ -228,7 +228,7 @@ public void shouldReturnUnknownOffsetSumForLagOfTaskWithUnknownOffset() { final Map taskOffsetSums = Collections.singletonMap(taskId01, UNKNOWN_OFFSET_SUM); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThat(client.lagFor(taskId01), equalTo(UNKNOWN_OFFSET_SUM)); } @@ -237,7 +237,7 @@ public void shouldReturnEndOffsetSumIfOffsetSumIsGreaterThanEndOffsetSum() { final Map taskOffsetSums = Collections.singletonMap(taskId01, 5L); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThat(client.lagFor(taskId01), equalTo(1L)); } @@ -245,8 +245,8 @@ public void shouldReturnEndOffsetSumIfOffsetSumIsGreaterThanEndOffsetSum() { public void shouldThrowIllegalStateExceptionIfTaskLagsMapIsNotEmpty() { final Map taskOffsetSums = Collections.singletonMap(taskId01, 5L); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 1L); - client.computeTaskLags(taskOffsetSums); - assertThrows(IllegalStateException.class, () -> client.computeTaskLags(allTaskEndOffsetSums)); + client.computeTaskLags(null, taskOffsetSums); + assertThrows(IllegalStateException.class, () -> client.computeTaskLags(null, allTaskEndOffsetSums)); } @Test @@ -254,7 +254,7 @@ public void shouldThrowIllegalStateExceptionOnLagForUnknownTask() { final Map taskOffsetSums = Collections.singletonMap(taskId01, 0L); final Map allTaskEndOffsetSums = Collections.singletonMap(taskId01, 500L); client.addPreviousTasksAndOffsetSums(taskOffsetSums); - client.computeTaskLags(allTaskEndOffsetSums); + client.computeTaskLags(null, allTaskEndOffsetSums); assertThrows(IllegalStateException.class, () -> client.lagFor(taskId02)); }