KAFKA-10167: use the admin client to read end-offset#8876
KAFKA-10167: use the admin client to read end-offset#8876guozhangwang merged 11 commits intoapache:trunkfrom
Conversation
| try { | ||
| return restoreConsumer.endOffsets(partitions); | ||
| } catch (final TimeoutException e) { | ||
| if (adminClient != null) { |
There was a problem hiding this comment.
Why do we need this distinction? Seems we set adminClient in any case and it should never be null?
There was a problem hiding this comment.
I also do not understand the distinction. When would we want to call restoreConsumer.endOffsets(partitions)?
There was a problem hiding this comment.
req: Could you add a test (or adapt an existing one) and verify whether during restore() a call to adminClient.listOffsets() with isolation level READ_UNCOMMITTED is done?
There was a problem hiding this comment.
This is just a hack-around: I will always use admin-client by passing it via the constructor.
I will add a unit test.
There was a problem hiding this comment.
I think we can remove this null check? If you really want to add one, add it to the constructor?
this.adminClient = Objects.notNull(adminClient);
There was a problem hiding this comment.
I just saw that we kinda need to pass null as AdminClient in TDD -- however, StoreChangelogReader#restore() is never called by TTD, and thus I still think we should remove the null check and fail hard with a NPE as it would indicate a bug if adminClient is null in a regular deployment.
| final ConsumerRebalanceListener rebalanceListener; | ||
| final Consumer<byte[], byte[]> mainConsumer; | ||
| final Consumer<byte[], byte[]> restoreConsumer; | ||
| final Admin adminClient; |
There was a problem hiding this comment.
Why is adminClient not final any longer? We still pass it into the StreamThread constructor.
There was a problem hiding this comment.
You mean private, right? It is not private anymore.
Instead of making the adminClient field package private for testing, I would either add a setter setAdmin() to MockClientSupplier or instantiate the admin in a private field of the MockClientSupplier and use the existing getter to set up the admin, or instantiate the admin in a public field of the MockClientSupplier and accessing it directly to set it up (similarly to the producer and consumer).
There was a problem hiding this comment.
Yeah I agree, will move all these variables to private and add getters.
There was a problem hiding this comment.
@guozhangwang Thanks very much for this!
This bug was driving me crazy! 😃
| final ConsumerRebalanceListener rebalanceListener; | ||
| final Consumer<byte[], byte[]> mainConsumer; | ||
| final Consumer<byte[], byte[]> restoreConsumer; | ||
| final Admin adminClient; |
There was a problem hiding this comment.
You mean private, right? It is not private anymore.
Instead of making the adminClient field package private for testing, I would either add a setter setAdmin() to MockClientSupplier or instantiate the admin in a private field of the MockClientSupplier and use the existing getter to set up the admin, or instantiate the admin in a public field of the MockClientSupplier and accessing it directly to set it up (similarly to the producer and consumer).
|
|
||
| final Consumer<byte[], byte[]> mainConsumer = clientSupplier.getConsumer(consumerConfigs); | ||
| changelogReader.setMainConsumer(mainConsumer); | ||
| changelogReader.setAdminClient(adminClient); |
There was a problem hiding this comment.
Q: Why do you not pass the admin client in the constructor?
There was a problem hiding this comment.
Yeah that's the plan, I was just hacking it around so that I do not need to change 30+ unit tests. Once it is confirmed to fix the issue I will refactor this PR.
| try { | ||
| return restoreConsumer.endOffsets(partitions); | ||
| } catch (final TimeoutException e) { | ||
| if (adminClient != null) { |
There was a problem hiding this comment.
I also do not understand the distinction. When would we want to call restoreConsumer.endOffsets(partitions)?
| try { | ||
| return restoreConsumer.endOffsets(partitions); | ||
| } catch (final TimeoutException e) { | ||
| if (adminClient != null) { |
There was a problem hiding this comment.
req: Could you add a test (or adapt an existing one) and verify whether during restore() a call to adminClient.listOffsets() with isolation level READ_UNCOMMITTED is done?
| final StreamThread thread = createStreamThread("clientId", config, false); | ||
| final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer; | ||
| final MockConsumer<byte[], byte[]> mockRestoreConsumer = (MockConsumer<byte[], byte[]>) thread.restoreConsumer; | ||
| final MockAdminClient mockAdminClient = (MockAdminClient) thread.adminClient; |
There was a problem hiding this comment.
See my comment in StreamThread.
| this.mainConsumer = consumer; | ||
| } | ||
|
|
||
| void setAdminClient(final Admin adminClient) { |
…-offsets-uncommitted
…-offsets-uncommitted
| return result.all().get().entrySet().stream().collect( | ||
| Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset())); | ||
| } else { | ||
| // we only fall back to use restore consumer if admin client is not set in TTD |
There was a problem hiding this comment.
Can this case ever apply? TTD should never restore any task from my understanding.
There was a problem hiding this comment.
TTD would still call this function though... after some thinking I've decided to add a mock-admin to streams:test-utils (the existing mock is in clients:test and hence cannot be relied on).
| } catch (final TimeoutException e) { | ||
| if (adminClient != null) { | ||
| final ListOffsetsResult result = adminClient.listOffsets(partitions.stream().collect( | ||
| Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))); |
There was a problem hiding this comment.
Should we set isolation.level explicitly? -- In case the default if ever changed, we would have a safe guard?
There was a problem hiding this comment.
Good point, will do.
| try { | ||
| return restoreConsumer.endOffsets(partitions); | ||
| } catch (final TimeoutException e) { | ||
| final ListOffsetsResult result = adminClient.listOffsets( |
There was a problem hiding this comment.
Did you check out the new methods in ClientUtils?
|
@guozhangwang Lot of failing test that seem related? |
Hmm. My local tests passed twice.. I will double check. |
| mockWallClockTime, | ||
| streamsConfig, | ||
| logContext, | ||
| createAdminClient(processorTopology.storeToChangelogTopic()), |
There was a problem hiding this comment.
This is the only thing I am wondering about. Why not just pass null?
There was a problem hiding this comment.
After thinking a bit more I agree with you now: I removed the mock admin client and instead just pass a mock register (since the register function may still be called)
mjsax
left a comment
There was a problem hiding this comment.
Thanks @guozhangwang. LGTM. Free free to merge after Jenkins passed.
Since admin client allows use to use flexible offset-spec, we can always set to use read-uncommitted regardless of the EOS config. Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Bruno Cadonna <bruno@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
|
Cherry-pick to 2.6. |
* 'trunk' of github.com:apache/kafka: KAFKA-10168: fix StreamsConfig parameter name variable (apache#8865) MINOR: code cleanup for inconsistent naming (apache#8871) KAFKA-10138: Prefer --bootstrap-server for reassign_partitions command in ducktape tests (apache#8898) KAFKA-10185: Restoration info logging (apache#8896) KAFKA-9891: add integration tests for EOS and StandbyTask (apache#8890) MINOR: Reduce build time by gating test coverage plugins behind a flag (apache#8899) KAFKA-10141; Add more detail to log segment delete messages (apache#8850) KAFKA-10113; Specify fetch offsets correctly in `LogTruncationException` (apache#8822) KAFKA-10167: use the admin client to read end-offset (apache#8876) MINOR: Upgrade ducktape to 0.7.8 (apache#8879) KAFKA-10123; Fix incorrect value for AWAIT_RESET#hasPosition (apache#8841) KAFKA-9896: fix flaky StandbyTaskEOSIntegrationTest (apache#8883) MINOR: clean up unused checkstyle suppressions for Streams (apache#8861) MINOR: reuse toConfigObject(Map) to generate Config (apache#8889) MINOR: Upgrade jetty to 9.4.27.v20200227 and jersey to 2.31 (apache#8859) MINOR: Fix flaky HighAvailabilityTaskAssignorIntegrationTest (apache#8884) KAFKA-10147 MockAdminClient#describeConfigs(Collection<ConfigResource>) is unable to handle broker resource (apache#8853) KAFKA-10165: Remove Percentiles from e2e metrics (apache#8882) # Conflicts: # core/src/main/scala/kafka/log/Log.scala
Since admin client allows use to use flexible offset-spec, we can always set to use read-uncommitted regardless of the eos config.
Committer Checklist (excluded from commit message)