Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,15 @@ public class StandbyTask extends AbstractTask implements Task {
private final InternalProcessorContext processorContext;
private final StreamsMetricsImpl streamsMetrics;

private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit;
private boolean checkpointNeededForSuspended = false;
private Map<TopicPartition, Long> offsetSnapshotSinceLastCommit = new HashMap<>();

/**
* @param id the ID of this task
* @param partitions input topic partitions, used for thread metadata only
* @param topology the instance of {@link ProcessorTopology}
* @param config the {@link StreamsConfig} specified by the user
* @param streamsMetrics the {@link StreamsMetrics} created by the thread
* @param streamsMetrics the {@link StreamsMetrics} created by the thread
* @param stateMgr the {@link ProcessorStateManager} for this task
* @param stateDirectory the {@link StateDirectory} created by the thread
*/
Expand Down Expand Up @@ -93,6 +94,9 @@ public void initializeIfNeeded() {
if (state() == State.CREATED) {
StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);

// initialize the snapshot with the current offsets as we don't need to commit then until they change
offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());

// no topology needs initialized, we can transit to RUNNING
// right after registered the stores
transitionTo(State.RESTORING);
Expand All @@ -115,8 +119,15 @@ public void completeRestoration() {
public void suspend() {
switch (state()) {
case CREATED:
log.info("Suspended created");
checkpointNeededForSuspended = false;
transitionTo(State.SUSPENDED);

break;

case RUNNING:
log.info("Suspended {}", state());
log.info("Suspended running");
checkpointNeededForSuspended = true;
transitionTo(State.SUSPENDED);

break;
Expand Down Expand Up @@ -144,33 +155,64 @@ public void resume() {
}

/**
* 1. flush store
* 2. write checkpoint file
* Flush stores before a commit
*
* @throws StreamsException fatal error, should close the thread
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
if (state() == State.RUNNING || state() == State.SUSPENDED) {
stateMgr.flush();
log.debug("Prepared task for committing");
} else {
throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
switch (state()) {
case CREATED:
log.debug("Skipped preparing created task for commit");

break;

case RUNNING:
case SUSPENDED:
stateMgr.flush();
log.debug("Prepared {} task for committing", state());

break;

default:
throw new IllegalStateException("Illegal state " + state() + " while preparing standby task " + id + " for committing ");
}

return Collections.emptyMap();
}

@Override
public void postCommit() {
if (state() == State.RUNNING || state() == State.SUSPENDED) {
// since there's no written offsets we can checkpoint with empty map,
// and the state current offset would be used to checkpoint
stateMgr.checkpoint(Collections.emptyMap());
offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
log.debug("Finalized commit");
} else {
throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id);
switch (state()) {
case CREATED:
// We should never write a checkpoint for a CREATED task as we may overwrite an existing checkpoint
// with empty uninitialized offsets
log.debug("Skipped writing checkpoint for created task");

break;

case RUNNING:
if (commitNeeded()) {
writeCheckpoint();
}
log.debug("Finalized commit for running task");

break;

case SUSPENDED:
// don't overwrite the existing checkpoint file if we haven't actually initialized the offsets yet
if (checkpointNeededForSuspended) {
writeCheckpoint();
log.debug("Finalized commit for suspended task");
checkpointNeededForSuspended = false;
} else {
log.debug("Skipped writing checkpoint for uninitialized suspended task");
}

break;

default:
throw new IllegalStateException("Illegal state " + state() + " while post committing standby task " + id);
}
}

Expand Down Expand Up @@ -203,6 +245,13 @@ public void closeCleanAndRecycleState() {
log.info("Closed clean and recycled state");
}

private void writeCheckpoint() {
// since there's no written offsets we can checkpoint with empty map,
// and the state's current offset would be used to checkpoint
stateMgr.checkpoint(Collections.emptyMap());
offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
}

private void close(final boolean clean) {
switch (state()) {
case SUSPENDED:
Expand Down Expand Up @@ -243,7 +292,7 @@ private void close(final boolean clean) {
@Override
public boolean commitNeeded() {
// we can commit if the store's offset has changed since last commit
return offsetSnapshotSinceLastCommit == null || !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets());
return !offsetSnapshotSinceLastCommit.equals(stateMgr.changelogOffsets());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Collections.emptyMap;
import static java.util.Collections.singleton;
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;

Expand Down Expand Up @@ -106,6 +107,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
private long idleStartTimeMs;
private boolean commitNeeded = false;
private boolean commitRequested = false;
private boolean checkpointNeededForSuspended = false;

public StreamTask(final TaskId id,
final Set<TopicPartition> partitions,
Expand Down Expand Up @@ -249,8 +251,15 @@ public void completeRestoration() {
public void suspend() {
switch (state()) {
case CREATED:
log.info("Suspended created");
checkpointNeededForSuspended = false;
transitionTo(State.SUSPENDED);

break;

case RESTORING:
log.info("Suspended {}", state());
log.info("Suspended restoring");
checkpointNeededForSuspended = true;
transitionTo(State.SUSPENDED);

break;
Expand All @@ -259,6 +268,7 @@ public void suspend() {
try {
// use try-catch to ensure state transition to SUSPENDED even if user code throws in `Processor#close()`
closeTopology();
checkpointNeededForSuspended = true;
} finally {
transitionTo(State.SUSPENDED);
log.info("Suspended running");
Expand Down Expand Up @@ -338,26 +348,36 @@ public void resume() {
*/
@Override
public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit;
switch (state()) {
case RUNNING:
case CREATED:
case RESTORING:
case RUNNING:
case SUSPENDED:
stateMgr.flush();
recordCollector.flush();

log.debug("Prepared task for committing");
// the commitNeeded flag just indicates whether we have reached RUNNING and processed any new data,
// so it only indicates whether the record collector should be flushed or not, whereas the state
// manager should always be flushed; either there is newly restored data or the flush will be a no-op
if (commitNeeded) {
recordCollector.flush();

log.debug("Prepared {} task for committing", state());
offsetsToCommit = committableOffsetsAndMetadata();
} else {
log.debug("Skipped preparing {} task for commit since there is nothing to commit", state());
offsetsToCommit = Collections.emptyMap();
}

break;

case CREATED:
case CLOSED:
throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + id + " for committing");

default:
throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + id + " for committing");
}

return committableOffsetsAndMetadata();
return offsetsToCommit;
}

private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
Expand Down Expand Up @@ -399,7 +419,7 @@ private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
break;

case CLOSED:
throw new IllegalStateException("Illegal state " + state() + " while getting commitable offsets for active task " + id);
throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + id);

