MINOR: Improve log4j for per-consumer assignment#8997
MINOR: Improve log4j for per-consumer assignment#8997guozhangwang merged 9 commits intoapache:trunkfrom
Conversation
guozhangwang
left a comment
There was a problem hiding this comment.
Ready for review.
|
|
||
| log.info("Assigned tasks to clients as {}{}.", | ||
| Utils.NL, clientStates.entrySet().stream().map(Map.Entry::toString).collect(Collectors.joining(Utils.NL))); | ||
| log.info("Assigned tasks to clients as: {}{}.", Utils.NL, |
There was a problem hiding this comment.
This is unnecessarily verbose, plus part of it is replaced by line 960 below, so I trimmed a bit here.
There was a problem hiding this comment.
Thanks, this one has always made my head spin when reading the logs
ableegoldman
left a comment
There was a problem hiding this comment.
Seems reasonable -- I'm always a little hesitant to add yet more task sets/map to the already cluttered ClientState class, but maybe we can do some follow-on cleanup work there. Since this is only being used for debugging at the moment I'm not too concerned
|
|
||
| log.info("Assigned tasks to clients as {}{}.", | ||
| Utils.NL, clientStates.entrySet().stream().map(Map.Entry::toString).collect(Collectors.joining(Utils.NL))); | ||
| log.info("Assigned tasks to clients as: {}{}.", Utils.NL, |
There was a problem hiding this comment.
Thanks, this one has always made my head spin when reading the logs
|
|
||
| // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance | ||
| // Arbitrarily choose the leader's client to be responsible for triggering the probing rebalance, | ||
| // note once we pick the first consumer of the processor to trigger probing rebalance, other threads |
There was a problem hiding this comment.
super nit: processor --> process (or bet yet, 'client')
ableegoldman
left a comment
There was a problem hiding this comment.
@guozhangwang I'd be fine with just merging as is and kicking off a new soak in the hopes we can reproduce this and get the exact initial conditions to then reproduce it locally.
That said, I'd be +1 on adding logs to also explicitly point out the stateless and stateful tasks. But we can always figure that out in other ways so it's only more of a nice-to-have than totally necessary
Good point. I will add this information too. |
…mprove-assignor-log4j
|
Thanks @guozhangwang , LGTM |
abbccdda
left a comment
There was a problem hiding this comment.
I'm slightly against merging a PR which lacks test coverage, normally we would never look back. For soak deployment, I believe we could just patch this PR without waiting for it to merge.
guozhangwang
left a comment
There was a problem hiding this comment.
@abbccdda That's a good point. Originally I did not add any unit tests since the PR only involves log4j improvements, but on a second thought it did added and modified some data structures in order to do so so they should be covered.
I've added the unit tests accordingly.
| } | ||
|
|
||
| log.info("Client {} per-consumer assignment:\n" + | ||
| "\tprev owned active {}\n" + |
| prevActiveTasks = new TreeSet<>(); | ||
| prevStandbyTasks = new TreeSet<>(); | ||
| consumerToPreviousStatefulTaskIds = new TreeMap<>(); | ||
| consumerToPreviousActiveTaskIds = new TreeMap<>(); |
There was a problem hiding this comment.
nit: we could initialize on the parameter definition.
| private final Map<String, Set<TaskId>> consumerToPreviousActiveTaskIds; | ||
| private final Map<String, Set<TaskId>> consumerToAssignedActiveTaskIds; | ||
| private final Map<String, Set<TaskId>> consumerToAssignedStandbyTaskIds; | ||
| private final Map<String, Set<TaskId>> consumerToRevokingActiveTaskIds; |
There was a problem hiding this comment.
We should add an empty line here to avoid mixing with the other maps with production use cases.
|
|
||
| private final Map<String, Set<TaskId>> consumerToPreviousStatefulTaskIds; | ||
| // the following four maps are used only for logging purposes; | ||
| // TODO: we could consider merging them with other book-keeping maps |
There was a problem hiding this comment.
Could we add a follow-up ticket instead? Someone in the community could pick it up.
…mprove-assigt nor-log4j
|
@abbccdda updated per your comments. |
abbccdda
left a comment
There was a problem hiding this comment.
LGTM, thanks for the enhanced logging!
* MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server (apache#9030) Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, David Jacot <djacot@confluent.io>, Ron Dagostino <rdagostino@confluent.io> * MINOR: Fix flaky system test assertion after static member fencing (apache#9033) The test case `OffsetValidationTest.test_fencing_static_consumer` fails periodically due to this error: ``` Traceback (most recent call last): File "/home/jenkins/workspace/system-test-kafka_2.6/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 134, in run data = self.run_test() File "/home/jenkins/workspace/system-test-kafka_2.6/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/tests/runner_client.py", line 192, in run_test return self.test_context.function(self.test) File "/home/jenkins/workspace/system-test-kafka_2.6/kafka/venv/lib/python2.7/site-packages/ducktape-0.7.8-py2.7.egg/ducktape/mark/_mark.py", line 429, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/jenkins/workspace/system-test-kafka_2.6/kafka/tests/kafkatest/tests/client/consumer_test.py", line 257, in test_fencing_static_consumer assert len(consumer.dead_nodes()) == num_conflict_consumers AssertionError ``` When a consumer stops, there is some latency between when the shutdown is observed by the service and when the node is added to the dead nodes. This patch fixes the problem by giving some time for the assertion to be satisfied. Reviewers: Boyang Chen <boyang@confluent.io> * MINOR: Improve log4j for per-consumer assignment (apache#8997) Add log4j entry summarizing the assignment (previous owned and assigned) at the consumer level. Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Boyang Chen <boyang@confluent.io> * KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers (apache#8979) Brokers currently return NOT_LEADER_FOR_PARTITION to producers and REPLICA_NOT_AVAILABLE to consumers if a replica is not available on the broker during reassignments. Non-Java clients treat REPLICA_NOT_AVAILABLE as a non-retriable exception, Java consumers handle this error by explicitly matching the error code even though it is not an InvalidMetadataException. This PR renames NOT_LEADER_FOR_PARTITION to NOT_LEADER_OR_FOLLOWER and uses the same error for producers and consumers. This is compatible with both Java and non-Java clients since all clients handle this error code (6) as retriable exception. The PR also makes ReplicaNotAvailableException a subclass of InvalidMetadataException. - ALTER_REPLICA_LOG_DIRS continues to return REPLICA_NOT_AVAILABLE. Retained this for compatibility since this request never returned NOT_LEADER_FOR_PARTITION earlier. - MetadataRequest version 0 also returns REPLICA_NOT_AVAILABLE as topic-level error code for compatibility. Newer versions filter these out and return Errors.NONE, so didn't change this. - Partition responses in MetadataRequest return REPLICA_NOT_AVAILABLE to indicate that one of the replicas is not available. Did not change this since NOT_LEADER_FOR_PARTITION is not suitable in this case. Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>, Bob Barrett <bob.barrett@confluent.io> * MINOR: Improved code quality for various files. (apache#9037) Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com> * MINOR: Fixed some resource leaks. (apache#8922) Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com> * MINOR: Enable broker/client compatibility tests for 2.5.0 release - Add missing broker/client compatibility tests for 2.5.0 release Author: Manikumar Reddy <manikumar.reddy@gmail.com> Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com> Closes apache#9041 from omkreddy/compat * KAFKA-10286: Connect system tests should wait for workers to join group (apache#9040) Currently, the system tests `connect_distributed_test` and `connect_rest_test` only wait for the REST api to come up. The startup of the worker includes an asynchronous process for joining the worker group and syncing with other workers. There are some situations in which this sync takes an unusually long time, and the test continues without all workers up. This leads to flakey test failures, as worker joins are not given sufficient time to timeout and retry without waiting explicitly. This changes the `ConnectDistributedTest` to wait for the Joined group message to be printed to the logs before continuing with tests. I've activated this behavior by default, as it's a superset of the checks that were performed by default before. This log message is present in every version of DistributedHerder that I could find, in slightly different forms, but always with `Joined group` at the beginning of the log message. This change should be safe to backport to any branch. Signed-off-by: Greg Harris <gregh@confluent.io> Author: Greg Harris <gregh@confluent.io> Reviewer: Randall Hauch <rhauch@gmail.com> * KAFKA-10295: Wait for connector recovery in test_bounce (apache#9043) Signed-off-by: Greg Harris <gregh@confluent.io> Co-authored-by: Rajini Sivaram <rajinisivaram@googlemail.com> Co-authored-by: Jason Gustafson <jason@confluent.io> Co-authored-by: Guozhang Wang <wangguoz@gmail.com> Co-authored-by: Leonard Ge <62600326+leonardge@users.noreply.github.com> Co-authored-by: Manikumar Reddy <manikumar.reddy@gmail.com> Co-authored-by: Greg Harris <gregh@confluent.io>
Add log4j entry summarizing the assignment (prev owned and assigned) at the consumer level.
Committer Checklist (excluded from commit message)