
KAFKA-9113: Clean up task management and state management#7997

Merged
guozhangwang merged 194 commits into apache:trunk from guozhangwang:k9113-base
Feb 5, 2020

Conversation

@guozhangwang guozhangwang commented Jan 22, 2020

This PR is a collaboration between Guozhang Wang and John Roesler. It is a significant tech-debt cleanup of task management and state management, broken down into several sub-tasks listed below:

  1. Extract embedded clients (producer and consumer) into RecordCollector from StreamTask.
    KAFKA-9113, P2: Extract Producer to RecordCollector guozhangwang/kafka#2
    KAFKA-9113: Unit test for RecordCollector guozhangwang/kafka#5

  2. Consolidate the standby updating and active restoring logic into ChangelogReader and extract out of StreamThread.
    KAFKA-9113, P3: ProcessorStateManager and ChangelogReader guozhangwang/kafka#3
    KAFKA-9113: Unit test for ProcessorStateManager and ChangelogReader guozhangwang/kafka#4

  3. Introduce Task state life cycle (created, restoring, running, suspended, closing), and refactor the task operations based on the current state.
    KAFKA-9113: P4, Refactor Task Lifecycle guozhangwang/kafka#6
    KAFKA-9113: P5, StandbyTask Life Cycle guozhangwang/kafka#7

  4. Consolidate AssignedTasks into TaskManager and simplify the logic of changelog management and task management (since those were already moved out in steps 2 and 3).
    Fix TaskManagerTest guozhangwang/kafka#8
    KAFKA-9113: StreamTask unit test guozhangwang/kafka#9

Also simplified the StreamThread logic a bit, as the embedded clients and changelog restoration logic were moved out in steps 1 and 2.
guozhangwang#10
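To make the lifecycle introduced in step 3 concrete, here is a minimal sketch of a task state machine. The state names follow the PR description (created, restoring, running, suspended, closing), but the transition table below is an illustrative assumption, not the exact rules in the Kafka Streams code:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;

// Simplified sketch of a task state machine; not the actual
// Kafka Streams Task.State implementation.
class TaskLifecycle {
    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSING }

    // Assumed transition table for illustration only.
    private static final Map<State, EnumSet<State>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.CREATED,   EnumSet.of(State.RESTORING, State.CLOSING));
        VALID.put(State.RESTORING, EnumSet.of(State.RUNNING, State.SUSPENDED, State.CLOSING));
        VALID.put(State.RUNNING,   EnumSet.of(State.SUSPENDED, State.CLOSING));
        VALID.put(State.SUSPENDED, EnumSet.of(State.RESTORING, State.RUNNING, State.CLOSING));
        VALID.put(State.CLOSING,   EnumSet.noneOf(State.class));
    }

    private State state = State.CREATED;

    State state() { return state; }

    // Task operations consult the current state and reject invalid moves,
    // rather than each caller re-checking preconditions ad hoc.
    void transitionTo(final State next) {
        if (!VALID.get(state).contains(next)) {
            throw new IllegalStateException("Invalid transition " + state + " -> " + next);
        }
        state = next;
    }
}
```

The point of the refactor is that operations like suspend or close become checks against this single state field instead of scattered boolean flags.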

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@guozhangwang (Contributor, Author)

@vvcephei This PR is ready for final reviews.

@guozhangwang (Contributor, Author)

retest this please

@vvcephei left a comment:
Hey @guozhangwang, here are all the comments I have so far. I'm still in progress on the review, but figured I could submit these.

log.debug("State store {} initialized from checkpoint with offset {} at changelog {}",
store.stateStore.name(), store.offset, store.changelogPartition);
} else {
// TODO K9113: for EOS when there's no checkpointed offset, we should treat it as TaskCorrupted
Contributor:
Unresolved TODO here. It's unclear to me why we would only consider this task corrupted in EOS mode. It seems like the lack of a checkpoint file just means that we loaded a cached task in an undefined state, and we should discard it and restore. If we have the file, but some of a changelog is missing from it, then maybe it just never got written before shutdown, or more likely the topology has changed and the task is corrupted, whether or not we are in EOS.

Contributor (Author):
Today we write the checkpoint file before we commit the offsets, so under non-EOS we can just restore from scratch without wiping the local store image, since restoring simply overwrites the local store and we can restore to the end of the changelog (this may result in duplicates, but that is fine under non-EOS). With EOS, however, we have to wipe out the store first before restoring from scratch.
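The decision described above can be sketched as follows. The class and method names here (CheckpointRecovery, onMissingCheckpoint) are hypothetical placeholders for illustration, not the actual ProcessorStateManager API:

```java
// Sketch of the recovery decision when no checkpointed offset exists;
// all names here are illustrative, not the real Kafka Streams API.
class CheckpointRecovery {
    private final boolean eosEnabled;

    CheckpointRecovery(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    /** Returns the recovery action when the checkpoint is missing. */
    String onMissingCheckpoint() {
        if (eosEnabled) {
            // Under EOS the local store may contain uncommitted writes,
            // so it must be wiped before restoring from the changelog.
            return "wipe-then-restore";
        } else {
            // Under ALO, restoring overwrites the local store in place;
            // replaying to the end of the changelog may produce
            // duplicates, which at-least-once semantics permit.
            return "restore-in-place";
        }
    }
}
```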

}

if (!loadedCheckpoints.isEmpty()) {
log.warn("Some loaded checkpoint offsets cannot find their corresponding state stores: {}", loadedCheckpoints);
Contributor:
seems like this also indicates the task is corrupted

Contributor (Author):
Arguably yes; I'm intentionally following the old logic here. I think originally we wanted to be more tolerant of topology changes (e.g. if a topology optimization decides that some stores are no longer needed), but on second thought that is not safe either. We can consider making this stricter in another PR.

log.warn("Some loaded checkpoint offsets cannot find their corresponding state stores: {}", loadedCheckpoints);
}

checkpointFile.delete();
Contributor:
In retrospect, I like the idea of doing this regardless of EOS. Why should we deliberately produce wrong results in ALO mode? We can certainly optimize to be able to use semi-trustworthy data, but let's treat that separately from EOS vs. ALO.

Contributor (Author):
The idea is that under ALO, if we fail to write the first checkpoint after restarting, we can still fall back to the original checkpoint, even though the store may have been updated and there would be duplicates. But since the window (just one commit interval) is so small, I don't think it's worth complicating the code with an EOS vs. ALO distinction.

return taskId;
}

// used by the changelog reader only
Contributor:
These kinds of comments paradoxically lead to unmaintainability. Either a method is part of the public contract or not. If it is, this comment will become out of date; if not, the changelog reader shouldn't be using it. The recommendation is simply to delete the comment (and similar ones). This even applies to "for testing" comments: I have found several methods in use in this code base that were commented "visible for testing". Even for tests, either move both the class and the test into an isolated package and use package-private, refactor the test, or remove the comment. Also, if the changelog reader really needs four "holes" poked into this class, we should reconsider the relationship between the state manager and the changelog reader.

private static Map<TopicPartition, Long> validCheckpointableOffsets(
final Map<TopicPartition, Long> checkpointableOffsets,
final Set<TopicPartition> validCheckpointableTopics) {
private StateStoreMetadata findStore(final TopicPartition changelogPartition) {
Contributor:
Maybe it doesn't matter, but this method makes its only usage an n^2 algorithm. If we instead inverted the stores collection and used it for lookups in register, it would be O(n).

Contributor (Author):
I've thought about it: inverting the stores collection makes other calls that depend on the store name more complicated, and keeping two collections indexed by store name and changelog partition isn't worth much, since within a task there are usually no more than 10 stores, so this n^2 algorithm should not be a big deal.
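For illustration, the trade-off discussed above looks roughly like this. StateStoreMetadata here is a stand-in record and the field names are assumptions, not the real class:

```java
import java.util.List;

// Illustrative sketch of the linear findStore lookup discussed above;
// StateStoreMetadata is a stand-in, not the real Kafka Streams class.
class StoreLookup {
    record StateStoreMetadata(String name, String changelogPartition) {}

    private final List<StateStoreMetadata> stores;

    StoreLookup(final List<StateStoreMetadata> stores) {
        this.stores = stores;
    }

    // O(n) per call; invoked once per registered store, this makes
    // registration O(n^2) overall. That is acceptable because a task
    // rarely has more than ~10 stores. An inverted
    // Map<String, StateStoreMetadata> keyed by changelog partition
    // would make each lookup O(1), at the cost of maintaining a
    // second index alongside the name-keyed collection.
    StateStoreMetadata findStore(final String changelogPartition) {
        for (final StateStoreMetadata store : stores) {
            if (store.changelogPartition().equals(changelogPartition)) {
                return store;
            }
        }
        return null;
    }
}
```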

private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
@Mock(type = MockType.NICE)
private ChangelogReader changelogReader;
Contributor:
This is unused

Contributor (Author):
Ack

private ProcessorStateManager stateManager;

@Mock(type = MockType.NICE)
private RecordCollector recordCollector;
Contributor:
This is also unused


@Ignore
@Test
// FIXME: should unblock this test after we added invalid offset handling
Contributor:
Did you want to fix this as part of this PR or as a follow-on?

Contributor (Author):
As a follow-up.

@vvcephei left a comment:
Hey @guozhangwang , I've completed my final review pass. I left a few comments earlier, but nothing that would stop me from merging this.

Thanks for driving this!

@guozhangwang (Contributor, Author)

if (generation() != Generation.NO_GENERATION) {
e = invokePartitionsRevoked(droppedPartitions);
} else {
if (generation() == Generation.NO_GENERATION || rebalanceInProgress()) {
Contributor (Author):
@ableegoldman @mjsax This is another bug I found while troubleshooting the system test failures (it predates the cleanup): when we get a task-migrated exception and then enforce a rebalance, we call unsubscribe, which triggers onLeavePrepare. If we got here via task-migrated, it is likely that a rebalance is already underway, and in that case we should treat the tasks as lost instead of revoking them; otherwise we would still try to commit, which would fail with a RebalanceInProgress exception.
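The decision in the snippet above can be sketched like this. The enum and method are illustrative stand-ins for the consumer coordinator logic, not the actual ConsumerCoordinator API:

```java
// Sketch of the onLeavePrepare decision described above; names are
// illustrative, not the real ConsumerCoordinator API.
class LeaveGroupDecision {
    enum Action { REVOKE_TASKS, LOSE_TASKS }

    // If there is no valid generation, or a rebalance is already in
    // progress (e.g. we got here via a task-migrated exception), the
    // tasks must be treated as lost: attempting to commit them would
    // fail with a RebalanceInProgress error. Only with a valid,
    // stable generation is it safe to revoke (and commit) them.
    static Action onLeavePrepare(final boolean hasGeneration,
                                 final boolean rebalanceInProgress) {
        if (!hasGeneration || rebalanceInProgress) {
            return Action.LOSE_TASKS;
        }
        return Action.REVOKE_TASKS;
    }
}
```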

Member:
Good catch. Also, it's pretty unfortunate that we can only trigger a rebalance from outside the client by unsubscribing and closing/suspending the entire assignment... this limits the usefulness of KIP-429 during version probing upgrades.

It also has some implications for the "rebalances are cheap" assumption of KIP-441. Would be better phrased as "rebalances are cheap, except for the member who triggers them".

Contributor (Author):
Yeah... maybe we should consider adding a new API to the consumer to rejoin the group in a cheaper way.

Member:
I agree :) -- would be happy to write up a small KIP for it and kick off discussion

@guozhangwang (Contributor, Author)

https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3732/console succeeded; merging to trunk now.

@guozhangwang guozhangwang merged commit 4090f9a into apache:trunk Feb 5, 2020
ijuma added a commit to confluentinc/kafka that referenced this pull request Feb 7, 2020
Conflicts:
* build.gradle: moved avro plugin definition below newly added test retry plugin.

* apache-github/trunk:
  MINOR: further InternalTopologyBuilder cleanup  (apache#8046)
  MINOR: Add timer for update limit offsets (apache#8047)
  HOTFIX: Fix spotsbug failure in Kafka examples (apache#8051)
  KAFKA-9447: Add new customized EOS model example (apache#8031)
  KAFKA-8164: Add support for retrying failed (apache#8019)
  HOTFIX: checkstyle for newly added unit test
  KAFKA-9261; Client should handle unavailable leader metadata (apache#7770)
  MINOR: Fix typos introduced in KIP-559 (apache#8042)
  MINOR: Fixing null handilg in ValueAndTimestampSerializer (apache#7679)
  KAFKA-9113: Clean up task management and state management (apache#7997)
  MINOR: fix checkstyle issue in ConsumerConfig.java (apache#8038)
  KAFKA-9491; Increment high watermark after full log truncation (apache#8037)
  KAFKA-9477 Document RoundRobinAssignor as an option for partition.assignment.strategy (apache#8007)
  KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time and timestamp literal (apache#7568)
  KAFKA-9492; Ignore record errors in ProduceResponse for older versions (apache#8030)
@guozhangwang guozhangwang deleted the k9113-base branch April 24, 2020 23:59
ijuma added a commit to ijuma/kafka that referenced this pull request Apr 28, 2020
…t-for-generated-requests

* apache-github/trunk: (410 commits)
  KAFKA-8843: KIP-515: Zookeeper TLS support
  MINOR: Add missing quote for malformed line content (apache#8070)
  MINOR: Simplify KafkaProducerTest (apache#8044)
  KAFKA-9507; AdminClient should check for missing committed offsets (apache#8057)
  KAFKA-9519: Deprecate the --zookeeper flag in ConfigCommand (apache#8056)
  KAFKA-9509; Fixing flakiness of MirrorConnectorsIntegrationTest.testReplication (apache#8048)
  HOTFIX: Fix two test failures in JDK11 (apache#8063)
  DOCS - clarify transactionalID and idempotent behavior (apache#7821)
  MINOR: further InternalTopologyBuilder cleanup  (apache#8046)
  MINOR: Add timer for update limit offsets (apache#8047)
  HOTFIX: Fix spotsbug failure in Kafka examples (apache#8051)
  KAFKA-9447: Add new customized EOS model example (apache#8031)
  KAFKA-8164: Add support for retrying failed (apache#8019)
  HOTFIX: checkstyle for newly added unit test
  KAFKA-9261; Client should handle unavailable leader metadata (apache#7770)
  MINOR: Fix typos introduced in KIP-559 (apache#8042)
  MINOR: Fixing null handilg in ValueAndTimestampSerializer (apache#7679)
  KAFKA-9113: Clean up task management and state management (apache#7997)
  MINOR: fix checkstyle issue in ConsumerConfig.java (apache#8038)
  KAFKA-9491; Increment high watermark after full log truncation (apache#8037)
  ...
4 participants