default:
throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
Expand All @@ -414,18 +434,26 @@ private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
@Override
public void postCommit() {
commitRequested = false;
commitNeeded = false;

switch (state()) {
case CREATED:
// We should never write a checkpoint for a CREATED task as we may overwrite an existing checkpoint
// with empty uninitialized offsets
log.debug("Skipped writing checkpoint for created task");

break;

case RESTORING:
writeCheckpoint();
log.debug("Finalized commit for restoring task");

break;

case RUNNING:
if (!eosEnabled) {
writeCheckpoint();
}
log.debug("Finalized commit for running task");

break;

Expand All @@ -438,19 +466,24 @@ public void postCommit() {
*/
partitionGroup.clear();

writeCheckpoint();
if (checkpointNeededForSuspended) {
// Make sure we don't overwrite the checkpoint if the suspended task was still in CREATED
// and had not yet initialized offsets
writeCheckpoint();
log.debug("Finalized commit for suspended task");
checkpointNeededForSuspended = false;
} else {
log.debug("Skipped writing checkpoint for uninitialized suspended task");
}

break;

case CREATED:
case CLOSED:
throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + id);

default:
throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
}

log.debug("Finalized commit");
}

private Map<TopicPartition, Long> extractPartitionTimes() {
Expand Down Expand Up @@ -511,10 +544,11 @@ public void closeCleanAndRecycleState() {

private void writeCheckpoint() {
if (commitNeeded) {
log.error("Tried to write a checkpoint with pending uncommitted data, should complete the commit first.");
throw new IllegalStateException("A checkpoint should only be written if no commit is needed.");
stateMgr.checkpoint(checkpointableOffsets());
} else {
stateMgr.checkpoint(emptyMap());
}
stateMgr.checkpoint(checkpointableOffsets());
commitNeeded = false;
}

private void validateClean() {
Expand Down
Loading