diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 33878017b7ae1..5656c914f7bd3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -34,9 +34,7 @@ import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; -import java.util.Map; import java.util.Set; public abstract class AbstractTask implements Task { @@ -173,10 +171,6 @@ public String toString(final String indent) { return sb.toString(); } - protected Map activeTaskCheckpointableOffsets() { - return Collections.emptyMap(); - } - protected void updateOffsetLimits() { for (final TopicPartition partition : partitions) { try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index d50d5c23cc6c9..d6ee2dff735ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -484,14 +484,11 @@ void commit(final boolean startNewTransaction) { taskMetrics.taskCommitTimeSensor.record(time.nanoseconds() - startNs); } - @Override - protected Map activeTaskCheckpointableOffsets() { - final Map checkpointableOffsets = - new HashMap<>(recordCollector.offsets()); + private Map activeTaskCheckpointableOffsets() { + final Map checkpointableOffsets = new HashMap<>(recordCollector.offsets()); for (final Map.Entry entry : consumedOffsets.entrySet()) { checkpointableOffsets.putIfAbsent(entry.getKey(), entry.getValue()); } - return checkpointableOffsets; }