KAFKA-7737; Use single path in producer for initializing the producerId #7920

Merged
hachikuji merged 3 commits into apache:trunk from
hachikuji:KAFKA-7737
Jan 23, 2020

@hachikuji
Contributor

Previously, the idempotent producer and the transactional producer used separate logic when initializing the producerId. This patch consolidates the two paths. We also do some cleanup in TransactionManagerTest to eliminate brittle expectations on Sender.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@hachikuji
Contributor Author

retest this please

Contributor

@viktorsomogyi left a comment

It looks good as far as I can tell. I saw that you had two test failures, but those were most likely flaky tests: I reran the client and core tests for this patch on my laptop and they didn't fail. Let's rerun them in the hope of a green build.

Contributor

Just sanity checking: do I understand correctly that this became part of resetProducerIdIfNeeded, so that the first time this code path runs, the producer id gets placed in the transactional request queue? Is this also the reason for removing maybeWaitForProducerId?

Contributor Author

Yes, that's right.

Contributor

Would you consider using a long timeout instead and moving this to TestUtils? Perhaps we need this somewhere else too.

Contributor Author

It's a bit difficult to restructure these test cases with a timeout. Most of the logic is basically "run the sender until X happens, then verify Y." With a timeout, we cannot control the stopping condition.
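
The "run the sender until X happens, then verify Y" pattern described above can be sketched as a small condition-driven loop. This is only an illustration, not the helper used in the patch: `RunUntilSketch`, `Step`, `runUntil`, and the `maxIterations` bound are all hypothetical names introduced here.

```java
import java.util.function.BooleanSupplier;

class RunUntilSketch {
    /** Hypothetical stand-in for one iteration of the sender loop. */
    interface Step {
        void runOnce();
    }

    /**
     * Runs the step until the condition holds and returns the number of
     * iterations used. The iteration bound (an assumption of this sketch)
     * keeps a broken test from spinning forever.
     */
    static int runUntil(Step step, BooleanSupplier condition, int maxIterations) {
        int iterations = 0;
        while (!condition.getAsBoolean()) {
            if (iterations >= maxIterations) {
                throw new AssertionError("Condition not met after " + maxIterations + " iterations");
            }
            step.runOnce();
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        int[] counter = {0};
        // The test controls the stopping condition, not a wall-clock timeout.
        int used = runUntil(() -> counter[0]++, () -> counter[0] >= 3, 10);
        System.out.println("iterations=" + used + ", counter=" + counter[0]);
    }
}
```

Unlike a timeout, the loop stops at the exact iteration where the condition first holds, so the verification step that follows sees a deterministic state.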

@viktorsomogyi
Contributor

Ah, you reran them in the meantime :)
Now only one failed:
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup for which we have a submitted issue (KAFKA-7965)

Contributor

@guozhangwang left a comment

Just had one qq about the PR; I will review the rest of the PR once that's cleared. Maybe I missed something big here, so I need to clarify.

Contributor

The only other caller of InitProducerIdRequest.Builder is in TransactionManager#initializeTransactions, which is only called in producer#initTransactions. For the idempotent producer, how would that initPid request be sent? Did I miss anything?

Contributor Author

@hachikuji Jan 15, 2020

We also build an InitProducerIdRequest inside TransactionManager.resetProducerIdIfNeeded in this patch.

Contributor

Ack, thanks!

Contributor

Ack, thanks!

Contributor

qq: is the second condition necessary given the first one? If we are not transactional, then right now the only place that transitions to INITIALIZING is line 496 below. I'm actually fine with leaving it here to be less bug-prone; I just want to clarify my understanding.

Contributor Author

I added it to ensure that we don't enqueue multiple InitProducerId requests.
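
The role of that second condition can be illustrated with a simplified state machine. This is a sketch under stated assumptions, not the actual TransactionManager code: the class `InitGuardSketch`, its fields, and the string-based queue are hypothetical; only the state names and `resetProducerIdIfNeeded` come from the discussion.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class InitGuardSketch {
    enum State { UNINITIALIZED, INITIALIZING, READY }

    private final boolean transactional;
    private State state = State.UNINITIALIZED;
    private boolean hasProducerId = false;
    // Stand-in for the transactional request queue; real requests are not strings.
    final Queue<String> pendingRequests = new ArrayDeque<>();

    InitGuardSketch(boolean transactional) {
        this.transactional = transactional;
    }

    State state() {
        return state;
    }

    /** Called on every pass of the (hypothetical) sender loop. */
    void resetProducerIdIfNeeded() {
        // First condition: a non-transactional producer that has no producerId yet.
        boolean needsProducerId = !transactional && !hasProducerId;
        // Second condition: an init request is already pending, so do not add another.
        boolean alreadyInitializing = state == State.INITIALIZING;
        if (needsProducerId && !alreadyInitializing) {
            state = State.INITIALIZING;
            pendingRequests.add("InitProducerId");
        }
    }

    /** Simulates a successful response to the pending init request. */
    void onInitResponse() {
        pendingRequests.poll();
        hasProducerId = true;
        state = State.READY;
    }

    public static void main(String[] args) {
        InitGuardSketch manager = new InitGuardSketch(false);
        for (int i = 0; i < 3; i++) {
            manager.resetProducerIdIfNeeded();
        }
        System.out.println("pending=" + manager.pendingRequests.size());
    }
}
```

Without the `alreadyInitializing` check, each pass of the loop that runs before the response arrives would enqueue another request in this sketch.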

Contributor

qq: could you add a comment explaining why we need to call client.poll before exiting under this condition?

Contributor Author

I don't have a great explanation for it, to be honest. I guess we are trying to ensure that any inflight requests return before we shut down. Perhaps we should just crash the thread and let the client get cleaned up? How about I open a separate issue for this?

Contributor

SG.

Contributor

We can let this return a boolean indicating whether a request was enqueued, and then in Sender we could do:

if (transactionManager.resetProducerIdIfNeeded() && maybeSendAndPollTransactionalRequest()) ..

Also, nit: add a comment on top indicating that we will send out a new initPid request if the producer is not transactional.

Contributor Author

I think we want to call maybeSendAndPollTransactionalRequest in this loop regardless of whether resetProducerIdIfNeeded returns true. Ack on the comment.

Contributor

qq: is that a piggy-backed fix, or is it necessary for the refactoring?

Contributor Author

More of an optimization than a fix I guess, but it simplified one of the tests in TransactionManagerTest.

Contributor

How about letting the state transition to UNINITIALIZED inside resetProducerId (since it will only execute successfully if not transactional)? By doing this we would still have a single transition path to INITIALIZING.

Contributor Author

Yeah, it's a good thought. I don't recall if I previously considered this, but let me take a look and see if it works.

Contributor

qq: Why is this necessary?

Contributor Author

Note that we black out the node for 10ms above. Previously we were relying on the backoff logic in maybeWaitForProducerId for the node to be ready again. Now the test needs time to be advanced externally since we cannot rely on client.poll advancing it.
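
Why the test has to advance time itself can be shown with a manually driven clock. This is a minimal sketch under assumptions: `ManualClockSketch`, `ManualClock`, and `Node` are hypothetical stand-ins, not the mock classes the Kafka tests use.

```java
class ManualClockSketch {
    /** Hypothetical test clock: time moves only when the test says so. */
    static final class ManualClock {
        private long nowMs = 0;

        long milliseconds() {
            return nowMs;
        }

        void advance(long ms) {
            nowMs += ms;
        }
    }

    /** Hypothetical node that is unavailable during a blackout window. */
    static final class Node {
        private final ManualClock clock;
        private long readyAtMs = 0;

        Node(ManualClock clock) {
            this.clock = clock;
        }

        void blackout(long durationMs) {
            readyAtMs = clock.milliseconds() + durationMs;
        }

        boolean isReady() {
            return clock.milliseconds() >= readyAtMs;
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        Node node = new Node(clock);
        node.blackout(10);
        System.out.println("ready before advance: " + node.isReady());
        // Nothing else moves the clock here, so the test advances it explicitly.
        clock.advance(10);
        System.out.println("ready after advance: " + node.isReady());
    }
}
```

In this sketch the node stays unavailable for as long as no code path advances the clock, which mirrors the situation once the backoff in maybeWaitForProducerId is gone.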

Contributor

SG.

Contributor Author

Reducing visibility here to make sure that test cases are forced through the proper state transitions.

@hachikuji
Contributor Author

retest this please

Contributor

@guozhangwang left a comment

LGTM!

Contributor

@bob-barrett left a comment

LGTM

Contributor

Just FYI, for KIP-360 I'm doing this check for both idempotent and transactional producers, since it triggers an epoch bump instead of a producerId reset. I'll just pull this call out to a shared code path; the rest of this method shouldn't need to change.

Contributor Author

Sounds good. We can rename the method as well.

@hachikuji
Contributor Author

retest this please

@hachikuji hachikuji merged commit df13fc9 into apache:trunk Jan 23, 2020
ijuma added a commit to confluentinc/kafka that referenced this pull request Jan 25, 2020
Conflicts and/or compiler errors due to the fact that we
temporarily reverted the commit that removes
Scala 2.11 support:

* Exit.scala: replace SAMs with anonymous inner classes.
* MiniKdc.scala: take upstream changes.

# By A. Sophie Blee-Goldman (1) and others
# Via Jason Gustafson
* apache-github/trunk:
  KAFKA-9254; Overridden topic configs are reset after dynamic default change (apache#7870)
  MINOR: MiniKdc JVM shutdown hook fix (apache#7946)
  KAFKA-9152; Improve Sensor Retrieval (apache#7928)
  Correct exception message in DistributedHerder (apache#7995)
  KAFKA-7317: Use collections subscription for main consumer to reduce metadata (apache#7969)
  KAFKA-9181; Maintain clean separation between local and group subscriptions in consumer's SubscriptionState (apache#7941)
  KAFKA-7737; Use single path in producer for initializing the producerId (apache#7920)

# Conflicts:
#	core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
4 participants