diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index dc34f9a0c3d48..1fe85cd3d9c4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -694,7 +694,7 @@ public class StreamsConfig extends AbstractConfig { CommonClientConfigs.SECURITY_PROTOCOL_DOC) .define(TASK_TIMEOUT_MS_CONFIG, Type.LONG, - Duration.ofSeconds(5L).toMillis(), + Duration.ofMinutes(5L).toMillis(), atLeast(0L), Importance.MEDIUM, TASK_TIMEOUT_MS_DOC) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index e933e09a07ca8..ecf5ec1921413 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -16,23 +16,29 @@ */ package org.apache.kafka.streams.processor.internals; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TaskMigratedException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.slf4j.Logger; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Set; import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED; import static org.apache.kafka.streams.processor.internals.Task.State.CREATED; public abstract class AbstractTask implements Task { + private final static long NO_DEADLINE = -1L; + private Task.State state = CREATED; + private long deadlineMs = NO_DEADLINE; protected Set inputPartitions; /** @@ -47,17 +53,20 @@ public abstract class AbstractTask implements Task { protected final ProcessorTopology topology; protected final StateDirectory stateDirectory; protected final ProcessorStateManager stateMgr; + private final long taskTimeoutMs; AbstractTask(final TaskId id, final ProcessorTopology topology, final StateDirectory stateDirectory, final ProcessorStateManager stateMgr, - final Set inputPartitions) { + final Set inputPartitions, + final long taskTimeoutMs) { this.id = id; this.stateMgr = stateMgr; this.topology = topology; this.inputPartitions = inputPartitions; this.stateDirectory = stateDirectory; + this.taskTimeoutMs = taskTimeoutMs; } /** @@ -137,4 +146,46 @@ public void update(final Set topicPartitions, final Map deadlineMs) { + final String errorMessage = String.format( + "Task %s did not make progress within %d ms. Adjust `%s` if needed.", + id, + currentWallClockMs - deadlineMs + taskTimeoutMs, + StreamsConfig.TASK_TIMEOUT_MS_CONFIG + ); + + if (timeoutException != null) { + throw new TimeoutException(errorMessage, timeoutException); + } else { + throw new TimeoutException(errorMessage); + } + } + + if (timeoutException != null) { + log.debug( + "Timeout exception. Remaining time to deadline {}; retrying.", + deadlineMs - currentWallClockMs, + timeoutException + ); + } else { + log.debug( + "Task did not make progress. Remaining time to deadline {}; retrying.", + deadlineMs - currentWallClockMs + ); + } + + } + + void clearTaskTimeout(final Logger log) { + if (deadlineMs != NO_DEADLINE) { + log.debug("Clearing task timeout."); + deadlineMs = NO_DEADLINE; + } + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index b5b331b129511..75d3dbb7b3232 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.StreamsConfig; @@ -64,7 +65,7 @@ public class StandbyTask extends AbstractTask implements Task { final StateDirectory stateDirectory, final ThreadCache cache, final InternalProcessorContext processorContext) { - super(id, topology, stateDirectory, stateMgr, partitions); + super(id, topology, stateDirectory, stateMgr, partitions, config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG)); this.processorContext = processorContext; this.streamsMetrics = streamsMetrics; processorContext.transitionToStandby(cache); @@ -286,6 +287,17 @@ public void addRecords(final TopicPartition partition, final Iterable task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log) + ); + } + + @Test + public void shouldCLearTaskTimeout() { + EasyMock.replay(stateManager); + + final Logger log = new LogContext().logger(StreamTaskTest.class); + task = createStandbyTask(); + + task.maybeInitTaskTimeoutOrThrow(0L, null, log); + task.clearTaskTimeout(log); + task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log); + } + private StandbyTask createStandbyTask() { final ThreadCache cache = new ThreadCache( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 30e20b068da8e..22d34944505f1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -39,6 +39,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -68,6 +69,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.slf4j.Logger; import java.io.File; import java.io.IOException; @@ -2073,6 +2075,30 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() { "are added in the same order.")); } + @Test + public void shouldInitTaskTimeoutAndEventuallyThrow() { + final Logger log = new LogContext().logger(StreamTaskTest.class); + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + + task.maybeInitTaskTimeoutOrThrow(0L, null, log); + task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).toMillis(), null, log); + + assertThrows( + TimeoutException.class, + () -> task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log) + ); + } + + @Test + public void shouldCLearTaskTimeout() { + final Logger log = new LogContext().logger(StreamTaskTest.class); + task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); + + task.maybeInitTaskTimeoutOrThrow(0L, null, log); + task.clearTaskTimeout(log); + task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5).plus(Duration.ofMillis(1L)).toMillis(), null, log); + } + private List getTaskMetrics() { return metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 239a7d7ca34c1..6018dbbc8928e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -2692,7 +2692,7 @@ private static class StateMachineTask extends AbstractTask implements Task { final Set partitions, final boolean active, final ProcessorStateManager processorStateManager) { - super(id, null, null, processorStateManager, partitions); + super(id, null, null, processorStateManager, partitions, 0L); this.active = active; } @@ -2765,6 +2765,13 @@ public void resume() { } } + @Override + public void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, + final TimeoutException timeoutException) throws StreamsException {}; + + @Override + public void clearTaskTimeout() {} + @Override public void closeClean() { transitionTo(State.CLOSED);