diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 1aa68bc721f2e..38267809522b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -46,14 +46,15 @@ public class StandbyTask extends AbstractTask implements Task { private final InternalProcessorContext processorContext; private final StreamsMetricsImpl streamsMetrics; - private Map offsetSnapshotSinceLastCommit; + private boolean checkpointNeededForSuspended = false; + private Map offsetSnapshotSinceLastCommit = new HashMap<>(); /** * @param id the ID of this task * @param partitions input topic partitions, used for thread metadata only * @param topology the instance of {@link ProcessorTopology} * @param config the {@link StreamsConfig} specified by the user - * @param streamsMetrics the {@link StreamsMetrics} created by the thread + * @param streamsMetrics the {@link StreamsMetrics} created by the thread * @param stateMgr the {@link ProcessorStateManager} for this task * @param stateDirectory the {@link StateDirectory} created by the thread */ @@ -93,6 +94,9 @@ public void initializeIfNeeded() { if (state() == State.CREATED) { StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext); + // initialize the snapshot with the current offsets as we don't need to commit then until they change + offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets()); + // no topology needs initialized, we can transit to RUNNING // right after registered the stores transitionTo(State.RESTORING); @@ -115,8 +119,15 @@ public void completeRestoration() { public void suspend() { switch (state()) { case CREATED: + log.info("Suspended created"); + checkpointNeededForSuspended = false; + transitionTo(State.SUSPENDED); + + break; + case RUNNING: - log.info("Suspended {}", state()); + log.info("Suspended running"); + checkpointNeededForSuspended = true; transitionTo(State.SUSPENDED); break; @@ -144,18 +155,27 @@ public void resume() { } /** - * 1. flush store - * 2. write checkpoint file + * Flush stores before a commit * * @throws StreamsException fatal error, should close the thread */ @Override public Map prepareCommit() { - if (state() == State.RUNNING || state() == State.SUSPENDED) { - stateMgr.flush(); - log.debug("Prepared task for committing"); - } else { - throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing "); + switch (state()) { + case CREATED: + log.debug("Skipped preparing created task for commit"); + + break; + + case RUNNING: + case SUSPENDED: + stateMgr.flush(); + log.debug("Prepared {} task for committing", state()); + + break; + + default: + throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing "); } return Collections.emptyMap(); @@ -163,14 +183,36 @@ public Map prepareCommit() { @Override public void postCommit() { - if (state() == State.RUNNING || state() == State.SUSPENDED) { - // since there's no written offsets we can checkpoint with empty map, - // and the state current offset would be used to checkpoint - stateMgr.checkpoint(Collections.emptyMap()); - offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets()); - log.debug("Finalized commit"); - } else { - throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id); + switch (state()) { + case CREATED: + // We should never write a checkpoint for a CREATED task as we may overwrite an existing checkpoint + // with empty uninitialized offsets + log.debug("Skipped writing checkpoint for created task"); + + break; + + case RUNNING: + if (commitNeeded()) { + writeCheckpoint(); + } + log.debug("Finalized commit for running task"); + + break; + + case SUSPENDED: + // don't overwrite the existing checkpoint file if we haven't actually initialized the offsets yet + if (checkpointNeededForSuspended) { + writeCheckpoint(); + log.debug("Finalized commit for suspended task"); + checkpointNeededForSuspended = false; + } else { + log.debug("Skipped writing checkpoint for uninitialized suspended task"); + } + + break; + + default: + throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id); } } @@ -203,6 +245,13 @@ public void closeCleanAndRecycleState() { log.info("Closed clean and recycled state"); } + private void writeCheckpoint() { + // since there's no written offsets we can checkpoint with empty map, + // and the state's current offset would be used to checkpoint + stateMgr.checkpoint(Collections.emptyMap()); + offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets()); + } + private void close(final boolean clean) { switch (state()) { case SUSPENDED: @@ -243,7 +292,7 @@ private void close(final boolean clean) { @Override public boolean commitNeeded() { // we can commit if the store's offset has changed since last commit - return offsetSnapshotSinceLastCommit == null || !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets()); + return !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 4b27436023b4b..abdd56725c452 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -58,6 +58,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; @@ -106,6 +107,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private long idleStartTimeMs; private boolean commitNeeded = false; private boolean commitRequested = false; + private boolean checkpointNeededForSuspended = false; public StreamTask(final TaskId id, final Set partitions, @@ -249,8 +251,15 @@ public void completeRestoration() { public void suspend() { switch (state()) { case CREATED: + log.info("Suspended created"); + checkpointNeededForSuspended = false; + transitionTo(State.SUSPENDED); + + break; + case RESTORING: - log.info("Suspended {}", state()); + log.info("Suspended restoring"); + checkpointNeededForSuspended = true; transitionTo(State.SUSPENDED); break; @@ -259,6 +268,7 @@ public void suspend() { try { // use try-catch to ensure state transition to SUSPENDED even if user code throws in `Processor#close()` closeTopology(); + checkpointNeededForSuspended = true; } finally { transitionTo(State.SUSPENDED); log.info("Suspended running"); @@ -338,18 +348,28 @@ public void resume() { */ @Override public Map prepareCommit() { + final Map offsetsToCommit; switch (state()) { - case RUNNING: + case CREATED: case RESTORING: + case RUNNING: case SUSPENDED: stateMgr.flush(); - recordCollector.flush(); - - log.debug("Prepared task for committing"); + // the commitNeeded flag just indicates whether we have reached RUNNING and processed any new data, + // so it only indicates whether the record collector should be flushed or not, whereas the state + // manager should always be flushed; either there is newly restored data or the flush will be a no-op + if (commitNeeded) { + recordCollector.flush(); + + log.debug("Prepared {} task for committing", state()); + offsetsToCommit = committableOffsetsAndMetadata(); + } else { + log.debug("Skipped preparing {} task for commit since there is nothing to commit", state()); + offsetsToCommit = Collections.emptyMap(); + } break; - case CREATED: case CLOSED: throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing"); @@ -357,7 +377,7 @@ public Map prepareCommit() { throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing"); } - return committableOffsetsAndMetadata(); + return offsetsToCommit; } private Map committableOffsetsAndMetadata() { @@ -399,7 +419,7 @@ private Map committableOffsetsAndMetadata() { break; case CLOSED: - throw new IllegalStateException("Illegal state " + state() + " while getting commitable offsets for active task " + id); + throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + id); default: throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id); @@ -414,11 +434,18 @@ private Map committableOffsetsAndMetadata() { @Override public void postCommit() { commitRequested = false; - commitNeeded = false; switch (state()) { + case CREATED: + // We should never write a checkpoint for a CREATED task as we may overwrite an existing checkpoint + // with empty uninitialized offsets + log.debug("Skipped writing checkpoint for created task"); + + break; + case RESTORING: writeCheckpoint(); + log.debug("Finalized commit for restoring task"); break; @@ -426,6 +453,7 @@ public void postCommit() { if (!eosEnabled) { writeCheckpoint(); } + log.debug("Finalized commit for running task"); break; @@ -438,19 +466,24 @@ public void postCommit() { */ partitionGroup.clear(); - writeCheckpoint(); + if (checkpointNeededForSuspended) { + // Make sure we don't overwrite the checkpoint if the suspended task was still in CREATED + // and had not yet initialized offsets + writeCheckpoint(); + log.debug("Finalized commit for suspended task"); + checkpointNeededForSuspended = false; + } else { + log.debug("Skipped writing checkpoint for uninitialized suspended task"); + } break; - case CREATED: case CLOSED: throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id); default: throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id); } - - log.debug("Finalized commit"); } private Map extractPartitionTimes() { @@ -511,10 +544,11 @@ public void closeCleanAndRecycleState() { private void writeCheckpoint() { if (commitNeeded) { - log.error("Tried to write a checkpoint with pending uncommitted data, should complete the commit first."); - throw new IllegalStateException("A checkpoint should only be written if no commit is needed."); + stateMgr.checkpoint(checkpointableOffsets()); + } else { + stateMgr.checkpoint(emptyMap()); } - stateMgr.checkpoint(checkpointableOffsets()); + commitNeeded = false; } private void validateClean() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 025e9f4cac2ed..173efff808ff2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -242,15 +242,12 @@ public void handleAssignment(final Map> activeTasks, for (final Task task : tasksToClose) { try { - task.suspend(); // Should be a no-op for active tasks since they're suspended in handleRevocation - if (task.commitNeeded()) { - if (task.isActive()) { - log.error("Active task {} was revoked and should have already been committed", task.id()); - throw new IllegalStateException("Revoked active task was not committed during handleRevocation"); - } else { - task.prepareCommit(); - task.postCommit(); - } + if (!task.isActive()) { + // Active tasks should have already been suspended and committed during handleRevocation, but + // standbys must be suspended/committed/closed all here + task.suspend(); + task.prepareCommit(); + task.postCommit(); } completeTaskCloseClean(task); cleanUpTaskProducer(task, taskCloseExceptions); @@ -437,17 +434,16 @@ boolean tryToCompleteRestoration() { void handleRevocation(final Collection revokedPartitions) { final Set remainingRevokedPartitions = new HashSet<>(revokedPartitions); - final Set tasksToCommit = new HashSet<>(); + final Set revokedTasks = new HashSet<>(); final Set additionalTasksForCommitting = new HashSet<>(); + final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); final AtomicReference firstException = new AtomicReference<>(null); for (final Task task : activeTaskIterable()) { if (remainingRevokedPartitions.containsAll(task.inputPartitions())) { try { task.suspend(); - if (task.commitNeeded()) { - tasksToCommit.add(task); - } + revokedTasks.add(task); } catch (final RuntimeException e) { log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e); firstException.compareAndSet(null, new StreamsException("Failed to suspend " + task.id(), e)); @@ -469,21 +465,20 @@ void handleRevocation(final Collection revokedPartitions) { throw suspendException; } + prepareCommitAndAddOffsetsToMap(revokedTasks, consumedOffsetsAndMetadataPerTask); + // If using eos-beta, if we must commit any task then we must commit all of them // TODO: when KAFKA-9450 is done this will be less expensive, and we can simplify by always committing everything - if (processingMode == EXACTLY_ONCE_BETA && !tasksToCommit.isEmpty()) { - tasksToCommit.addAll(additionalTasksForCommitting); - } + final boolean shouldCommitAdditionalTasks = + processingMode == EXACTLY_ONCE_BETA && !consumedOffsetsAndMetadataPerTask.isEmpty(); - final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); - for (final Task task : tasksToCommit) { - final Map committableOffsets = task.prepareCommit(); - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + if (shouldCommitAdditionalTasks) { + prepareCommitAndAddOffsetsToMap(additionalTasksForCommitting, consumedOffsetsAndMetadataPerTask); } commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); - for (final Task task : tasksToCommit) { + for (final Task task : revokedTasks) { try { task.postCommit(); } catch (final RuntimeException e) { @@ -492,9 +487,30 @@ void handleRevocation(final Collection revokedPartitions) { } } - final RuntimeException commitException = firstException.get(); - if (commitException != null) { - throw commitException; + if (shouldCommitAdditionalTasks) { + for (final Task task : additionalTasksForCommitting) { + try { + task.postCommit(); + } catch (final RuntimeException e) { + log.error("Exception caught while post-committing task " + task.id(), e); + firstException.compareAndSet(null, e); + } + } + } + + final RuntimeException postCommitException = firstException.get(); + if (postCommitException != null) { + throw postCommitException; + } + } + + private void prepareCommitAndAddOffsetsToMap(final Set tasksToPrepare, + final Map> consumedOffsetsAndMetadataPerTask) { + for (final Task task : tasksToPrepare) { + final Map committableOffsets = task.prepareCommit(); + if (!committableOffsets.isEmpty()) { + consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + } } } @@ -723,17 +739,17 @@ private Collection tryCloseCleanAllActiveTasks(final boolean clean, if (!clean) { return activeTaskIterable(); } + final Set tasksToCommit = new HashSet<>(); final Set tasksToCloseDirty = new HashSet<>(); final Set tasksToCloseClean = new HashSet<>(); - final Set tasksToCommit = new HashSet<>(); final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); for (final Task task : activeTaskIterable()) { try { task.suspend(); - if (task.commitNeeded()) { - final Map committableOffsets = task.prepareCommit(); - tasksToCommit.add(task); + final Map committableOffsets = task.prepareCommit(); + tasksToCommit.add(task); + if (!committableOffsets.isEmpty()) { consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); } tasksToCloseClean.add(task); @@ -754,7 +770,7 @@ private Collection tryCloseCleanAllActiveTasks(final boolean clean, try { commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); - for (final Task task : tasksToCommit) { + for (final Task task : activeTaskIterable()) { try { task.postCommit(); } catch (final RuntimeException e) { @@ -794,17 +810,13 @@ private Collection tryCloseCleanAllStandbyTasks(final boolean clean, return standbyTaskIterable(); } final Set tasksToCloseDirty = new HashSet<>(); - final Set tasksToCloseClean = new HashSet<>(); - final Set tasksToCommit = new HashSet<>(); for (final Task task : standbyTaskIterable()) { try { task.suspend(); - if (task.commitNeeded()) { - task.prepareCommit(); - tasksToCommit.add(task); - } - tasksToCloseClean.add(task); + task.prepareCommit(); + task.postCommit(); + completeTaskCloseClean(task); } catch (final TaskMigratedException e) { // just ignore the exception as it doesn't matter during shutdown tasksToCloseDirty.add(task); @@ -813,27 +825,6 @@ private Collection tryCloseCleanAllStandbyTasks(final boolean clean, tasksToCloseDirty.add(task); } } - - for (final Task task : tasksToCommit) { - try { - task.postCommit(); - } catch (final RuntimeException e) { - log.error("Exception caught while post-committing standby task " + task.id(), e); - firstException.compareAndSet(null, e); - tasksToCloseDirty.add(task); - tasksToCloseClean.remove(task); - } - } - - for (final Task task : tasksToCloseClean) { - try { - completeTaskCloseClean(task); - } catch (final RuntimeException e) { - log.error("Exception caught while clean-closing standby task " + task.id(), e); - firstException.compareAndSet(null, e); - tasksToCloseDirty.add(task); - } - } return tasksToCloseDirty; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index f98d6304e5842..ba07d55c51c6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -171,6 +171,7 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldTransitToRunningAfterInitialization() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.registerStateStores(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); @@ -195,12 +196,15 @@ public void shouldTransitToRunningAfterInitialization() { public void shouldThrowIfCommittingOnIllegalState() { EasyMock.replay(stateManager); task = createStandbyTask(); + task.suspend(); + task.closeClean(); assertThrows(IllegalStateException.class, task::prepareCommit); } @Test public void shouldFlushAndCheckpointStateManagerOnCommit() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.flush(); EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); @@ -230,6 +234,7 @@ public void shouldReturnStateManagerChangelogOffsets() { @Test public void shouldNotCommitAndThrowOnCloseDirty() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.close(); EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); stateManager.flush(); @@ -255,6 +260,7 @@ public void shouldNotCommitAndThrowOnCloseDirty() { @Test public void shouldNotThrowFromStateManagerCloseInCloseDirty() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.close(); EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); EasyMock.replay(stateManager); @@ -270,6 +276,7 @@ public void shouldNotThrowFromStateManagerCloseInCloseDirty() { @Test public void shouldSuspendAndCommitBeforeCloseClean() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.close(); EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); @@ -311,7 +318,7 @@ public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() { EasyMock.expect(stateManager.changelogOffsets()) .andReturn(Collections.singletonMap(partition, 50L)) .andReturn(Collections.singletonMap(partition, 50L)) - .andReturn(Collections.singletonMap(partition, 60L)); + .andReturn(Collections.singletonMap(partition, 60L)).anyTimes(); stateManager.flush(); EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); @@ -321,22 +328,21 @@ public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() { task = createStandbyTask(); task.initializeIfNeeded(); - assertTrue(task.commitNeeded()); - - task.prepareCommit(); - task.postCommit(); - // do not need to commit if there's no update assertFalse(task.commitNeeded()); // could commit if the offset advanced assertTrue(task.commitNeeded()); + task.prepareCommit(); + task.postCommit(); + EasyMock.verify(stateManager); } @Test public void shouldThrowOnCloseCleanError() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.close(); EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); @@ -360,6 +366,9 @@ public void shouldThrowOnCloseCleanError() { @Test public void shouldThrowOnCloseCleanCheckpointError() { + EasyMock.expect(stateManager.changelogOffsets()) + .andReturn(Collections.emptyMap()) + .andReturn(Collections.singletonMap(partition, 0L)); stateManager.checkpoint(EasyMock.anyObject()); EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); @@ -385,6 +394,7 @@ public void shouldThrowOnCloseCleanCheckpointError() { @Test public void shouldUnregisterMetricsInCloseClean() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); @@ -400,6 +410,7 @@ public void shouldUnregisterMetricsInCloseClean() { @Test public void shouldUnregisterMetricsInCloseDirty() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); @@ -498,6 +509,7 @@ public void shouldDeleteStateDirOnTaskCreatedAndEosBetaUncleanClose() { @Test public void shouldRecycleTask() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.recycle(); EasyMock.replay(stateManager); @@ -528,6 +540,7 @@ public void shouldAlwaysSuspendCreatedTasks() { @Test public void shouldAlwaysSuspendRunningTasks() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); EasyMock.replay(stateManager); task = createStandbyTask(); task.initializeIfNeeded(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 960747047cfc4..facbf2e11d1ca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -82,7 +82,10 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singleton; import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; @@ -1202,32 +1205,6 @@ public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() { assertThrows(StreamsException.class, () -> task.process(0L)); } - @Test - public void shouldCommitWhenSuspending() throws IOException { - stateDirectory = EasyMock.createNiceMock(StateDirectory.class); - EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true); - EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, 10L)).anyTimes(); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); - stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, 10L))); - EasyMock.expectLastCall(); - EasyMock.replay(recordCollector, stateDirectory, stateManager); - - task = createStatefulTask(createConfig(false, "100"), true); - - task.initializeIfNeeded(); - task.completeRestoration(); - - task.suspend(); - task.prepareCommit(); - task.postCommit(); - - assertEquals(Task.State.SUSPENDED, task.state()); - assertTrue(source1.closed); - assertTrue(source2.closed); - - EasyMock.verify(stateManager); - } - @Test public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() throws IOException { stateDirectory = EasyMock.createNiceMock(StateDirectory.class); @@ -1469,7 +1446,6 @@ public Map committed(final Set checkpointableOffsets = singletonMap(partition1, 0L); + stateManager.checkpoint(EasyMock.eq(checkpointableOffsets)); + EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).anyTimes(); + EasyMock.replay(stateManager, recordCollector); + + task = createStatefulTask(createConfig(false, "0"), true); + task.initializeIfNeeded(); + task.completeRestoration(); + task.addRecords(partition1, singleton(getConsumerRecord(partition1, 10))); + task.process(100L); + assertTrue(task.commitNeeded()); + + task.suspend(); + task.postCommit(); + EasyMock.verify(stateManager, recordCollector); + } + @Test public void shouldReturnStateManagerChangelogOffsets() { EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition1, 50L)).anyTimes(); @@ -1560,7 +1590,7 @@ public void shouldNotCommitAndThrowOnCloseDirty() { } @Test - public void shouldNotCommitOnCloseRestoring() { + public void shouldCheckpointOnCloseRestoring() { stateManager.flush(); EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); @@ -1584,30 +1614,33 @@ public void shouldNotCommitOnCloseRestoring() { } @Test - public void shouldCommitOnCloseClean() { + public void shouldCheckpointOffsetsOnPostCommitIfCommitNeeded() { final long offset = 543L; + final long consumedOffset = 345L; EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes(); - stateManager.close(); - EasyMock.expectLastCall(); - stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, offset))); EasyMock.expectLastCall(); + stateManager.checkpoint(EasyMock.eq(mkMap( + mkEntry(changelogPartition, offset), + mkEntry(partition1, consumedOffset) + ))); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(recordCollector, stateManager); final MetricName metricName = setupCloseTaskMetric(); - task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); + task = createOptimizedStatefulTask(createConfig(false, "0"), consumer); task.initializeIfNeeded(); task.completeRestoration(); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, consumedOffset))); + task.process(100L); + assertTrue(task.commitNeeded()); + task.suspend(); task.prepareCommit(); task.postCommit(); - task.closeClean(); - - assertEquals(Task.State.CLOSED, task.state()); - final double expectedCloseTaskMetric = 1.0; - verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); + assertEquals(Task.State.SUSPENDED, task.state()); EasyMock.verify(stateManager); } @@ -1616,8 +1649,8 @@ public void shouldCommitOnCloseClean() { public void shouldSwallowExceptionOnCloseCleanError() { final long offset = 543L; - EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes(); - stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, offset))); + EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); + stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(partition1, offset))); EasyMock.expectLastCall(); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition)).anyTimes(); stateManager.close(); @@ -1629,6 +1662,10 @@ public void shouldSwallowExceptionOnCloseCleanError() { task.initializeIfNeeded(); task.completeRestoration(); + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, offset))); + task.process(100L); + assertTrue(task.commitNeeded()); + task.suspend(); task.prepareCommit(); task.postCommit(); @@ -1679,9 +1716,8 @@ public void shouldThrowOnCloseCleanFlushError() { @Test public void shouldThrowOnCloseCleanCheckpointError() { final long offset = 543L; - - EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)); - stateManager.checkpoint(Collections.singletonMap(changelogPartition, offset)); + EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()); + stateManager.checkpoint(Collections.singletonMap(partition1, offset)); EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); stateManager.close(); EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes(); @@ -1692,6 +1728,10 @@ public void shouldThrowOnCloseCleanCheckpointError() { task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); task.initializeIfNeeded(); + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, offset))); + task.process(100L); + assertTrue(task.commitNeeded()); + task.suspend(); task.prepareCommit(); assertThrows(ProcessorStateException.class, () -> task.postCommit()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 23166d14d084b..36730b5e168e7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -79,6 +79,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.common.utils.Utils.union; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.eq; @@ -414,11 +415,10 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { } @Test - public void shouldCloseDirtyActiveUnassignedTasksWhenErrorSuspendingTask() { + public void shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true) { @Override - public void suspend() { - super.suspend(); + public void closeClean() { throw new RuntimeException("KABOOM!"); } }; @@ -436,6 +436,7 @@ public void suspend() { replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader); taskManager.handleAssignment(taskId00Assignment, emptyMap()); + taskManager.handleRevocation(taskId00Partitions); final RuntimeException thrown = assertThrows( RuntimeException.class, @@ -536,6 +537,8 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() { assertThat(taskManager.tryToCompleteRestoration(), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); + taskManager.handleRevocation(taskId00Partitions); + final RuntimeException thrown = assertThrows( RuntimeException.class, () -> taskManager.handleAssignment(emptyMap(), emptyMap()) @@ -1399,7 +1402,7 @@ public Map prepareCommit() { } @Test - public void shouldCloseActiveTasksDirtyAndPropagateSuspendException() { + public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() { setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA); final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); @@ -1418,21 +1421,15 @@ public void suspend() { taskManager.tasks().put(taskId01, task01); taskManager.tasks().put(taskId02, task02); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId02); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01); - replay(activeTaskCreator); final RuntimeException thrown = assertThrows(RuntimeException.class, - () -> taskManager.handleAssignment(mkMap(mkEntry(taskId00, taskId00Partitions)), Collections.emptyMap())); + () -> taskManager.handleRevocation(union(HashSet::new, taskId01Partitions, taskId02Partitions))); assertThat(thrown.getCause().getMessage(), is("task 0_1 suspend boom!")); assertThat(task00.state(), is(Task.State.CREATED)); - assertThat(task01.state(), is(Task.State.CLOSED)); - assertThat(task02.state(), is(Task.State.CLOSED)); - - // All the tasks involving in the commit should already be removed. - assertThat(taskManager.tasks(), is(Collections.singletonMap(taskId00, task00))); + assertThat(task01.state(), is(Task.State.SUSPENDED)); + assertThat(task02.state(), is(Task.State.SUSPENDED)); verify(activeTaskCreator); } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 7615612fe16c7..31659bbeb5514 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -1175,6 +1175,8 @@ public SessionStore getSessionStore(final String name) { public void close() { if (task != null) { task.suspend(); + task.prepareCommit(); + task.postCommit(); task.closeClean(); } if (globalStateTask != null) {