KAFKA-18026: KIP-1112, migrate foreign-key joins to use ProcesserSupplier#stores #18194
KAFKA-18026: KIP-1112, migrate foreign-key joins to use ProcesserSupplier#stores #18194ableegoldman merged 4 commits intoapache:trunkfrom
Conversation
guozhangwang
left a comment
There was a problem hiding this comment.
One nit question about unit test, please feel free to merge afterwards.
|
|
||
| final String foreignTableJoinName = renamed | ||
| .suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR); | ||
| final StatefulProcessorNode<KO, Change<VO>> foreignTableJoinNode = new ForeignTableJoinNode<>( |
There was a problem hiding this comment.
Just curious is the final goal to eventually remove StatefulProcessorNode and only use ProcessorGraphNode?
There was a problem hiding this comment.
good question -- basically yes and no. Almost everything has/will be converted from StatefulProcesorNode to ProcessorGraphNode, but there are a few exceptions where we still need the StatefulProcessorNode to connect a processor and state store. For example any processors that access upstream state via a value getter, or a custom ProcessorSupplier passed into the .process/.processValues operator that doesn't implement #stores and manually adds stores by calling StreamsBuilder#addStateStore instead.
I actually have the PR for this ready but it was done on top of this so I was waiting for this one to be merged before calling for review. But it's ready now if you want to take a look (though note that it still can't be merged until Almog's TableSuppressNode PR is merged). See #18195
| (v1, v2) -> v1 + v2, | ||
| TableJoined.as("l-join"), | ||
| Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("materialized-store").withValueSerde(Serdes.String())) | ||
| .toStream(Named.as("toStream")) // 6 |
There was a problem hiding this comment.
What the comment // 6 is about? I think this test case still only have 5 store?
There was a problem hiding this comment.
copy-paste error 🙃 thanks
And yeah, it should have 5 stores
There was a problem hiding this comment.
ill address this in my next PR so i doin't have to run the tests again before merging
|
test failure is unrelated, merging to trunk |
…lier#stores (apache#18194) Convert FKJ processors to implementing the #stores method Reviewers: Guozhang Wang <guozhang.wang.us@gmail.com>
Convert FKJ processors to implementing the #stores method