From b8112341e1162b8e4d03afa74c5355a152c0d5d4 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jun 2020 18:00:06 -0700 Subject: [PATCH 1/9] always invoke postCommit --- .../processor/internals/TaskManager.java | 24 ++++++++++++++----- 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 025e9f4cac2ed..40daca3f05640 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -242,18 +242,22 @@ public void handleAssignment(final Map> activeTasks, for (final Task task : tasksToClose) { try { - task.suspend(); // Should be a no-op for active tasks since they're suspended in handleRevocation - if (task.commitNeeded()) { - if (task.isActive()) { + if (task.isActive()) { + // Active tasks should have already been suspended and committed during handleRevocation. 
+ // We are just responsible for closing them now + if (task.commitNeeded()) { log.error("Active task {} was revoked and should have already been committed", task.id()); throw new IllegalStateException("Revoked active task was not committed during handleRevocation"); - } else { + } + cleanUpTaskProducer(task, taskCloseExceptions); + } else { + task.suspend(); + if (task.commitNeeded()) { task.prepareCommit(); - task.postCommit(); } + task.postCommit(); } completeTaskCloseClean(task); - cleanUpTaskProducer(task, taskCloseExceptions); tasks.remove(task.id()); } catch (final RuntimeException e) { final String uncleanMessage = String.format( @@ -447,6 +451,10 @@ void handleRevocation(final Collection revokedPartitions) { task.suspend(); if (task.commitNeeded()) { tasksToCommit.add(task); + } else { + // We always need to call postCommit before a clean close, so we can just do that right + // away if there's nothing to commit first + task.postCommit(); } } catch (final RuntimeException e) { log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e); @@ -735,6 +743,8 @@ private Collection tryCloseCleanAllActiveTasks(final boolean clean, final Map committableOffsets = task.prepareCommit(); tasksToCommit.add(task); consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + } else { + task.postCommit(); } tasksToCloseClean.add(task); } catch (final TaskMigratedException e) { @@ -803,6 +813,8 @@ private Collection tryCloseCleanAllStandbyTasks(final boolean clean, if (task.commitNeeded()) { task.prepareCommit(); tasksToCommit.add(task); + } else { + task.postCommit(); } tasksToCloseClean.add(task); } catch (final TaskMigratedException e) { From 251214a0703e0788206c2a8d39f3a7fdd318d4af Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jun 2020 18:33:57 -0700 Subject: [PATCH 2/9] always call prepareCommit as well --- .../processor/internals/StandbyTask.java | 8 ++- .../processor/internals/StreamTask.java | 17 +++-- 
.../processor/internals/TaskManager.java | 64 ++++++++++--------- 3 files changed, 51 insertions(+), 38 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 1aa68bc721f2e..af735d137ee03 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -152,8 +152,12 @@ public void resume() { @Override public Map prepareCommit() { if (state() == State.RUNNING || state() == State.SUSPENDED) { - stateMgr.flush(); - log.debug("Prepared task for committing"); + if (commitNeeded()) { + stateMgr.flush(); + log.debug("Prepared task for committing"); + } else { + log.debug("Skipping prepareCommit since there is nothing new to commit"); + } } else { throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing "); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 4b27436023b4b..8fa1a23e35321 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -338,14 +338,21 @@ public void resume() { */ @Override public Map prepareCommit() { + final Map offsetsToCommit; switch (state()) { case RUNNING: case RESTORING: case SUSPENDED: - stateMgr.flush(); - recordCollector.flush(); - - log.debug("Prepared task for committing"); + if (commitNeeded) { + stateMgr.flush(); + recordCollector.flush(); + + log.debug("Prepared task for committing"); + offsetsToCommit = committableOffsetsAndMetadata(); + } else { + log.debug("Skipping prepareCommit since there is nothing new to commit"); + offsetsToCommit = 
Collections.emptyMap(); + } break; @@ -357,7 +364,7 @@ public Map prepareCommit() { throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing"); } - return committableOffsetsAndMetadata(); + return offsetsToCommit; } private Map committableOffsetsAndMetadata() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 40daca3f05640..055b578d75c0c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -245,16 +245,10 @@ public void handleAssignment(final Map> activeTasks, if (task.isActive()) { // Active tasks should have already been suspended and committed during handleRevocation. // We are just responsible for closing them now - if (task.commitNeeded()) { - log.error("Active task {} was revoked and should have already been committed", task.id()); - throw new IllegalStateException("Revoked active task was not committed during handleRevocation"); - } cleanUpTaskProducer(task, taskCloseExceptions); } else { task.suspend(); - if (task.commitNeeded()) { - task.prepareCommit(); - } + task.prepareCommit(); task.postCommit(); } completeTaskCloseClean(task); @@ -441,7 +435,7 @@ boolean tryToCompleteRestoration() { void handleRevocation(final Collection revokedPartitions) { final Set remainingRevokedPartitions = new HashSet<>(revokedPartitions); - final Set tasksToCommit = new HashSet<>(); + final Set revokedTasks = new HashSet<>(); final Set additionalTasksForCommitting = new HashSet<>(); final AtomicReference firstException = new AtomicReference<>(null); @@ -449,13 +443,7 @@ void handleRevocation(final Collection revokedPartitions) { if (remainingRevokedPartitions.containsAll(task.inputPartitions())) { try { task.suspend(); - if (task.commitNeeded()) 
{ - tasksToCommit.add(task); - } else { - // We always need to call postCommit before a clean close, so we can just do that right - // away if there's nothing to commit first - task.postCommit(); - } + revokedTasks.add(task); } catch (final RuntimeException e) { log.error("Caught the following exception while trying to suspend revoked task " + task.id(), e); firstException.compareAndSet(null, new StreamsException("Failed to suspend " + task.id(), e)); @@ -477,21 +465,27 @@ void handleRevocation(final Collection revokedPartitions) { throw suspendException; } + final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); + prepareCommitAndAddOffsetsToMap(revokedTasks, consumedOffsetsAndMetadataPerTask); + // If using eos-beta, if we must commit any task then we must commit all of them // TODO: when KAFKA-9450 is done this will be less expensive, and we can simplify by always committing everything - if (processingMode == EXACTLY_ONCE_BETA && !tasksToCommit.isEmpty()) { - tasksToCommit.addAll(additionalTasksForCommitting); - } - - final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); - for (final Task task : tasksToCommit) { - final Map committableOffsets = task.prepareCommit(); - consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + if (processingMode == EXACTLY_ONCE_BETA && !consumedOffsetsAndMetadataPerTask.isEmpty()) { + prepareCommitAndAddOffsetsToMap(additionalTasksForCommitting, consumedOffsetsAndMetadataPerTask); } commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); - for (final Task task : tasksToCommit) { + for (final Task task : revokedTasks) { + try { + task.postCommit(); + } catch (final RuntimeException e) { + log.error("Exception caught while post-committing task " + task.id(), e); + firstException.compareAndSet(null, e); + } + } + + for (final Task task : additionalTasksForCommitting) { try { task.postCommit(); } catch (final RuntimeException e) { @@ -506,6 +500,16 @@ void handleRevocation(final Collection 
revokedPartitions) { } } + private void prepareCommitAndAddOffsetsToMap(final Set tasksToPrepare, + final Map> consumedOffsetsAndMetadataPerTask) { + for (final Task task : tasksToPrepare) { + final Map committableOffsets = task.prepareCommit(); + if (!committableOffsets.isEmpty()) { + consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); + } + } + } + /** * Closes active tasks as zombies, as these partitions have been lost and are no longer owned. * NOTE this method assumes that when it is called, EVERY task/partition has been lost and must @@ -731,20 +735,18 @@ private Collection tryCloseCleanAllActiveTasks(final boolean clean, if (!clean) { return activeTaskIterable(); } + final Set tasksToCommit = new HashSet<>(); final Set tasksToCloseDirty = new HashSet<>(); final Set tasksToCloseClean = new HashSet<>(); - final Set tasksToCommit = new HashSet<>(); final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); for (final Task task : activeTaskIterable()) { try { task.suspend(); - if (task.commitNeeded()) { - final Map committableOffsets = task.prepareCommit(); - tasksToCommit.add(task); + final Map committableOffsets = task.prepareCommit(); + tasksToCommit.add(task); + if (!committableOffsets.isEmpty()) { consumedOffsetsAndMetadataPerTask.put(task.id(), committableOffsets); - } else { - task.postCommit(); } tasksToCloseClean.add(task); } catch (final TaskMigratedException e) { @@ -764,7 +766,7 @@ private Collection tryCloseCleanAllActiveTasks(final boolean clean, try { commitOffsetsOrTransaction(consumedOffsetsAndMetadataPerTask); - for (final Task task : tasksToCommit) { + for (final Task task : activeTaskIterable()) { try { task.postCommit(); } catch (final RuntimeException e) { From 3032052f911b0e3ded98750a206ea77345768c60 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jun 2020 18:44:40 -0700 Subject: [PATCH 3/9] fix standby shutdown --- .../processor/internals/TaskManager.java | 33 ++----------------- 1 file changed, 3 
insertions(+), 30 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 055b578d75c0c..06d2b53f1202c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -806,19 +806,13 @@ private Collection tryCloseCleanAllStandbyTasks(final boolean clean, return standbyTaskIterable(); } final Set tasksToCloseDirty = new HashSet<>(); - final Set tasksToCloseClean = new HashSet<>(); - final Set tasksToCommit = new HashSet<>(); for (final Task task : standbyTaskIterable()) { try { task.suspend(); - if (task.commitNeeded()) { - task.prepareCommit(); - tasksToCommit.add(task); - } else { - task.postCommit(); - } - tasksToCloseClean.add(task); + task.prepareCommit(); + task.postCommit(); + completeTaskCloseClean(task); } catch (final TaskMigratedException e) { // just ignore the exception as it doesn't matter during shutdown tasksToCloseDirty.add(task); @@ -827,27 +821,6 @@ private Collection tryCloseCleanAllStandbyTasks(final boolean clean, tasksToCloseDirty.add(task); } } - - for (final Task task : tasksToCommit) { - try { - task.postCommit(); - } catch (final RuntimeException e) { - log.error("Exception caught while post-committing standby task " + task.id(), e); - firstException.compareAndSet(null, e); - tasksToCloseDirty.add(task); - tasksToCloseClean.remove(task); - } - } - - for (final Task task : tasksToCloseClean) { - try { - completeTaskCloseClean(task); - } catch (final RuntimeException e) { - log.error("Exception caught while clean-closing standby task " + task.id(), e); - firstException.compareAndSet(null, e); - tasksToCloseDirty.add(task); - } - } return tasksToCloseDirty; } From f3c6afd6e636b45c7b6a6b43d93f301cb69f55aa Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 24 Jun 2020 
21:09:23 -0700 Subject: [PATCH 4/9] dont throw if prepare/postCommitting CREATED --- .../kafka/streams/processor/internals/StreamTask.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 8fa1a23e35321..3a560ae013eb0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -340,6 +340,7 @@ public void resume() { public Map prepareCommit() { final Map offsetsToCommit; switch (state()) { + case CREATED: case RUNNING: case RESTORING: case SUSPENDED: @@ -355,8 +356,7 @@ public Map prepareCommit() { } break; - - case CREATED: + case CLOSED: throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing"); @@ -450,6 +450,9 @@ public void postCommit() { break; case CREATED: + + break; + case CLOSED: throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id); From 13b0ee1df558dea7ec18f9d181ec36ddd456aa95 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 25 Jun 2020 10:48:41 -0700 Subject: [PATCH 5/9] add checkpointNeeded flag --- .../processor/internals/StandbyTask.java | 90 ++++++++++++++----- .../processor/internals/StreamTask.java | 49 +++++++--- 2 files changed, 104 insertions(+), 35 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index af735d137ee03..bd047f35ef01b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -46,14 +46,15 @@ public class StandbyTask 
extends AbstractTask implements Task { private final InternalProcessorContext processorContext; private final StreamsMetricsImpl streamsMetrics; - private Map offsetSnapshotSinceLastCommit; + private boolean checkpointNeededForSuspended = false; + private Map offsetSnapshotSinceLastCommit = new HashMap<>(); /** * @param id the ID of this task * @param partitions input topic partitions, used for thread metadata only * @param topology the instance of {@link ProcessorTopology} * @param config the {@link StreamsConfig} specified by the user - * @param streamsMetrics the {@link StreamsMetrics} created by the thread + * @param streamsMetrics the {@link StreamsMetrics} created by the thread * @param stateMgr the {@link ProcessorStateManager} for this task * @param stateDirectory the {@link StateDirectory} created by the thread */ @@ -115,8 +116,15 @@ public void completeRestoration() { public void suspend() { switch (state()) { case CREATED: + log.info("Suspended created"); + checkpointNeededForSuspended = false; + transitionTo(State.SUSPENDED); + + break; + case RUNNING: - log.info("Suspended {}", state()); + log.info("Suspended running"); + checkpointNeededForSuspended = true; transitionTo(State.SUSPENDED); break; @@ -144,22 +152,29 @@ public void resume() { } /** - * 1. flush store - * 2. 
write checkpoint file + * Flush stores before a commit * * @throws StreamsException fatal error, should close the thread */ @Override public Map prepareCommit() { - if (state() == State.RUNNING || state() == State.SUSPENDED) { - if (commitNeeded()) { - stateMgr.flush(); - log.debug("Prepared task for committing"); - } else { - log.debug("Skipping prepareCommit since there is nothing new to commit"); - } - } else { - throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing "); + switch (state()) { + case CREATED: + log.debug("Skipped preparing created task for commit"); + + case RUNNING: + case SUSPENDED: + if (commitNeeded()) { + stateMgr.flush(); + log.debug("Prepared {} task for committing", state()); + } else { + log.debug("Skipped preparing {} task for commit since there is nothing new to commit", state()); + } + + break; + + default: + throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing "); } return Collections.emptyMap(); @@ -167,14 +182,36 @@ public Map prepareCommit() { @Override public void postCommit() { - if (state() == State.RUNNING || state() == State.SUSPENDED) { - // since there's no written offsets we can checkpoint with empty map, - // and the state current offset would be used to checkpoint - stateMgr.checkpoint(Collections.emptyMap()); - offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets()); - log.debug("Finalized commit"); - } else { - throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id); + switch (state()) { + case CREATED: + // We should never write a checkpoint for a CREATED task as we may overwrite an existing checkpoint + // with empty uninitialized offsets + log.debug("Skipped writing checkpoint for created task"); + + break; + + case RUNNING: + if (commitNeeded()) { + writeCheckpoint(); + } + log.debug("Finalized commit for running task"); + 
+ break; + + case SUSPENDED: + // don't overwrite the existing checkpoint file if we haven't actually initialized the offsets yet + if (checkpointNeededForSuspended) { + writeCheckpoint(); + log.debug("Finalized commit for suspended task"); + checkpointNeededForSuspended = false; + } else { + log.debug("Skipped writing checkpoint for uninitialized suspended task"); + } + + break; + + default: + throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id); } } @@ -207,6 +244,13 @@ public void closeCleanAndRecycleState() { log.info("Closed clean and recycled state"); } + private void writeCheckpoint() { + // since there's no written offsets we can checkpoint with empty map, + // and the state's current offset would be used to checkpoint + stateMgr.checkpoint(Collections.emptyMap()); + offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets()); + } + private void close(final boolean clean) { switch (state()) { case SUSPENDED: @@ -247,7 +291,7 @@ private void close(final boolean clean) { @Override public boolean commitNeeded() { // we can commit if the store's offset has changed since last commit - return offsetSnapshotSinceLastCommit == null || !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets()); + return !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets()); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 3a560ae013eb0..22c4bb417005a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -106,6 +106,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, private long idleStartTimeMs; private boolean commitNeeded = false; private boolean commitRequested = false; + private 
boolean checkpointNeededForSuspended = false; public StreamTask(final TaskId id, final Set partitions, @@ -249,8 +250,15 @@ public void completeRestoration() { public void suspend() { switch (state()) { case CREATED: + log.info("Suspended created"); + checkpointNeededForSuspended = false; + transitionTo(State.SUSPENDED); + + break; + case RESTORING: - log.info("Suspended {}", state()); + log.info("Suspended restoring"); + checkpointNeededForSuspended = true; transitionTo(State.SUSPENDED); break; @@ -259,6 +267,7 @@ public void suspend() { try { // use try-catch to ensure state transition to SUSPENDED even if user code throws in `Processor#close()` closeTopology(); + checkpointNeededForSuspended = true; } finally { transitionTo(State.SUSPENDED); log.info("Suspended running"); @@ -341,6 +350,11 @@ public Map prepareCommit() { final Map offsetsToCommit; switch (state()) { case CREATED: + log.debug("Skipped preparing created task for commit"); + offsetsToCommit = Collections.emptyMap(); + + break; + case RUNNING: case RESTORING: case SUSPENDED: @@ -348,15 +362,15 @@ public Map prepareCommit() { stateMgr.flush(); recordCollector.flush(); - log.debug("Prepared task for committing"); + log.debug("Prepared {} task for committing", state()); offsetsToCommit = committableOffsetsAndMetadata(); } else { - log.debug("Skipping prepareCommit since there is nothing new to commit"); + log.debug("Skipped preparing {} task for commit since there is nothing new to commit", state()); offsetsToCommit = Collections.emptyMap(); } break; - + case CLOSED: throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing"); @@ -406,7 +420,7 @@ private Map committableOffsetsAndMetadata() { break; case CLOSED: - throw new IllegalStateException("Illegal state " + state() + " while getting commitable offsets for active task " + id); + throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task 
" + id); default: throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id); @@ -424,8 +438,16 @@ public void postCommit() { commitNeeded = false; switch (state()) { + case CREATED: + // We should never write a checkpoint for a CREATED task as we may overwrite an existing checkpoint + // with empty uninitialized offsets + log.debug("Skipped writing checkpoint for created task"); + + break; + case RESTORING: writeCheckpoint(); + log.debug("Finalized commit for restoring task"); break; @@ -433,6 +455,7 @@ public void postCommit() { if (!eosEnabled) { writeCheckpoint(); } + log.debug("Finalized commit for running task"); break; @@ -445,11 +468,15 @@ public void postCommit() { */ partitionGroup.clear(); - writeCheckpoint(); - - break; - - case CREATED: + if (checkpointNeededForSuspended) { + // Make sure we don't overwrite the checkpoint if the suspended task was still in CREATED + // and had not yet initialized offsets + writeCheckpoint(); + log.debug("Finalized commit for suspended task"); + checkpointNeededForSuspended = false; + } else { + log.debug("Skipped writing checkpoint for uninitialized suspended task"); + } break; @@ -459,8 +486,6 @@ public void postCommit() { default: throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id); } - - log.debug("Finalized commit"); } private Map extractPartitionTimes() { From 7d3a7718e700be4f43f9a3220ff783bc1bc1bb2c Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 25 Jun 2020 18:49:04 -0700 Subject: [PATCH 6/9] cleaning up StreamTask tests --- .../processor/internals/StandbyTask.java | 8 +- .../processor/internals/StreamTask.java | 23 ++-- .../processor/internals/TaskManager.java | 27 ++-- .../processor/internals/StreamTaskTest.java | 128 ++++++++++++------ 4 files changed, 113 insertions(+), 73 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index bd047f35ef01b..45743519a4386 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -164,12 +164,8 @@ public Map prepareCommit() { case RUNNING: case SUSPENDED: - if (commitNeeded()) { - stateMgr.flush(); - log.debug("Prepared {} task for committing", state()); - } else { - log.debug("Skipped preparing {} task for commit since there is nothing new to commit", state()); - } + stateMgr.flush(); + log.debug("Prepared {} task for committing", state()); break; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 22c4bb417005a..abdd56725c452 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -58,6 +58,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; import static java.util.Collections.singleton; import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; @@ -350,22 +351,20 @@ public Map prepareCommit() { final Map offsetsToCommit; switch (state()) { case CREATED: - log.debug("Skipped preparing created task for commit"); - offsetsToCommit = Collections.emptyMap(); - - break; - - case RUNNING: case RESTORING: + case RUNNING: case SUSPENDED: + stateMgr.flush(); + // the commitNeeded flag just indicates whether we have reached RUNNING and processed any new data, + // so it only indicates whether the record collector should be flushed or not, whereas the state + // manager should always be flushed; either there is newly restored data or the flush will be a no-op if 
(commitNeeded) { - stateMgr.flush(); recordCollector.flush(); log.debug("Prepared {} task for committing", state()); offsetsToCommit = committableOffsetsAndMetadata(); } else { - log.debug("Skipped preparing {} task for commit since there is nothing new to commit", state()); + log.debug("Skipped preparing {} task for commit since there is nothing to commit", state()); offsetsToCommit = Collections.emptyMap(); } @@ -435,7 +434,6 @@ private Map committableOffsetsAndMetadata() { @Override public void postCommit() { commitRequested = false; - commitNeeded = false; switch (state()) { case CREATED: @@ -546,10 +544,11 @@ public void closeCleanAndRecycleState() { private void writeCheckpoint() { if (commitNeeded) { - log.error("Tried to write a checkpoint with pending uncommitted data, should complete the commit first."); - throw new IllegalStateException("A checkpoint should only be written if no commit is needed."); + stateMgr.checkpoint(checkpointableOffsets()); + } else { + stateMgr.checkpoint(emptyMap()); } - stateMgr.checkpoint(checkpointableOffsets()); + commitNeeded = false; } private void validateClean() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 06d2b53f1202c..674ec72951732 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -437,6 +437,7 @@ void handleRevocation(final Collection revokedPartitions) { final Set revokedTasks = new HashSet<>(); final Set additionalTasksForCommitting = new HashSet<>(); + final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); final AtomicReference firstException = new AtomicReference<>(null); for (final Task task : activeTaskIterable()) { @@ -465,12 +466,14 @@ void handleRevocation(final Collection revokedPartitions) { throw suspendException; } 
- final Map> consumedOffsetsAndMetadataPerTask = new HashMap<>(); prepareCommitAndAddOffsetsToMap(revokedTasks, consumedOffsetsAndMetadataPerTask); // If using eos-beta, if we must commit any task then we must commit all of them // TODO: when KAFKA-9450 is done this will be less expensive, and we can simplify by always committing everything - if (processingMode == EXACTLY_ONCE_BETA && !consumedOffsetsAndMetadataPerTask.isEmpty()) { + final boolean shouldCommitAdditionalTasks = + processingMode == EXACTLY_ONCE_BETA && !consumedOffsetsAndMetadataPerTask.isEmpty(); + + if (shouldCommitAdditionalTasks) { prepareCommitAndAddOffsetsToMap(additionalTasksForCommitting, consumedOffsetsAndMetadataPerTask); } @@ -485,18 +488,20 @@ void handleRevocation(final Collection revokedPartitions) { } } - for (final Task task : additionalTasksForCommitting) { - try { - task.postCommit(); - } catch (final RuntimeException e) { - log.error("Exception caught while post-committing task " + task.id(), e); - firstException.compareAndSet(null, e); + if (shouldCommitAdditionalTasks) { + for (final Task task : additionalTasksForCommitting) { + try { + task.postCommit(); + } catch (final RuntimeException e) { + log.error("Exception caught while post-committing task " + task.id(), e); + firstException.compareAndSet(null, e); + } } } - final RuntimeException commitException = firstException.get(); - if (commitException != null) { - throw commitException; + final RuntimeException postCommitException = firstException.get(); + if (postCommitException != null) { + throw postCommitException; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 960747047cfc4..facbf2e11d1ca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -82,7 +82,10 @@ import java.util.stream.Collectors; import static java.util.Arrays.asList; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singleton; import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; @@ -1202,32 +1205,6 @@ public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() { assertThrows(StreamsException.class, () -> task.process(0L)); } - @Test - public void shouldCommitWhenSuspending() throws IOException { - stateDirectory = EasyMock.createNiceMock(StateDirectory.class); - EasyMock.expect(stateDirectory.lock(taskId)).andReturn(true); - EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, 10L)).anyTimes(); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); - stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, 10L))); - EasyMock.expectLastCall(); - EasyMock.replay(recordCollector, stateDirectory, stateManager); - - task = createStatefulTask(createConfig(false, "100"), true); - - task.initializeIfNeeded(); - task.completeRestoration(); - - task.suspend(); - task.prepareCommit(); - task.postCommit(); - - assertEquals(Task.State.SUSPENDED, task.state()); - assertTrue(source1.closed); - assertTrue(source2.closed); - - EasyMock.verify(stateManager); - } - @Test public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() throws IOException { stateDirectory = EasyMock.createNiceMock(StateDirectory.class); @@ -1469,7 +1446,6 @@ public Map committed(final Set checkpointableOffsets = singletonMap(partition1, 0L); + 
stateManager.checkpoint(EasyMock.eq(checkpointableOffsets)); + EasyMock.expect(recordCollector.offsets()).andReturn(checkpointableOffsets).anyTimes(); + EasyMock.replay(stateManager, recordCollector); + + task = createStatefulTask(createConfig(false, "0"), true); + task.initializeIfNeeded(); + task.completeRestoration(); + task.addRecords(partition1, singleton(getConsumerRecord(partition1, 10))); + task.process(100L); + assertTrue(task.commitNeeded()); + + task.suspend(); + task.postCommit(); + EasyMock.verify(stateManager, recordCollector); + } + @Test public void shouldReturnStateManagerChangelogOffsets() { EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition1, 50L)).anyTimes(); @@ -1560,7 +1590,7 @@ public void shouldNotCommitAndThrowOnCloseDirty() { } @Test - public void shouldNotCommitOnCloseRestoring() { + public void shouldCheckpointOnCloseRestoring() { stateManager.flush(); EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); @@ -1584,30 +1614,33 @@ public void shouldNotCommitOnCloseRestoring() { } @Test - public void shouldCommitOnCloseClean() { + public void shouldCheckpointOffsetsOnPostCommitIfCommitNeeded() { final long offset = 543L; + final long consumedOffset = 345L; EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes(); - stateManager.close(); - EasyMock.expectLastCall(); - stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, offset))); EasyMock.expectLastCall(); + stateManager.checkpoint(EasyMock.eq(mkMap( + mkEntry(changelogPartition, offset), + mkEntry(partition1, consumedOffset) + ))); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(recordCollector, stateManager); final MetricName metricName = setupCloseTaskMetric(); - task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); + task = 
createOptimizedStatefulTask(createConfig(false, "0"), consumer); task.initializeIfNeeded(); task.completeRestoration(); + + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, consumedOffset))); + task.process(100L); + assertTrue(task.commitNeeded()); + task.suspend(); task.prepareCommit(); task.postCommit(); - task.closeClean(); - - assertEquals(Task.State.CLOSED, task.state()); - final double expectedCloseTaskMetric = 1.0; - verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); + assertEquals(Task.State.SUSPENDED, task.state()); EasyMock.verify(stateManager); } @@ -1616,8 +1649,8 @@ public void shouldCommitOnCloseClean() { public void shouldSwallowExceptionOnCloseCleanError() { final long offset = 543L; - EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes(); - stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, offset))); + EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); + stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(partition1, offset))); EasyMock.expectLastCall(); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition)).anyTimes(); stateManager.close(); @@ -1629,6 +1662,10 @@ public void shouldSwallowExceptionOnCloseCleanError() { task.initializeIfNeeded(); task.completeRestoration(); + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, offset))); + task.process(100L); + assertTrue(task.commitNeeded()); + task.suspend(); task.prepareCommit(); task.postCommit(); @@ -1679,9 +1716,8 @@ public void shouldThrowOnCloseCleanFlushError() { @Test public void shouldThrowOnCloseCleanCheckpointError() { final long offset = 543L; - - EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)); - stateManager.checkpoint(Collections.singletonMap(changelogPartition, offset)); + 
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()); + stateManager.checkpoint(Collections.singletonMap(partition1, offset)); EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); stateManager.close(); EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes(); @@ -1692,6 +1728,10 @@ public void shouldThrowOnCloseCleanCheckpointError() { task = createOptimizedStatefulTask(createConfig(false, "100"), consumer); task.initializeIfNeeded(); + task.addRecords(partition1, singletonList(getConsumerRecord(partition1, offset))); + task.process(100L); + assertTrue(task.commitNeeded()); + task.suspend(); task.prepareCommit(); assertThrows(ProcessorStateException.class, () -> task.postCommit()); From a24df9d7b3e9638ba35896d044c8959cf21e9f62 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 25 Jun 2020 19:13:16 -0700 Subject: [PATCH 7/9] fixing up standby task tests --- .../processor/internals/StandbyTask.java | 5 ++++ .../processor/internals/StandbyTaskTest.java | 25 ++++++++++++++----- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 45743519a4386..38267809522b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -94,6 +94,9 @@ public void initializeIfNeeded() { if (state() == State.CREATED) { StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext); + // initialize the snapshot with the current offsets as we don't need to commit them until they change + offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets()); + // no topology needs initialized, we can transit to RUNNING // right after registered
the stores transitionTo(State.RESTORING); @@ -162,6 +165,8 @@ public Map prepareCommit() { case CREATED: log.debug("Skipped preparing created task for commit"); + break; + case RUNNING: case SUSPENDED: stateMgr.flush(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index f98d6304e5842..ba07d55c51c6d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -171,6 +171,7 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldTransitToRunningAfterInitialization() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.registerStateStores(EasyMock.anyObject(), EasyMock.anyObject()); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); @@ -195,12 +196,15 @@ public void shouldTransitToRunningAfterInitialization() { public void shouldThrowIfCommittingOnIllegalState() { EasyMock.replay(stateManager); task = createStandbyTask(); + task.suspend(); + task.closeClean(); assertThrows(IllegalStateException.class, task::prepareCommit); } @Test public void shouldFlushAndCheckpointStateManagerOnCommit() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.flush(); EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); @@ -230,6 +234,7 @@ public void shouldReturnStateManagerChangelogOffsets() { @Test public void shouldNotCommitAndThrowOnCloseDirty() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.close(); EasyMock.expectLastCall().andThrow(new 
ProcessorStateException("KABOOM!")).anyTimes(); stateManager.flush(); @@ -255,6 +260,7 @@ public void shouldNotCommitAndThrowOnCloseDirty() { @Test public void shouldNotThrowFromStateManagerCloseInCloseDirty() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.close(); EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); EasyMock.replay(stateManager); @@ -270,6 +276,7 @@ public void shouldNotThrowFromStateManagerCloseInCloseDirty() { @Test public void shouldSuspendAndCommitBeforeCloseClean() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.close(); EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); @@ -311,7 +318,7 @@ public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() { EasyMock.expect(stateManager.changelogOffsets()) .andReturn(Collections.singletonMap(partition, 50L)) .andReturn(Collections.singletonMap(partition, 50L)) - .andReturn(Collections.singletonMap(partition, 60L)); + .andReturn(Collections.singletonMap(partition, 60L)).anyTimes(); stateManager.flush(); EasyMock.expectLastCall(); stateManager.checkpoint(EasyMock.eq(Collections.emptyMap())); @@ -321,22 +328,21 @@ public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() { task = createStandbyTask(); task.initializeIfNeeded(); - assertTrue(task.commitNeeded()); - - task.prepareCommit(); - task.postCommit(); - // do not need to commit if there's no update assertFalse(task.commitNeeded()); // could commit if the offset advanced assertTrue(task.commitNeeded()); + task.prepareCommit(); + task.postCommit(); + EasyMock.verify(stateManager); } @Test public void shouldThrowOnCloseCleanError() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.close(); EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); 
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); @@ -360,6 +366,9 @@ public void shouldThrowOnCloseCleanError() { @Test public void shouldThrowOnCloseCleanCheckpointError() { + EasyMock.expect(stateManager.changelogOffsets()) + .andReturn(Collections.emptyMap()) + .andReturn(Collections.singletonMap(partition, 0L)); stateManager.checkpoint(EasyMock.anyObject()); EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); @@ -385,6 +394,7 @@ public void shouldThrowOnCloseCleanCheckpointError() { @Test public void shouldUnregisterMetricsInCloseClean() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); @@ -400,6 +410,7 @@ public void shouldUnregisterMetricsInCloseClean() { @Test public void shouldUnregisterMetricsInCloseDirty() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); EasyMock.replay(stateManager); @@ -498,6 +509,7 @@ public void shouldDeleteStateDirOnTaskCreatedAndEosBetaUncleanClose() { @Test public void shouldRecycleTask() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); stateManager.recycle(); EasyMock.replay(stateManager); @@ -528,6 +540,7 @@ public void shouldAlwaysSuspendCreatedTasks() { @Test public void shouldAlwaysSuspendRunningTasks() { + EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); EasyMock.replay(stateManager); task = createStandbyTask(); task.initializeIfNeeded(); From f6715667258e077163d9368c49c07220076b8ccc Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Thu, 25 Jun 2020 19:29:52 
-0700 Subject: [PATCH 8/9] cleaning up TM tests --- .../processor/internals/TaskManager.java | 9 ++++---- .../processor/internals/TaskManagerTest.java | 23 ++++++++----------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 674ec72951732..173efff808ff2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -242,16 +242,15 @@ public void handleAssignment(final Map> activeTasks, for (final Task task : tasksToClose) { try { - if (task.isActive()) { - // Active tasks should have already been suspended and committed during handleRevocation. - // We are just responsible for closing them now - cleanUpTaskProducer(task, taskCloseExceptions); - } else { + if (!task.isActive()) { + // Active tasks should have already been suspended and committed during handleRevocation, but + // standbys must be suspended/committed/closed all here task.suspend(); task.prepareCommit(); task.postCommit(); } completeTaskCloseClean(task); + cleanUpTaskProducer(task, taskCloseExceptions); tasks.remove(task.id()); } catch (final RuntimeException e) { final String uncleanMessage = String.format( diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 23166d14d084b..36730b5e168e7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -79,6 +79,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static 
org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.common.utils.Utils.union; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.eq; @@ -414,11 +415,10 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { } @Test - public void shouldCloseDirtyActiveUnassignedTasksWhenErrorSuspendingTask() { + public void shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true) { @Override - public void suspend() { - super.suspend(); + public void closeClean() { throw new RuntimeException("KABOOM!"); } }; @@ -436,6 +436,7 @@ public void suspend() { replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer, changeLogReader); taskManager.handleAssignment(taskId00Assignment, emptyMap()); + taskManager.handleRevocation(taskId00Partitions); final RuntimeException thrown = assertThrows( RuntimeException.class, @@ -536,6 +537,8 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() { assertThat(taskManager.tryToCompleteRestoration(), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); + taskManager.handleRevocation(taskId00Partitions); + final RuntimeException thrown = assertThrows( RuntimeException.class, () -> taskManager.handleAssignment(emptyMap(), emptyMap()) @@ -1399,7 +1402,7 @@ public Map prepareCommit() { } @Test - public void shouldCloseActiveTasksDirtyAndPropagateSuspendException() { + public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() { setUpTaskManager(StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA); final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); @@ -1418,21 +1421,15 @@ public void suspend() { taskManager.tasks().put(taskId01, task01); taskManager.tasks().put(taskId02, task02); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId02); - 
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId01); - replay(activeTaskCreator); final RuntimeException thrown = assertThrows(RuntimeException.class, - () -> taskManager.handleAssignment(mkMap(mkEntry(taskId00, taskId00Partitions)), Collections.emptyMap())); + () -> taskManager.handleRevocation(union(HashSet::new, taskId01Partitions, taskId02Partitions))); assertThat(thrown.getCause().getMessage(), is("task 0_1 suspend boom!")); assertThat(task00.state(), is(Task.State.CREATED)); - assertThat(task01.state(), is(Task.State.CLOSED)); - assertThat(task02.state(), is(Task.State.CLOSED)); - - // All the tasks involving in the commit should already be removed. - assertThat(taskManager.tasks(), is(Collections.singletonMap(taskId00, task00))); + assertThat(task01.state(), is(Task.State.SUSPENDED)); + assertThat(task02.state(), is(Task.State.SUSPENDED)); verify(activeTaskCreator); } From 4125b363105af8402294e40e2872a91fd5e3ab31 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Fri, 26 Jun 2020 09:58:30 -0700 Subject: [PATCH 9/9] fix TTD --- .../main/java/org/apache/kafka/streams/TopologyTestDriver.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 7615612fe16c7..31659bbeb5514 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -1175,6 +1175,8 @@ public SessionStore getSessionStore(final String name) { public void close() { if (task != null) { task.suspend(); + task.prepareCommit(); + task.postCommit(); task.closeClean(); } if (globalStateTask != null) {