KAFKA-7916: Unify store wrapping code for clarity #6255

Merged
guozhangwang merged 13 commits into apache:trunk from vvcephei:wrapper-unification
Feb 14, 2019

@vvcephei
Contributor

Refactor internal store wrapping for improved maintainability.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor Author

@vvcephei vvcephei left a comment

Hey @guozhangwang @ableegoldman @bbejeck @mjsax ,

Please take a look at this. It's meant to be the first in a series of rationalizations of the internal state store handling logic.

Contributor Author

Move reasoning about whether a store is new or old binary format inside of the store hierarchy, to preserve encapsulation. Ideally, isTimestamped would be an instance method, but we don't need to add a method to the public StateStore interface. A static method is good enough.

Contributor Author

These no-longer-necessary methods demonstrate the benefit of adding the generic parameter to WrappedStateStore. Previously, wrappedStore could only return a StateStore, but now it directly returns a T extends StateStore, with no need for casting anywhere.
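
To illustrate the point about the generic parameter, here is a minimal sketch. The types below are simplified, hypothetical stand-ins (not the real Kafka Streams classes), and the accessor name `wrapped()` is an assumption for illustration. The idea is that the type parameter lets a subclass call typed methods on the inner store without a cast and without keeping its own second reference.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins; not the actual Kafka Streams interfaces.
interface StateStore {
    String name();
}

interface KeyValueStore extends StateStore {
    void put(String key, String value);
    String get(String key);
}

class InMemoryKeyValueStore implements KeyValueStore {
    private final Map<String, String> map = new HashMap<>();
    public String name() { return "in-memory"; }
    public void put(String key, String value) { map.put(key, value); }
    public String get(String key) { return map.get(key); }
}

// The type parameter T records the static type of the wrapped store.
abstract class WrappedStateStore<T extends StateStore> implements StateStore {
    private final T wrapped;

    WrappedStateStore(T wrapped) { this.wrapped = wrapped; }

    public String name() { return wrapped.name(); }

    // Returns T rather than StateStore, so callers need no cast.
    T wrapped() { return wrapped; }
}

// A wrapper that counts writes; it reaches the inner store only through wrapped().
class CountingKeyValueStore extends WrappedStateStore<KeyValueStore> implements KeyValueStore {
    int puts = 0;

    CountingKeyValueStore(KeyValueStore inner) { super(inner); }

    public void put(String key, String value) {
        puts++;
        wrapped().put(key, value); // typed call, no cast and no duplicate field
    }

    public String get(String key) { return wrapped().get(key); }
}
```

With this shape, `new CountingKeyValueStore(new InMemoryKeyValueStore())` can be used anywhere a `KeyValueStore` is expected.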

Contributor Author

You'll note in all these methods that we need to say super.*** to explicitly invoke the operation on the wrapped store when it's a method that the subclass explicitly overrides.

Contributor Author

This is the other way that the generic parameter on WrappedStateStore manifests... Now that wrappedStore() returns a properly typed store, we no longer need to store a redundant reference to the wrapped store just to preserve the type information.

Contributor Author

Cache the functions so we don't have to lean on the compiler's decision to cache them, and so we avoid creating a new function on every invocation.
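
A rough sketch of what caching the functions could look like, under stated assumptions: `ValueConverter` and `ValueConverters` are hypothetical names, and the signature is simplified compared with the real `RecordConverter`. Each lambda is created once and held in a `static final` field, so every call to the accessor returns the same instance regardless of how the runtime treats lambda expressions.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified converter type; the real RecordConverter has a different signature.
interface ValueConverter {
    byte[] convert(long timestamp, byte[] rawValue);
}

final class ValueConverters {
    // Created once at class initialization and reused for every call.
    private static final ValueConverter IDENTITY = (timestamp, rawValue) -> rawValue;

    // Prepends the 8-byte timestamp to the raw value (assumed layout, for illustration only).
    private static final ValueConverter RAW_TO_TIMESTAMPED = (timestamp, rawValue) ->
        rawValue == null
            ? null
            : ByteBuffer.allocate(Long.BYTES + rawValue.length)
                .putLong(timestamp)
                .put(rawValue)
                .array();

    private ValueConverters() { }

    static ValueConverter identity() { return IDENTITY; }

    static ValueConverter rawValueToTimestampedValue() { return RAW_TO_TIMESTAMPED; }
}
```

Because the accessors return the cached fields, `ValueConverters.identity() == ValueConverters.identity()` holds by construction.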

Contributor Author

Renamed the function to be a bit more explicit. It's internal, so the name shouldn't affect the public API at all.

Member

nit: rawValueToTimestampedValueRecordConverter() -- without the RecordConverter suffix, it's unclear what it actually returns: a RecordConverter

Contributor Author

Are we doing Hungarian Notation now? https://en.wikipedia.org/wiki/Hungarian_notation ;)

Some might argue that it's unnecessary to encode the type in a name in a strongly (and explicitly) typed language like Java.

If you feel strongly about it, I'm fine with the change.

Member

For regular types I agree; however, here we actually return a "function". I don't feel strongly about it -- it's just personal taste, I guess. If I read RecordConverter.rawValueToTimestampedValue(), I would expect rawValueToTimestampedValue() to be a method that takes a rawValue as input and returns a timestampedValue (as the method name indicates) -- but this is of course not the case (there is not even an input parameter) -- thus, I find the code confusing to read.

Contributor Author

I see.

Contributor Author

Also added a cached identity function. IMHO, it's just a little more intentional than saying entity -> entity inline.

Member

nit: identityRecordConverter or noOpRecordConverter ?

Contributor Author

Moving this in here because it requires specific knowledge of the concrete store types. In particular, checking and recursively unwrapping wrapped stores is a potentially brittle and dangerous operation that shouldn't be scattered throughout the codebase.

Specifically, as you recurse down the wrapper references, you lose all knowledge of the wrapped type. We avoid a dangerous cast by answering a specific question (is there a timestamped byte store in my wrapped stack of stores?) and returning a boolean, rather than a reference to an unwrapped and casted store.
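
The recursion described above might look roughly like the following sketch. The types are simplified, hypothetical stand-ins, and the accessor name `wrapped()` is an assumption. The method walks down the wrapper chain and answers only the yes/no question, so no caller ever receives an unwrapped, cast reference.

```java
// Hypothetical, simplified stand-ins; not the actual Kafka Streams types.
interface StateStore {
    String name();
}

// Marker interface: the store expects values in the with-timestamp binary format.
interface TimestampedBytesStore { }

abstract class WrappedStateStore<T extends StateStore> implements StateStore {
    private final T wrapped;

    WrappedStateStore(T wrapped) { this.wrapped = wrapped; }

    public String name() { return wrapped.name(); }

    T wrapped() { return wrapped; }

    // Static so that nothing has to be added to the StateStore interface itself.
    static boolean isTimestamped(StateStore store) {
        if (store instanceof TimestampedBytesStore) {
            return true;
        } else if (store instanceof WrappedStateStore) {
            // The inner type is unknown here, hence the wildcard; we only recurse, never cast.
            return isTimestamped(((WrappedStateStore<?>) store).wrapped());
        } else {
            return false;
        }
    }
}

class PlainStore implements StateStore {
    public String name() { return "plain"; }
}

class TimestampedStore implements StateStore, TimestampedBytesStore {
    public String name() { return "timestamped"; }
}

class Wrapper extends WrappedStateStore<StateStore> {
    Wrapper(StateStore inner) { super(inner); }
}
```

For example, `WrappedStateStore.isTimestamped(new Wrapper(new Wrapper(new TimestampedStore())))` evaluates to `true`, while wrapping a `PlainStore` yields `false`.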

Contributor Author

There was no advantage to separating the interface and abstract class after adding the generic parameter, so I've collapsed them into one abstract class.

Previously, the interface was nice because some implementations could declare that they were wrapped without being forced to store duplicate references to the inner store.

Contributor

Good call!

Member

collapsed them into one abstract class

Class is not declared as abstract -- is this intentional or did it slip? IMHO, it should be abstract (should the name indicate this, too? -> AbstractedWrappedStateStore?)

Contributor Author

I can make it abstract; I don't think there's any advantage to just wrapping a state store, except for testing.

I don't think we need to change the name of the class, though...

Contributor Author

This was only used for testing, and it's pretty awkward, since you don't know what type the inner store of the inner store is, except StateStore.

Rather than keeping it in the interface, I've just inlined it into the two tests that needed it.

Member

Not sure about this -- why not add two generics to the store, one for "wrapped" and one for "root", and keep this method that returns the root type?

I would also rename inner() -> root()

Contributor Author

How would we know that there's any specific number of inner types?

This particular method assumes that you're only trying to get the "grandparent" store, but just because that's what a specific test needs to do. Why should a specific test's need be encoded in a method on all stores in runtime code?

Member

How would we know that there's any specific number of inner types?

No need to know all types for multiple nesting -- only the first and most inner.

I would recommend changing the method to return the innermost store (i.e., completely unnest instead of only one level).

Why should a specific test's need be encoded in a method on all stores in runtime code?

Runtime code will need this too IIRC (we had POC PRs for timestamped stores that did use unnesting in the DSL in Processor#init() calls). If it's only for tests, I agree that removing makes sense.

@mjsax mjsax added the streams label Feb 11, 2019
@guozhangwang
Contributor

retest this please

Contributor

@guozhangwang guozhangwang left a comment

also cc @ableegoldman

Contributor

This is a meta thought orthogonal to this PR, but I'll leave it here and create a JIRA ticket later: the StateStore interface is really designed to be "implemented by users, and called by the library" -- maybe one can argue that users may want to explicitly call flush as well, but I'd say it is okay to NOT allow users to do so. And the extended interfaces, like KeyValueStore, are designed to be "implemented by the library or user-customizable, and called by the user".

Given this thought, we should probably consider refactoring the hierarchy of state stores more as:

  1. interface StateStore
  2. interface ReadOnlyKeyValueStore
  3. interface KeyValueStateStore extends StateStore, ReadOnlyKeyValueStore (note the name difference)
  4. interface KeyValueStore extends ReadOnlyKeyValueStore.

And 3) would be the one used in the Stores factory, i.e. required for either built-in or user-customized stores, while 4) would be the one returned from ProcessorContext.getStateStore. The returned object would, in fact, always extend StateStore as well, but only the library would care to cast it and call its inner functions.
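
The four-interface split proposed above could be sketched as follows. This is only an illustration under several assumptions: the method sets are reduced to a minimum, the comment does not say where write methods such as `put` would live (this sketch places a hypothetical `put` on `KeyValueStore`), and `InMemoryStore` is an invented class showing one object that serves both views.

```java
import java.util.HashMap;
import java.util.Map;

// 1. Administrative methods, meant to be called by the library only.
interface StateStore {
    String name();
    void flush();
    void close();
}

// 2. Read-only access.
interface ReadOnlyKeyValueStore<K, V> {
    V get(K key);
}

// 3. What a store supplier would have to implement (built-in or custom).
interface KeyValueStateStore<K, V> extends StateStore, ReadOnlyKeyValueStore<K, V> { }

// 4. What user code would receive; no flush() or close() is visible here.
//    Placing put() on this interface is an assumption of this sketch.
interface KeyValueStore<K, V> extends ReadOnlyKeyValueStore<K, V> {
    void put(K key, V value);
}

// Hypothetical implementation: one object, two views.
class InMemoryStore<K, V> implements KeyValueStateStore<K, V>, KeyValueStore<K, V> {
    private final String name;
    private final Map<K, V> map = new HashMap<>();
    boolean closed = false;

    InMemoryStore(String name) { this.name = name; }

    public String name() { return name; }
    public void flush() { /* nothing to flush for an in-memory map */ }
    public void close() { closed = true; }
    public V get(K key) { return map.get(key); }
    public void put(K key, V value) { map.put(key, value); }
}
```

User code holding a `KeyValueStore<String, Integer>` reference can call `get` and `put` but has no `flush` or `close` in scope, while the library can still treat the same object as a `StateStore`.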

I'm not sure if there's a better way to do this in order to get rid of all such decorators, @vvcephei wdyt?

Member

I completely agree (all the "decorators" are only necessary because we got the API wrong, leaking "internal" API like flush(), close(), etc. into the PAPI).

However, is it possible to refactor this without breaking the public API?

Contributor Author

I agree 100%, and would very much like to see something like this. It would also help us to avoid leaking other administrative methods if we add them (besides just init, flush, close).

I think the cleanest way to do it backwards-compatibly is to introduce a new store hierarchy and deprecate the old one.

Member

I think the cleanest way to do it backwards-compatibly is to introduce a new store hierarchy and deprecate the old one.

I am afraid that might be the only way to do it -- deprecation hell -- users won't be happy -- thus we should think hard about whether it's worth doing or not...

Contributor

Cool, I will create a JIRA ticket for now to keep track of it.

Contributor

Not introduced in this PR, but: why do we still need to override these two functions? Shouldn't the ones from WrappedStore be sufficient?

Contributor Author

It was an oversight. I'll remove the pass-through overrides.

Contributor

Since super.init is innerState.init(context, root), could we just call that to be consistent with the other calls, or are you intentionally calling super.init in case we change the implementation of WrappedStateStore.init in the future?

Member

Good question...

Contributor Author

Yes, it would be incorrect design to call wrappedStore().init directly. The job of WrappedStateStore.init is to "do the right thing". If we go and look at what it does today, and just do that thing ourselves, it'll work only until the superclass's implementation changes.

Another way to look at it is that we are "intercepting" the init call, to add some extra stuff before and after the init. This doesn't mean we get to interrupt the call hierarchy and circumvent our parent class, nor do we have any reason to do so.

Note that "the other calls" you refer to are not StateStore calls, they are WindowStore calls. And our superclass doesn't implement WindowStore, so it's not possible to call (eg) super.put(key, value, timestamp).
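
A small sketch of the "intercepting" pattern described above. The types and the one-argument `init(String)` signature are simplified assumptions for illustration; the real `init` takes a context and a root store. The subclass adds work before and after, but still routes the call through `super.init` rather than calling the inner store directly.

```java
import java.util.List;

// Hypothetical, simplified stand-ins; the real init signature differs.
interface StateStore {
    void init(String context);
}

class RecordingStore implements StateStore {
    private final List<String> log;

    RecordingStore(List<String> log) { this.log = log; }

    public void init(String context) { log.add("inner init " + context); }
}

abstract class WrappedStateStore<T extends StateStore> implements StateStore {
    private final T wrapped;

    WrappedStateStore(T wrapped) { this.wrapped = wrapped; }

    // Today this only delegates, but subclasses should not rely on that detail.
    public void init(String context) { wrapped.init(context); }

    T wrapped() { return wrapped; }
}

class MeteredStore extends WrappedStateStore<RecordingStore> {
    private final List<String> log;

    MeteredStore(RecordingStore inner, List<String> log) {
        super(inner);
        this.log = log;
    }

    @Override
    public void init(String context) {
        log.add("before init");
        // Go through the parent so that any future logic in WrappedStateStore.init still runs.
        super.init(context);
        log.add("after init");
    }
}
```

If `WrappedStateStore.init` later gains extra steps, `MeteredStore` picks them up without any change, which would not be the case if it called `wrapped().init(context)` itself.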

Contributor

@guozhangwang guozhangwang left a comment

Made a pass over the PR, lgtm!

I only have some questions for my own understanding; after that I can merge this as-is.

Member

@mjsax mjsax left a comment

Nice improvement!

Member

This is the same as GlobalStateManagerImpl.java -- maybe add method AbstractStateManager#recordConverter(store) to unify code?

Member

I think we should first check for this condition, because we should only check the innermost store -- if a wrapping store were to implement TimestampedBytesStore by mistake, we would return true even if the innermost store does not implement it -- this would be incorrect.

Contributor Author

Interesting thought.

Let's say there's a wrapper that advertises that it's a TimestampedBytesStore. This means it is telling Streams that it expects to receive serialized data in the new (with-timestamp) format.

Why does it do this? Maybe it's a translation layer for other stores? In which case, is it correct for Streams to second-guess the implementation and break its own contract by ignoring the marker interface and delivering non-timestamped binary data?

Or maybe, as you suggest, it's just a bug. Is it best for Streams to try and work around a future bug in a state store implementation by reasoning about relative roles of wrapper stores and bytes stores?

I'm not sure... It sounds like it would be more future-proof to just take the store's interface at face value.

Member

Why does it do this? Maybe it's a translation layer for other stores? In which case, is it correct for Streams to second-guess the implementation and break its own contract by ignoring the marker interface and delivering non-timestamped binary data?

I don't think that would work. Note that on restore we always get the innermost store and would not call this "translation layer wrapper store" (and thus it would break, as we would insert our converter and hand timestamped bytes to a store that does not understand them). If one wants to implement a translation wrapper like this, she needs to "hide" it from Kafka Streams and not implement WrappingStore (i.e., the translation wrapper must be the innermost store).

Contributor Author

Huh... How do we get the most inner store on restore?

Member

WrappedStore should have a method root() (or similar name) that returns the most inner store.

Contributor Author

As far as I can tell, this method would actually never be needed in the existing code base.

I agree with the naming of it (and the location in the code), but can we defer adding it until it's needed?

To address the earlier question, I believe that the state restoration logic just calls whatever restore listener has been registered. It turns out that the root store is the one to register the listener, but this doesn't require actually unwrapping anything (since it is done within context).

Member

This is ugly -- we should have access to the wrapped store directly via inheritance. (also below)

Member

name() -> wrapped.name()

@vvcephei
Contributor Author

I've addressed all the comments and rebased to resolve a merge conflict.

@guozhangwang
Contributor

Regarding whether wrapped should be protected so that all its subclasses can access it directly, or private and accessed via the wrapped() function call, I do not have a strong preference. So I'll leave it to @mjsax: see if you can convince @vvcephei, or whether you are okay to merge as is :)

@vvcephei
Contributor Author

rebased again to resolve merge conflicts

@vvcephei
Contributor Author

also responded to the open questions

@vvcephei
Contributor Author

Java 8 failure was unrelated:
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/19436/

Error Message
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
Stacktrace
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306)
	at kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89)

Java 11 passed:
https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2337/

Retest this, please.

@mjsax
Member

mjsax commented Feb 14, 2019

@guozhangwang I am ok with the PR. Feel free to merge.

@vvcephei
Contributor Author

Java 8 build failed (unrelated):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/19446/

Error Message
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
Stacktrace
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:306)
	at kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89)

Not sure if there's any point continuing to repeat the tests, hoping that the flaky core tests will pass...

@guozhangwang guozhangwang merged commit 01f0f0a into apache:trunk Feb 14, 2019
@vvcephei
Contributor Author

Thanks for the reviews, @mjsax and @guozhangwang !

@vvcephei vvcephei deleted the wrapper-unification branch February 14, 2019 16:44
guozhangwang pushed a commit that referenced this pull request Feb 14, 2019
Refactor internal store wrapping for improved maintainability.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <guozhang@confluent.io>
@guozhangwang
Contributor

Cherry-picked to 2.2.
