
KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted #8058

Merged
guozhangwang merged 29 commits into apache:trunk from guozhangwang:KMinor-invalid-offset-changelog-reader
Feb 21, 2020
Conversation

@guozhangwang
Contributor

@guozhangwang guozhangwang commented Feb 7, 2020

  1. Removed task field from TaskMigrated; the only caller that encodes a task id, from StreamTask, actually does not throw, so we only log it. To handle it on StreamThread we just always enforce a rebalance (and we would call onPartitionsLost to remove all tasks as dirty).

  2. Added TaskCorruptedException with a set of task-ids. The first scenario is restoreConsumer.poll throwing InvalidOffset, indicating that the logs have been truncated / compacted. To handle it on StreamThread we first close the corresponding tasks as dirty (if EOS is enabled we also wipe out the state stores), and then revive them into the CREATED state.

  3. Also fixed a bug while investigating KAFKA-9572: when suspending / closing a restoring task we should not commit the new offsets but only update the checkpoint file.

  4. Re-enabled the unit test.
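The TaskCorrupted handling in point 2 can be sketched as a miniature state machine. Everything below (MiniTask, handleCorruption, the State enum) is illustrative, not Kafka's actual Task API:

```java
import java.util.*;

// Minimal sketch of the corrupted-task recovery flow described in point 2;
// all names here are hypothetical stand-ins for the Streams internals.
public class CorruptedTaskRecovery {
    enum State { CREATED, RESTORING, RUNNING, CLOSED }

    static class MiniTask {
        final String id;
        State state = State.RESTORING;
        boolean storesWiped = false;
        MiniTask(final String id) { this.id = id; }

        void closeDirty(final boolean eosEnabled) {
            // dirty close: no offsets are committed; under EOS the local state
            // can no longer be trusted, so wipe it and rebuild from the changelog
            if (eosEnabled) {
                storesWiped = true;
            }
            state = State.CLOSED;
        }

        void revive() {
            // CLOSED -> CREATED; kept deliberately cheap so the thread can
            // still call consumer.poll in time and stay in the group
            state = State.CREATED;
        }
    }

    static void handleCorruption(final Collection<MiniTask> corrupted, final boolean eosEnabled) {
        for (final MiniTask task : corrupted) {
            task.closeDirty(eosEnabled);
            task.revive();
        }
    }

    public static void main(final String[] args) {
        final MiniTask task = new MiniTask("0_1");
        handleCorruption(Collections.singletonList(task), true);
        System.out.println(task.state + " wiped=" + task.storesWiped); // prints "CREATED wiped=true"
    }
}
```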

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang
Contributor Author

test this please

Contributor Author

@guozhangwang guozhangwang left a comment

}
}

stores.clear();
Contributor Author

We need to clear the stores map now since we may re-initialize the state stores upon reviving a task.

Member

Do we also need to clear storeToChangelogTopic, etc?

Contributor Author

storeToChangelogTopic and sourcePartitions are passed in at construction time and final, so we cannot clear them (since they would only be initialized once).

/**
* Indicates a specific task is corrupted and need to be re-initialized. It can be thrown when
*
* 1) Under EOS, if the checkpoint file does not contain offsets for corresponding store's changelogs, meaning
Contributor Author

This case 1) would be done in another PR, I just added the java-doc here to complete the scope.

if (!assignment.containsAll(partitions)) {
throw new IllegalStateException("The current assignment " + assignment + " " +
"does not contain some of the partitions " + partitions + " for removing.");
if (assignment.removeAll(partitions)) {
Contributor Author

Here I made the remove call idempotent.
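A minimal sketch of the idempotent variant (the helper class below is illustrative, not the actual changelog-reader code):

```java
import java.util.*;

public class IdempotentRemoval {
    // Removing partitions that were never assigned (or were already removed)
    // is now a no-op rather than an IllegalStateException, so calling remove
    // twice for the same partitions is safe.
    static boolean remove(final Set<String> assignment, final Collection<String> partitions) {
        return assignment.removeAll(partitions);
    }

    public static void main(final String[] args) {
        final Set<String> assignment = new HashSet<>(Arrays.asList("changelog-0", "changelog-1"));
        System.out.println(remove(assignment, Collections.singletonList("changelog-0"))); // true: removed
        System.out.println(remove(assignment, Collections.singletonList("changelog-0"))); // false: already gone, no exception
    }
}
```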

/**
* Revive a closed task to a created one; should never throw an exception
*/
void revive();
Contributor Author

Okay this might be a bit controversial: I tried to make the re-initialization logic as cheap as possible, since otherwise we may be kicked out of the group for not calling consumer.poll in time. After looking at the source code I found the only necessary part is the closure of the task-level sensors, which I moved out of the task.close functions. So from CLOSED -> CREATED there's nothing left to do.

final Task task = tasks.get(taskId);

// this call is idempotent so even if the task is only CREATED we can still call it
changelogReader.remove(task.changelogPartitions());
Contributor Author

We have to call changelog.remove() before task.closeDirty since now we clear the stores map in closeDirty, and after that task.changelogPartitions() would return nothing.

standbyTasksToCreate.remove(task.id());
} else /* we previously owned this task, and we don't have it anymore, or it has changed active/standby state */ {
final Set<TopicPartition> inputPartitions = task.inputPartitions();
cleanupTask(task);
Contributor Author

Consolidated a couple of the same pattern into this private function.

changelogReader.remove(task.changelogPartitions());
}

for (final TopicPartition inputPartition : inputPartitions) {
Contributor Author

Not sure why we do not remove the input partitions previously.. is that intentional @vvcephei ? If not I'd move the removal into the block (I already did just to clarify :P)

Contributor

The intent was to remove the input partitions from the map any time we remove a task from tasks. It looks like your code maintains this (in a clearer and cleaner way).

Contributor Author

Got it. In the previous code we would call partitionToTask.remove(inputPartition) and remove the task no matter whether the task was closed or not, which is a bit weird --- for standby tasks we do not close them, but we still remove them from the iterator and from the materialized partitionToTask. My modification is to ONLY do this logic if we are closing the task.

As long as you agree this is correct I'm relieved.

Contributor

Ah, then it was my mistake before! Good catch.

}

private void cleanupTask(final Task task) {
// 1. remove the changelog partitions from changelog reader;
Contributor Author

The order here cannot be changed so I left this comment.

@guozhangwang
Contributor Author

The unit test suite passed locally (Java 8, Scala 2.12). Currently I cannot trigger a jenkins build out of it (not sure why).

Member

@ableegoldman ableegoldman left a comment

Still need to look through TaskManager, but left a few initial comments on the rest

if (state == CLOSED) {
transitionTo(CREATED);
} else {
throw new IllegalStateException("Illegal state " + state() + " while committing standby task " + id);
Member

Remove "standby" from error message (unless this only applies to standbys?)

Contributor Author

Ack.


// if we cannot get the position of the consumer within timeout, just return false
return false;
} catch (final KafkaException e) {
// this also includes InvalidOffsetException, which should not happen under normal
Member

Just wondering, why is it ok to get InvalidOffsetException during restore/poll but not here? When might this get thrown from #position? Ditto for prepareChangelogs down below.

Contributor Author

consumer.poll throwing InvalidOffsetException should be handled as TaskCorrupted; consumer.position throwing InvalidOffsetException should not happen under normal scenarios, when it happens it indicates a bug and hence we do not need to special handle it.
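That distinction can be sketched as follows; the two exception classes are stand-ins for the real Kafka ones, and the handler names are hypothetical:

```java
import java.util.*;

public class RestoreErrorHandling {
    // Illustrative stand-ins for InvalidOffsetException / TaskCorruptedException.
    static class InvalidOffsetException extends RuntimeException {
        final Set<String> partitions;
        InvalidOffsetException(final Set<String> partitions) { this.partitions = partitions; }
    }
    static class TaskCorruptedException extends RuntimeException {
        final Set<String> corruptedPartitions;
        TaskCorruptedException(final Set<String> partitions) { this.corruptedPartitions = partitions; }
    }

    // poll path: the broker truncated / compacted past our position; this is
    // recoverable, so translate into TaskCorrupted and let the thread close
    // the tasks dirty and revive them.
    static void onPollFailure(final InvalidOffsetException e) {
        throw new TaskCorruptedException(e.partitions);
    }

    // position path: should never happen in normal operation, so it indicates
    // a bug and the raw exception propagates as fatal.
    static void onPositionFailure(final InvalidOffsetException e) {
        throw e;
    }

    public static void main(final String[] args) {
        try {
            onPollFailure(new InvalidOffsetException(Collections.singleton("changelog-0")));
        } catch (final TaskCorruptedException e) {
            System.out.println("corrupted: " + e.corruptedPartitions); // prints "corrupted: [changelog-0]"
        }
    }
}
```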

// a task is only closing / closed when 1) task manager is closing, 2) a rebalance is undergoing;
// in either case we can just log it and move on without notifying the thread since the consumer
// would soon be updated to not return any records for this task anymore.
log.info("Stream task {} is already in {} state, skip adding records to it.", id(), state());
Member

Hm. I think we should actually be concerned if we ever get here -- I'm not sure the TaskMigratedException made sense either. Trying to add records to a closed task would imply that we closed the task due to shutting down or because we no longer own it, both cases of which should also involve trimming its topic(s) from the consumer's assignment, yet we were still returned records for said topic(s).

Unless, the consumer may still return already-fetched records from partitions no longer in its assignment during poll? I thought we would trim those records out and only return from the actual assignment

Contributor Author

Here's my rationale:

If the task is closed due to a rebalance (i.e. we #handleAssignment or #handleLostAll), there might still be some buffered records from the consumer being returned (since we update the consumer's subscription afterwards). In that case the subscription would be updated in the next iteration and no records would be returned, so it is okay to just skip this once.

If the task is closed due to closing the thread, then there's no need to throw an exception either.

} catch (final TaskMigratedException e) {
log.warn("Detected that the thread is being fenced. " +
"This implies that this thread missed a rebalance and dropped out of the consumer group. " +
"Will migrate out all assigned tasks and rejoin the consumer group.");
Member

Suggested change
"Will migrate out all assigned tasks and rejoin the consumer group.");
"Will close out all assigned tasks and rejoin the consumer group.");

Contributor Author

Ack.

@mjsax mjsax added the streams label Feb 7, 2020
Contributor

@vvcephei vvcephei left a comment

Thanks for this, @guozhangwang ! Comments below.

*
* 1) Under EOS, if the checkpoint file does not contain offsets for corresponding store's changelogs, meaning
* previously it was not closed cleanly;
* 2) Out-of-range exception thrown during restoration, meaning that the changelog has been modified and we re-bootstrap
Contributor

Just now having this thought... Supposing this happens, is it guaranteed to apply to all the stores in the task? I.e., do we really need to re-bootstrap all the stores, or just the one(s) for which our offset is out of range?

Contributor Author

Yes, we can re-bootstrap only the affected state stores -- this is what we did in the past, but that was a lot messier (remember we had to use the optional in a fixed-order map? :P). My thought is that for non-EOS the checkpoint file would likely exist, so even re-bootstrapping the whole task would be okay; for EOS it is safer to re-bootstrap the whole task.

Contributor

It's certainly safer, but the performance hit seems concerning... restoration i/o is already one of the things people complain about most, and this choice could amplify it multiple times over.

Maybe we can handle it more cleanly by closing all the stores nicely, writing a checkpoint file with the out-of-range stores' checkpoints at 0, and then re-bootstrapping the task, so it only has to restore the broken stores?

Contributor Author

That sounds like a good idea. Let me try it out.

throw new TaskMigratedException("Restore consumer get fenced by instance-id polling records.", e);
} catch (final InvalidOffsetException e) {
log.warn("Encountered {} fetching records from restore consumer for partitions {}, " +
"marking the corresponding tasks as corrupted.", e.getClass().getName(), e.partitions());
Contributor

I guess it wouldn't hurt to explain what the exception means (our position is too old and has been deleted or compacted by the broker) and what we hope to accomplish by marking the task as corrupted (to re-bootstrap the stores from the changelog and return to normal processing).

Contributor Author

Ack.


@guozhangwang
Contributor Author

retest this please

Contributor Author

@guozhangwang guozhangwang left a comment

@vvcephei I made the change and it's a bit more complicated than I thought; the current proposal is the smallest change I could make (it still adds new APIs to the Task interface). LMK WDYT.

public void markChangelogAsCorrupted(final Set<TopicPartition> partitions) {
stateMgr.markChangelogAsCorrupted(partitions);

// only write a new checkpoint (excluding the corrupted partitions) if eos is disabled
Contributor Author

We can checkpoint (excluding the corrupted partitions) for 1) standby tasks and 2) non-eos active tasks; for eos active tasks we should not write the checkpoint, since for eos we HAVE TO re-bootstrap every store from scratch to maintain consistency.
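This rule condenses to a tiny predicate (a hypothetical helper, not the actual Streams code):

```java
public class CheckpointRule {
    // Standby tasks and non-EOS active tasks may write a checkpoint that
    // excludes the corrupted partitions; an EOS active task must not, since
    // every store will be wiped and re-bootstrapped from scratch.
    static boolean shouldWriteCheckpoint(final boolean isActive, final boolean eosEnabled) {
        return !(isActive && eosEnabled);
    }

    public static void main(final String[] args) {
        System.out.println(shouldWriteCheckpoint(false, true));  // standby under EOS: true
        System.out.println(shouldWriteCheckpoint(true, false));  // non-EOS active: true
        System.out.println(shouldWriteCheckpoint(true, true));   // EOS active: false
    }
}
```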

Contributor

@vvcephei vvcephei left a comment

Thanks @guozhangwang , I had a few nits, but the main concern is about the skipping-adding-records thing.

Otherwise, this change looks great to me!

Comment on lines -709 to -727
if (state() == State.CLOSED || state() == State.CLOSING) {
log.info("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
"Notifying the thread to trigger a new rebalance immediately.", id());
throw new TaskMigratedException(id());
}

Contributor

Should we still skip adding records? It looks like that was the intent, but I think what actually would happen is that we'd still add the records, but skip processing them.

Contributor Author

This is intentional: as in PR #8091, where we fixed offset committing, we look into the buffered records so that we can get the correct "next" offset to commit; if we skipped adding records here while the task is closing, we could potentially return incorrect results.

guozhangwang and others added 7 commits February 18, 2020 16:00
…nals/StreamTask.java

Co-Authored-By: John Roesler <vvcephei@users.noreply.github.com>
…nals/StreamThread.java

Co-Authored-By: John Roesler <vvcephei@users.noreply.github.com>
…nals/StreamThread.java

Co-Authored-By: John Roesler <vvcephei@users.noreply.github.com>
Contributor Author

@guozhangwang guozhangwang left a comment

Adding the fix for KAFKA-9572

if (state() == State.CREATED || state() == State.CLOSING || state() == State.SUSPENDED) {
// do nothing
log.trace("Skip suspending since state is {}", state());
} else if (state() == State.RUNNING) {
Contributor Author

Here is the attempted fix of https://issues.apache.org/jira/browse/KAFKA-9572: if we are closing / suspending a restoring task, we should only update the checkpoint file but should NOT commit offsets, since the committed offsets indicate the "restore end" and should not be updated, cc @cadonna who filed the JIRA.
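The intended behavior per state can be sketched like this (the states and action strings are a simplification of the actual StreamTask logic, not its real API):

```java
public class SuspendBehavior {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSING }

    // A RESTORING task only flushes and updates the checkpoint file; committing
    // would move the consumer's committed offsets, which mark the "restore end"
    // and must not change mid-restore (the KAFKA-9572 bug).
    static String onSuspend(final State state) {
        switch (state) {
            case RUNNING:
                return "commit offsets + write checkpoint";
            case RESTORING:
                return "flush + write checkpoint only";
            default:
                return "no-op";
        }
    }

    public static void main(final String[] args) {
        System.out.println(onSuspend(State.RESTORING)); // prints "flush + write checkpoint only"
        System.out.println(onSuspend(State.RUNNING));   // prints "commit offsets + write checkpoint"
    }
}
```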

}

transitionTo(State.CLOSING);
} else if (state() == State.RESTORING) {
Contributor Author

This is part of the fix as well: only flushing / checkpointing, but not committing.

Contributor

@vvcephei vvcephei left a comment

Thanks @guozhangwang !

@guozhangwang
Contributor Author

The local run in JDK8 / Scala 2.12 passed. Will merge to trunk now.

@guozhangwang guozhangwang merged commit 3b6573c into apache:trunk Feb 21, 2020
ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 24, 2020
* apache-github/trunk: (23 commits)
  KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (apache#8154)
  HOTFIX: fix NPE in Kafka Streams IQ (apache#8158)
  MINOR: set scala version automatically based on gradle.properties
  KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (apache#8142)
  KAFKA-9441: Add internal TransactionManager (apache#8105)
  MINOR: Document endpoints for connector topic tracking (KIP-558)
  MINOR: Standby task commit needed when offsets updated (apache#8146)
  KAFKA-9206; Throw KafkaException on CORRUPT_MESSAGE error in Fetch response (apache#8111)
  MINOR: Remove unwanted regexReplace on tests/kafkatest/__init__.py
  KAFKA-9586: Fix errored json filename in ops documentation
  KAFKA-9575: Mention ZooKeeper 3.5.7 upgrade
  KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted (apache#8058)
  HOTFIX: don't try to remove uninitialized changelogs from assignment & don't prematurely mark task closed (apache#8140)
  MINOR: Fix javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion (apache#7337)
  MINOR: Improve EOS example exception handling (apache#8052)
  MINOR: Fix a number of warnings in clients test (apache#8073)
  MINOR: Update shell scripts to support z/OS system (apache#7913)
  MINOR: Wording fix in Streams DSL docs (apache#5692)
  MINOR: Add missing @test annotation to MetadataTest#testMetadataMerge (apache#8141)
  KAFKA-9533: ValueTransform forwards `null` values (apache#8108)
  ...
ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 24, 2020
…etrics-common

* confluent/master: (76 commits)
  KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (apache#8154)
  HOTFIX: fix NPE in Kafka Streams IQ (apache#8158)
  MINOR: set scala version automatically based on gradle.properties
  KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (apache#8142)
  KAFKA-9441: Add internal TransactionManager (apache#8105)
  MINOR: Document endpoints for connector topic tracking (KIP-558)
  MINOR: Standby task commit needed when offsets updated (apache#8146)
  Changes to migrate to Artifactory (#263)
  KAFKA-9206; Throw KafkaException on CORRUPT_MESSAGE error in Fetch response (apache#8111)
  MINOR: Remove unwanted regexReplace on tests/kafkatest/__init__.py
  KAFKA-9586: Fix errored json filename in ops documentation
  KAFKA-9575: Mention ZooKeeper 3.5.7 upgrade
  KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted (apache#8058)
  HOTFIX: don't try to remove uninitialized changelogs from assignment & don't prematurely mark task closed (apache#8140)
  MINOR: Fix javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion (apache#7337)
  MINOR: Improve EOS example exception handling (apache#8052)
  MINOR: Fix a number of warnings in clients test (apache#8073)
  MINOR: Update shell scripts to support z/OS system (apache#7913)
  MINOR: Wording fix in Streams DSL docs (apache#5692)
  MINOR: Add missing @test annotation to MetadataTest#testMetadataMerge (apache#8141)
  ...
@guozhangwang guozhangwang deleted the KMinor-invalid-offset-changelog-reader branch April 24, 2020 23:52