KAFKA-6036: Local Materialization for Source KTable#5779
KAFKA-6036: Local Materialization for Source KTable#5779guozhangwang merged 25 commits intoapache:trunkfrom
Conversation
…cal-materialization
…cal-materialization t
| .withProcessorParameters(processorParameters) | ||
| .withTopic(topic) | ||
| .build(); | ||
| public <K, V> KTable<K, V> table(final String topic, |
There was a problem hiding this comment.
Removed S template since we will always need a key-value store for this API (the MaterializedInternal indicates that already).
| final KTableSource<K, V> tableSource = new KTableSource<>(storeName, storeName); | ||
|
|
||
| final ProcessorParameters processorParameters = new ProcessorParameters(tableSource, processorName); | ||
| final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, processorName); |
There was a problem hiding this comment.
Minor fixes to remove unchecked warnings, ditto below.
| final String sourceName = newProcessorName(KTableImpl.SOURCE_NAME); | ||
| final String processorName = newProcessorName(KTableImpl.SOURCE_NAME); | ||
| final KTableSource<K, V> tableSource = new KTableSource<>(storeBuilder.name()); | ||
| // enforce store name as queryable name to always materialize global table stores |
There was a problem hiding this comment.
As commented here, for GlobalKTables we should always materialize so we set the queryable name to store name at the first place to enforce that.
| if (processorSupplier instanceof KTableSource) { | ||
| final KTableSource<K, V> source = (KTableSource<K, V>) processorSupplier; | ||
| return new KTableSourceValueGetterSupplier<>(source.storeName); | ||
| // whenever a source ktable is required for getter, it should be materialized |
There was a problem hiding this comment.
This is the key change.
| // need to set the queryable name as the store name to enforce materialization | ||
| public void enableSendingOldValues() { | ||
| sendOldValues = true; | ||
| this.sendOldValues = true; |
There was a problem hiding this comment.
This is the key change.
| } | ||
|
|
||
| public final void addGlobalStore(final StoreBuilder<KeyValueStore> storeBuilder, | ||
| public final void addGlobalStore(final StoreBuilder storeBuilder, |
There was a problem hiding this comment.
To cope with the typing.
| assertFalse(nodes.hasNext()); | ||
| assertFalse(stores.hasNext()); | ||
| assertFalse(subtopologies.hasNext()); | ||
| builder.table(topic, Materialized.with(Serdes.Long(), Serdes.String())); |
There was a problem hiding this comment.
We do not need to use topology description here: only this test case is using it and other functions are relying on itb.
| .count(); | ||
| final TopologyDescription describe = builder.build().describe(); | ||
|
|
||
| System.out.println(describe); |
There was a problem hiding this comment.
This is for debugging: will remove.
| } | ||
|
|
||
| @Test | ||
| public void testValueGetter() { |
There was a problem hiding this comment.
This can be merged with queryable value getter below, ditto in other test classes.
| final String topic1 = "topic1"; | ||
|
|
||
| final KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed); | ||
| final KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed, Materialized.as("store")); |
There was a problem hiding this comment.
This is to enforce materialization for the test. Ditto elsewhere.
bbejeck
left a comment
There was a problem hiding this comment.
@guozhangwang made a quick pass looks good, I'll take a more detailed look tomorrow and I'll have some comments
bbejeck
left a comment
There was a problem hiding this comment.
Thanks @guozhangwang I've made a pass and overall looks good to me, I just have a couple of minor questions.
I also have one meta-question regarding the logic of store names and the ability to query a store. If a name is not provided up-front then queryable is set to false, but a store name is generated later on via the materializedInternal.generateStoreNameIfNeeded cf https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java#L229. I find it a little confusing that the store name exists but the boolean flag for queryable is false. Would we want to consider a follow-up PR at some point to explicity state the store isn't queryable becuase a name isn't provided up front? I realize that I may be splitting hairs here and overthinking things.
| shouldMaterialize ? materializedInternal.storeName() : null | ||
| ); | ||
| // only materialize if the state store has queryable name | ||
| final String queryableName = materializedInternal != null ? materializedInternal.queryableStoreName() : null; |
There was a problem hiding this comment.
Couldn't the MaterializedInternal#queryableStoreName() return null indicating not to materialize as well?
EDIT: I think the null check in KTableSource makes sure we never hit this condition, correct? If so, you can ignore this comment and the other 2 related ones below.
There was a problem hiding this comment.
I guess it could return null -- but I also think that would be ok, just indicating that we don't materialize the result.
| shouldMaterialize ? materializedInternal.storeName() : null | ||
| ); | ||
| // only materialize if the state store has queryable name | ||
| final String queryableName = materializedInternal != null ? materializedInternal.queryableStoreName() : null; |
|
|
||
| final boolean shouldMaterialize = materialized != null && materialized.isQueryable(); | ||
| // only materialize if users provide a specific queryable name | ||
| final String queryableStoreName = materialized != null ? materialized.queryableStoreName() : null; |
| }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyStoreNameFilter")); | ||
| }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("store2")); | ||
| final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table1.filterNot( | ||
| new Predicate<String, Integer>() { |
| final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter( | ||
| new Predicate<String, Integer>() { | ||
| final KTableImpl<String, String, Integer> table3 = (KTableImpl<String, String, Integer>) table1.mapValues( | ||
| new ValueMapper<String, Integer>() { |
…cal-materialization
Yeah that's a good question that I'm scratching my head about when worked on this PR. For readability here's my reasoning: in order to merge the confusing two fields of There may be a few places that we can further remove the |
|
retest this please |
bbejeck
left a comment
There was a problem hiding this comment.
Thanks, @guozhangwang LGTM. But I'm thinking do we need a unit test that compares the topology where the KTable is materialized and later it's not (or vice versa) to ensure compatibility? If we already have such a test then you can ignore this comment.
Yes, I looked at the TopologyTest and I think it is covered already (note all the modified ones): https://github.com/apache/kafka/pull/5779/files#diff-dc709dbcb4ca71e6bd10350ceb214a00L964 One thing worth noting though, is that we do this without optimization turned on at all since I felt we should just always do this as it does not break compatibility upgrading from any older version without it to the new version. But lmk if you feel differently |
Yes I agree with this approach as well |
mjsax
left a comment
There was a problem hiding this comment.
Some initial comments and questions.
| sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName), | ||
| storeBuilder.name(), | ||
| isQueryable, | ||
| queryableStoreName, |
There was a problem hiding this comment.
I think we should make sure (to avoid bugs), that queryableStoreName == null || queryableStoreName.equals(storeBuilder.name()) -- can we add a check? Not sure if this class is the best place for it? But maybe it's also good to have multiple checks in place?
There was a problem hiding this comment.
I thought about that, and I think it is fine since the call trace is only in two folds:
- MaterializedInternal.queryableName() logic, which always based on storeName().
- StoreBuilder#name()
But I can definitely add an assertion inside the call as it is not on the critical path either. We've never done that before but I cannot think of a good reason not to.
There was a problem hiding this comment.
It's a matter of code style (I personally like assertions, but many people don't). Do we run test with assertions enabled? If not, it does not make sense to add one.
| final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) { | ||
| final String sourceName = newProcessorName(KStreamImpl.SOURCE_NAME); | ||
| final String tableSourceName = newProcessorName(KTableImpl.SOURCE_NAME); | ||
| // only materialize the source store if it is queryable, or it is required to be materialized from downstream |
There was a problem hiding this comment.
nit: this comment is confusion, because it's not clear how to map it to the code? I would remove it. What is the purpose?
There was a problem hiding this comment.
Sounds good. I'll remove it.
| final KTableSource<K, V> tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName()); | ||
| final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName); | ||
|
|
||
| final TableSourceNode<K, V, KeyValueStore<K, V>> tableSourceNode = TableSourceNode.<K, V, KeyValueStore<K, V>>tableSourceNodeBuilder() |
There was a problem hiding this comment.
Seems like we could remove the store generic type from TableSourceNode, too?
There was a problem hiding this comment.
Good call, will try.
| // need to set the queryable name as the store name to enforce materialization | ||
| public void enableSendingOldValues() { | ||
| sendOldValues = true; | ||
| this.sendOldValues = true; |
| // when the source ktable requires materialization from downstream, we just | ||
| // need to set the queryable name as the store name to enforce materialization | ||
| public void materialize() { | ||
| this.queryableName = storeName; |
| private boolean sendOldValues; | ||
|
|
||
| private boolean sendOldValues = false; | ||
| public KTableSource(final String storeName, final String queryableName) { |
There was a problem hiding this comment.
It seems there are two cases here:
(1) the table is queryable, for this case storeName == queryableName
(2) the table is not queryable, for this case queryableName == null
Thus, it seems cleaner at this level, to keep storeName and isQueryable flag to avoid potential bugs that storeName != queryableName ?
At outer layers, we can still do it differently. Thoughts?
There was a problem hiding this comment.
I personally like the distinguishment of queryableName and storeName for both this class and MaterializedInternal :) Let me try to see if I can persuade you:
-
storeName is the name for underlying store no matter if the store does exist or not (i.e. it may not be materialized), as long as the store is materialized the storeName would never be null.
-
queryableName is the name to query the store, it is set once and done. Logically it represents the logic of
isQueryable ? return storeName : return nullin a single place (otherwise there are a bunch of those places where this logic are needed and hence more error prone).
For KTableSource as an example, by the time we create this node we cannot tell yet whether we will materialize it at the end (e.g. if the downstream operators require this ancestor to send old values or materialize, we will then do it). So we need to generate the storeName as always but keep the queryableName as null.
I think the concern that queryableName != storeName is valid, for which case I would like to put generateStoreNameIfNeeded as part of the MaterializedInternal constructor, and hope that would cleanup a little bit.
| } | ||
|
|
||
| public boolean isQueryable() { | ||
| return queriable; |
There was a problem hiding this comment.
Remove variable queriable and return storeName() != null instead?
| return queriable; | ||
| } | ||
|
|
||
| public String queryableStoreName() { |
There was a problem hiding this comment.
Why do we need this? Can't we call storeName() directly? If the code base is clean, storeName() should return null iff table is not queryable?
There was a problem hiding this comment.
The issue is that we may need to create internal name via generateStoreNameIfNeeded which will set the storeName field. Ditto for the question around isQueryable.
There was a problem hiding this comment.
Thinking about this once more, it seems inconsistent to use boolean flag isQueryable. Should we rather replace it with a second String queyableStoreName that is null if the store is not queryable?
There was a problem hiding this comment.
Ack. I think we can just remove isQueryable actually since for whichever its caller, we can always use queryableName instead.
|
@mjsax updated per comment. |
mjsax
left a comment
There was a problem hiding this comment.
Overall LGTM.
Some more nits. Feel free to merge afterwards.
| } else { | ||
| return this.queryableStoreName; | ||
| } | ||
| return this.queryableStoreName; |
| final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal, | ||
| final boolean filterNot) { | ||
| final String name = builder.newProcessorName(FILTER_NAME); | ||
| // we actually do not need generate store names at all since if it is not specified, we will not |
| tupleForwarder.maybeForward(key, value, oldValue); | ||
|
|
||
| if (queryableName != null) { | ||
| final V oldValue = store.get(key); |
There was a problem hiding this comment.
Why do we unconditionally send old value? This was like this in the old code, but I am wondering why?
There was a problem hiding this comment.
Hmm.. that's a good question! Right now we have two reasons that KTableSource needs to be materialized:
- Its
materialize()is called from the value getter (think: a join in downstream). - Its
sendOldValuesis called.
for 1), we actually dot actually need to send old values actually (and it is indeed the case since in the TupleForwarder if sendOldValues is false, we will actually ignore the old value from the caller; but we can of course save one store read as well! :) ). I will update this logic.
| doTestValueGetter(builder, topic1, table1, table2, table3, table4); | ||
| final KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName2).withValueSerde(Serdes.Integer())); | ||
| final KTableImpl<String, String, Integer> table3 = (KTableImpl<String, String, Integer>) table1.mapValues(value -> new Integer(value) * (-1), Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName3).withValueSerde(Serdes.Integer())); | ||
| final KTableImpl<String, String, Integer> table4 = (KTableImpl<String, String, Integer>) table1.mapValues(Integer::new); |
There was a problem hiding this comment.
nit: avoid long lines. hard to read.
| shouldMaterialize ? materializedInternal.storeName() : null | ||
| ); | ||
| // only materialize if the state store has queryable name | ||
| final String queryableName = materializedInternal != null ? materializedInternal.queryableStoreName() : null; |
There was a problem hiding this comment.
I guess it could return null -- but I also think that would be ok, just indicating that we don't materialize the result.
| return queriable; | ||
| } | ||
|
|
||
| public String queryableStoreName() { |
There was a problem hiding this comment.
Thinking about this once more, it seems inconsistent to use boolean flag isQueryable. Should we rather replace it with a second String queyableStoreName that is null if the store is not queryable?
| public class TableSourceNode<K, V, S extends StateStore> extends StreamSourceNode<K, V> { | ||
|
|
||
| private final StoreBuilder<S> storeBuilder; | ||
| private final MaterializedInternal<K, V, ?> materializedInternal; |
There was a problem hiding this comment.
If we assume it's key-value store only (for now), should we remove the generic type for now?
…cal-materialization
|
| If we assume it's key-value store only (for now), should we remove the generic type for now? Since other contributors are also working on https://cwiki.apache.org/confluence/display/KAFKA/KIP-300%3A+Add+Windowed+KTable+API+in+StreamsBuilder I'd suggest we keep it as is? |
|
Fine with me. Was just a thought. |
…shListener (#6017) This is a follow-up PR from the previous PR #5779, where KTabeSource always get old values from the store even if sendOldValues. It gets me to make a pass over all the KTable/KStreamXXX processor to push the sendOldValues at the callers in order to avoid unnecessary store reads. More details: ForwardingCacheFlushListener and TupleForwarder both need sendOldValues as parameters. a. For ForwardingCacheFlushListener it is not needed at all, since its callers XXXCachedStore already use the sendOldValues values passed from TupleForwarder to avoid getting old values from underlying stores. b. For TupleForwarder, it actually only need to pass the boolean flag to the cached store; and then it does not need to keep it as its own variable since the cached store already respects the boolean to pass null or the actual value.. The only other minor bug I found from the pass in on KTableJoinMerge, where we always pass old values and ignores sendOldValues. Reviewers: Matthias J. Sax <mjsax@apache.org>
Refactor the materialization for source KTables in the way that:
If Materialized.as(queryableName) is specified, materialize;
If the downstream operator requires to fetch from this KTable via ValueGetters, materialize;
If the downstream operator requires to send old values, materialize.
Otherwise do not materialize the KTable. E.g. builder.table("topic").filter().toStream().to("topic") would not create any state stores.
There's a couple of minor changes along with PR as well:
KTableImpl's queryableStoreName and isQueryable are merged into queryableStoreName only, and if it is null it means not queryable. As long as it is not null, it should be queryable (i.e. internally generated names will not be used any more).
To achieve this, splitted MaterializedInternal.storeName() and MaterializedInternal.queryableName(). The former can be internally generated and will not be exposed to users. QueryableName can be modified to set to the internal store name if we decide to materialize it during the DSL parsing / physical topology generation phase. And only if queryableName is specified the corresponding KTable is determined to be materialized.
Found some overlapping unit tests among KTableImplTest, and KTableXXTest, removed them.
There are a few typing bugs found along the way, fixed them as well.
-----------------------
This PR is an illustration of experimenting a poc towards logical materializations.
Today we've logically materialized the KTable for filter / mapValues / transformValues if queryableName is not specified via Materialized, but whenever users specify queryableName we will still always materialize. My original goal is to also consider logically materialize for queryable stores, but when implementing it via a wrapped store to apply the transformations on the fly I realized it is tougher than I thought, because we not only need to support fetch or get, but also needs to support range queries, approximateNumEntries, and isOpen etc as well, which are not efficient to support. So in the end I'd suggest we still stick with the rule of always materializing if queryableName is specified, and only consider logical materialization otherwise.
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <mjsax@apache.org>
…shListener (apache#6017) This is a follow-up PR from the previous PR apache#5779, where KTabeSource always get old values from the store even if sendOldValues. It gets me to make a pass over all the KTable/KStreamXXX processor to push the sendOldValues at the callers in order to avoid unnecessary store reads. More details: ForwardingCacheFlushListener and TupleForwarder both need sendOldValues as parameters. a. For ForwardingCacheFlushListener it is not needed at all, since its callers XXXCachedStore already use the sendOldValues values passed from TupleForwarder to avoid getting old values from underlying stores. b. For TupleForwarder, it actually only need to pass the boolean flag to the cached store; and then it does not need to keep it as its own variable since the cached store already respects the boolean to pass null or the actual value.. The only other minor bug I found from the pass in on KTableJoinMerge, where we always pass old values and ignores sendOldValues. Reviewers: Matthias J. Sax <mjsax@apache.org>
Refactor the materialization for source KTables in the way that:
Otherwise do not materialize the KTable. E.g.
builder.table("topic").filter().toStream().to("topic")would not create any state stores.There's a couple of minor changes along with PR as well:
queryableStoreNameandisQueryableare merged intoqueryableStoreNameonly, and if it is null it means not queryable. As long as it is not null, it should be queryable (i.e. internally generated names will not be used any more).To achieve this, splitted
MaterializedInternal.storeName()andMaterializedInternal.queryableName(). The former can be internally generated and will not be exposed to users. QueryableName can be modified to set to the internal store name if we decide to materialize it during the DSL parsing / physical topology generation phase. And only if queryableName is specified the corresponding KTable is determined to be materialized.Found some overlapping unit tests among
KTableImplTest, andKTableXXTest, removed them.There are a few typing bugs found along the way, fixed them as well.
This PR is an illustration of experimenting a poc towards logical materializations.
Today we've logically materialized the KTable for filter / mapValues / transformValues if queryableName is not specified via Materialized, but whenever users specify queryableName we will still always materialize. My original goal is to also consider logically materialize for queryable stores, but when implementing it via a wrapped store to apply the transformations on the fly I realized it is tougher than I thought, because we not only need to support
fetchorget, but also needs to support range queries,approximateNumEntries, andisOpenetc as well, which are not efficient to support. So in the end I'd suggest we still stick with the rule of always materializing if queryableName is specified, and only consider logical materialization otherwise.Committer Checklist (excluded from commit message)