KAFKA-3705: Added a foreignKeyJoin implementation for KTable. #5527
bbejeck merged 64 commits into apache:trunk
Conversation
    final ValueMapper<V, KO> keyExtractor,
    final ValueJoiner<V, VO, VR> joiner,
    final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
    final Serde<K> thisKeySerde,
    @Override
    public CombinedKey<KF, KP> deserialize(final String topic, final byte[] data) {
        //{4-byte foreignKeyLength}{foreignKeySerialized}{4-bytePrimaryKeyLength}{primaryKeySerialized}
Can skip the second length, it's known anyway.
True, I could just do it as the remainder. Thanks
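Dropping the second length prefix and treating the primary key as the remainder could look like this hedged sketch (class and method names are illustrative, not the PR's actual code):

```java
import java.nio.ByteBuffer;

// Sketch of the combined-key wire format with the second length omitted:
// {4-byte foreignKeyLength}{foreignKey}{primaryKey}. The primary key's
// length is recovered as whatever bytes remain after the foreign key.
public class CombinedKeyDeserializerSketch {
    public static byte[][] split(final byte[] data) {
        final ByteBuffer buf = ByteBuffer.wrap(data);
        final byte[] foreignKey = new byte[buf.getInt()]; // read 4-byte length prefix
        buf.get(foreignKey);
        final byte[] primaryKey = new byte[buf.remaining()]; // remainder = primary key
        buf.get(primaryKey);
        return new byte[][]{foreignKey, primaryKey};
    }
}
```

This saves 4 bytes per record on the repartition topic at no cost, since the total payload length is always known.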
        //{4-byte foreignKeyLength}{foreignKeySerialized}{4-bytePrimaryKeyLength}{primaryKeySerialized}
        final byte[] fkCount = Arrays.copyOfRange(data, 0, 4);
        final int foreignKeyLength = fourBytesToInt(fkCount);
What about treating the whole thing as a Buffer?
I don't understand your question, can you elaborate?
I think the whole section could look nicer if you started with something like ByteBuffer.allocate(totalLength).putInt(keyLength).put(foreignKey).put(primaryKey)... something along those lines.
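The reviewer's suggestion, roughly: build the combined key in one pass with ByteBuffer instead of Arrays.copyOfRange plus a manual int conversion. A minimal sketch (names are illustrative, not the PR's):

```java
import java.nio.ByteBuffer;

// Fluent ByteBuffer construction of the combined key, replacing the
// copyOfRange/fourBytesToInt approach quoted in the review.
public class CombinedKeySerializerSketch {
    public static byte[] combine(final byte[] foreignKey, final byte[] primaryKey) {
        return ByteBuffer.allocate(4 + foreignKey.length + primaryKey.length)
                .putInt(foreignKey.length) // only the first length is needed
                .put(foreignKey)
                .put(primaryKey)
                .array();
    }

    public static int readForeignKeyLength(final byte[] data) {
        // replaces: fourBytesToInt(Arrays.copyOfRange(data, 0, 4))
        return ByteBuffer.wrap(data).getInt();
    }
}
```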
        //TODO - Can reduce some of the parameters, but < 13 is not possible at the moment.
        //Would likely need to split into two graphNodes - ie: foreignKeyJoinNode and foreignKeyJoinOrderResolutionNode.
I think the step an optimizer could potentially exploit here is the repartitioning. So one could try to factor out only the repartitioning.
I'll have to look more into the optimizer. TBH I built this originally in 1.0 and just did a functional port, not necessarily a best practices one. Thanks
I would not recommend spending too much energy. At the moment I really don't expect the optimizer to be able to exploit any of this. Probably also not in the future. It was just a thought popping into my head.
    @Override
    public KeyValueIterator<K, V> prefixScan(final K prefix) {
        return this.inner.prefixScan(prefix);
Probably need to wrap this in a DelegatingPeekingKeyValueIterator.
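For context on the suggestion: Kafka's internal DelegatingPeekingKeyValueIterator wraps a store iterator so callers can peek at the next entry without consuming it. A self-contained toy version of the same wrapping idea (this is not the Kafka class, just an illustration over a plain Iterator):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Toy peeking wrapper: buffers one element from the delegate so that
// peek() can inspect it without advancing, mirroring what the Kafka
// internal iterator does for KeyValue entries.
public class PeekingIteratorSketch<T> implements Iterator<T> {
    private final Iterator<T> inner;
    private T next; // buffered element, or null when not yet fetched

    public PeekingIteratorSketch(final Iterator<T> inner) {
        this.inner = inner;
    }

    public T peek() {
        if (next == null) {
            if (!inner.hasNext()) {
                throw new NoSuchElementException();
            }
            next = inner.next();
        }
        return next;
    }

    @Override
    public boolean hasNext() {
        return next != null || inner.hasNext();
    }

    @Override
    public T next() {
        final T result = peek();
        next = null; // consume the buffered element
        return result;
    }
}
```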
    final Materialized foreignMaterialized = Materialized.<CombinedKey<KO, K>, V, KeyValueStore<Bytes, byte[]>>as(prefixScannableDBRef.name())
        //Need all values to be immediately available in the RocksDB store.
        //No easy way to flush the cache prior to prefixScan, so caching is disabled on this store.
        .withCachingDisabled()
I notice that in 2.x I may be able to rework this to allow an enabled cache, using a prefixScan function similar to ThreadCache.range. I will have to look into this a bit more, though I don't think it will affect performance much, since I anticipate the RocksDB prefixScan will take the longest overall.
Might be; it's one of the places I got stuck once. From experience I can tell that it works sufficiently well without the cache. I think RocksDB does a pretty good job of not seeking around randomly on disk.
I'll leave it out for now. If someone else thinks otherwise, they can speak up or it can be done in a subsequent PR.
    final byte[] offset = longSerializer.serialize(null, context().offset());
    context().headers().add(KTableRepartitionerProcessorSupplier.this.offset, offset);
    context().headers().add(propagate, falseByteArray);
    context().forward(combinedOldKey, change.newValue);
Yes, cleaner to do so. The value is not relevant. I have fixed that and added a clarification comment (I can remove all comments if required before final submission).
We should just remove all the
I had to add all the final keywords to pass the linting check - IIRC, my first run had dozens of linting errors preventing compilation.
Force-pushed: a36f5ee to 3d465f5
Force-pushed: 8719e16 to 56b76fa
Force-pushed: 3e1f62d to e718610
Force-pushed: 8024097 to 3925a3a
@bellemare What is your JIRA ID? Would like to assign the ticket to you.
@mjsax JIRA ID is abellemare
Hi, sorry to intrude on a potentially stale PR, but is this functionality still in development? It would be extraordinarily useful for joining two changelog-like entities.
I sure hope so, my team is looking forward to it as well! Given that the KIP was accepted a few weeks ago, I think it's safe to say it will make it in fairly soon. I would definitely pick up development if @bellemare can't continue.
Hey folks - I'm still trying to get the code put together and finalize some of the changes that were outlined in the KIP. Stay tuned!
Hi All - I'm at a point where I need some feedback on a couple of things:
Feedback is very much appreciated, as this is the first PR I've put up against Kafka and I'm sure I've violated a number of things.
    @Override
    public byte[] serialize(String topic, SubscriptionResponseWrapper<V> data) {
        //{16-bytes Hash}{n-bytes serialized data}
        byte[] serializedData = serializer.serialize(null, data.getForeignValue());
Why is the topic passed as null? It causes issues with GenericRecord AVRO serializer, since it tries to register schemas under "null-value" subject, and the schema registry responds with "version not compatible" error
The issue is actually with the Confluent implementation of the SerDe, as they incorrectly attempt to register when null topics are passed in. Read confluentinc/schema-registry#1061 for more details. That being said, it has been extremely quiet in that git repo, I am not sure how much effort Confluent puts into supporting work on that product.
If this does not get fixed either way, this PR will be unusable for most practical use cases. What is the downside of passing the topic name to the serializer? I tried it, and it seemed to work as expected.
Is there a workaround if confluentinc/schema-registry#1061 is not fixed?
I think the main issue would be the large amount of internal topic schemas registered to the schema registry. This, combined with any breaking changes to the schema (due to normal business requirement changes) would make it such that you are now needing to manually delete schema registry entries made to internal topics. This is a workflow that I do not believe was ever intended to be done with the Confluent Serde.
As it stands right now, there are allegedly other functionalities that require null serialization ("There are several places in Streams where we need to serialize a value for purposes other than sending it to a topic (KTableSuppressProcessor comes to mind), and using null for the topic is the convention we have."). These too will not work with the confluent Serde.
If they do not fix it, then the next best thing to do would be wrap it in your own implementation and intercept null-topic values to avoid registration. I do not see why it wouldn't be fixed since the current behaviour of registering "null-topic" is fundamentally useless.
Anyways, with all that being said, for this particular line I can certainly pass in the topic since it's fairly well-defined. If you wish to have your internal topics registered to the schema registry, no big deal. For other parts, such as
, there is no solution using the current Confluent Serde.
Confluent serde needs a schema id, and looks like it is not stored in GenericData.Record instance - it may not be trivial to fix confluentinc/schema-registry#1061...
I discussed this with some other people, and somebody mentioned that the value we serialize is actually also stored in RocksDB (the input KTable). We also know that the corresponding byte[] are written into the store changelog topic. Hence, instead of using the repartition topic, using the changelog topic should be a better option, as it does not leak anything into SR (or whatever other Serdes might do with the topic name).
Even if there is no changelog topic for the input KTable (we do some optimizations and sometimes don't create one; eg, the store might actually be a logical view and is not materialized), using the changelog topic name still seems to be safe.
context().topic() gives the repartition topic name in the serializer, which is what I want. In the processor sections, where I use null, context().topic() gives me the input-topic name for the KTable... which is also fine, since the serializer will check against the input topic schema, which must be valid by definition of the data being within the topic... so I suspect this issue can be laid to rest, in line with adaniline-traderev's suggestion.
This removes any requirement for the upstream serializer to have to do special work for null values.
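The workaround discussed earlier in this thread (wrapping the Confluent serializer to intercept null topics) could be sketched as follows. This is a hedged illustration: the Serializer interface is reduced to its (topic, data) -> byte[] shape for self-containment, and the fallback topic name is hypothetical.

```java
// Wrap a schema-registry-aware serializer and substitute a stable fallback
// topic whenever null is passed, so nothing is registered under a
// "null-value" subject in the schema registry.
public class NullTopicGuard {
    // Simplified stand-in for org.apache.kafka.common.serialization.Serializer
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    static <T> Serializer<T> withFallbackTopic(final Serializer<T> delegate, final String fallbackTopic) {
        return (topic, data) -> delegate.serialize(topic == null ? fallbackTopic : topic, data);
    }
}
```

Following the discussion above, the fallback would typically be the changelog (or source) topic name whose schema matches the serialized value.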
@mjsax I'm not sure I can fully follow the suggestion of using the changelog topic vs. the repartition topic here: are you suggesting to do it universally, or just for this case? If it is the latter, it feels a bit awkward due to inconsistency with other source KTable cases, where we just follow the SourceNode / RecordDeserializer path to deserialize using the source topic; if it is the former, that also has some drawbacks, since with today's topology generation not all source KTables will be materialized to a store, and hence they won't necessarily have a changelog topic.
I still feel that using the source topic name (i.e., in this case, the repartition topic) is admittedly exposed to SR but is philosophically the right thing to do, and we should consider fixing it in the serde APIs in the future. WDYT?
@guozhangwang I was just talking about the foreign-key case (not sure why you thought it might be anything else?). My understanding is the following: the contract is that we should pass into the serializer the name of the topic we want to write the data into. This contract breaks if we pass in the repartition topic name, because we write something different into the repartition topic.
You are right that the changelog topic might not exist; however, my personal take is that registering for a non-existing topic is a smaller violation of the contract than passing in the "wrong" repartition topic name. Note that the changelog topic name is conceptually the "right" topic name. However, this case would not happen very often anyway (compare the examples below).
Your comment triggered one more thought: the optimization framework could actually check for different cases, and if there is an upstream topic (either a changelog or source topic that has the same schema), we could actually use this name.
Some examples (does not cover all cases):
builder.table("table-topic").foreignKeyJoin(...)
For this case we need to materialize the base table (that is also the join-table), and the schema is registered on table-topic already, so we can pass in table-topic to avoid leaking anything.
builder.table("table-topic").filter(...).foreignKeyJoin(...)
For this case we materialize the derived table from the filter() and we get a proper filter-changelog-topic and we can pass this one.
builder.stream("stream-topic").groupBy().aggregate().foreignKeyJoin(...)
For this case, the agg result KTable is materialized and we can pass the agg-changelog-topic as name.
builder.stream("stream-topic").groupBy().aggregate().filter().foreignKeyJoin(...)
For this case, the agg result KTable is materialized and we can pass the agg-changelog-topic as name, because the filter() does not change the schema. Thus, even if the join-input KTable is not materialized, we can avoid to leak anything by "borrowing" the upstream changelog topic name of the filter input KTable.
builder.table("table-topic").mapValues(...).foreignKeyJoin(...)
For this case, we need to materialize the result of mapValues() and get a proper changelog topic for the join-input table.
builder.table("table-topic", Materialized.as("foo")).mapValues(...).foreignKeyJoin(...)
This might be a weird case, for which the base table is materialized, while the input join-table would not be materialized, and also the type changes via mapValues(). Hence, the table-topic schema is not the same as the join schema and we also don't have a changelog topic for the join-input KTable. We still use the changelog-topic name of the non-existent changelog topic (of the mapValues() result KTable).
As you can see, we can cover a large scope of cases for which we don't leak anything and can always use a topic name that contains data corresponding to the schema. Does this explain my thoughts?
I understand your reasoning now, but I still feel Streams should not try to fix it by piggy-backing on another topic that happens to have the same schema this serde is used for; rather, I'd prefer to use a non-existent dummy topic over an existing topic if we do not like repartition topics (again, I agree the repartition topic is not ideal, since we are, in fact, not sending bytes serialized in that way to that topic).
Hi @bellemare , Thanks for your PR! I'll review this as soon as I get the chance, and pay particular attention to the points you called out. -John
Thanks, @bellemare , yeah, it would be nice to see Jenkins give us at least 1 green build.
The java 8 build had just one failure, which I think is ok:
Likewise with the java 11+scala 2.13 build:
And the java 11+scala 2.12 build:
(JDK 11 & Scala 2.13) and (JDK 8, Scala 2.11) both failed on the same test, though it appears totally unrelated:
Hmm, it's almost like we're all just sitting around watching these builds... I agree, I don't think that failure is related, and I also don't think it's worthwhile to keep running the tests to see if that broken test passes next time.
Note, the failing test is known to be broken: https://issues.apache.org/jira/browse/KAFKA-8690
Test failures unrelated and local build passes, so merging this.
Merged #5527 into trunk.
Thanks @bellemare for the hard work and perseverance to get this done!
Yeah, congratulations, @bellemare for this awesome contribution!
Kudos! I’ve been following this feature for a year and am extremely excited to see it make it in.
Congratulations @bellemare , it's been a long KIP and journey :)
Congratulations @bellemare!!! And thanks for all the hard work!!! Also thanks to @Kaiserchen for his initial proposal and the many hours you invested in this KIP! Check out https://twitter.com/kafkastreams/status/1179974460167745536
Fixes compile error introduced by merging apache#5527
Thanks for all the help everyone, especially from @vvcephei. I couldn't have done it without you all. And thanks to Jan too for getting it started so long ago.
@bellemare Do we need to update the docs for the new feature? We should at least mention it in the upgrade guide. Would you like to update the wiki, too: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics
Besides the upgrade guide, I think we can also update the DSL section of the developer guide: https://kafka.apache.org/23/documentation/streams/developer-guide/dsl-api.html
@mjsax @guozhangwang I'll take a look at updating them. What/where is the upgrade guide?
I would maybe add one bullet point to https://github.com/apache/kafka/blob/trunk/docs/upgrade.html (as "notable changes") and also list it in https://github.com/apache/kafka/blob/trunk/docs/streams/upgrade-guide.html Not sure if both files already have a section for
@vvcephei @bellemare Do you know of any cases that would lead to duplicate updates being emitted by these joins? We first noticed this because we saw "Detected out-of-order KTable update" messages for topics emitted from some of these left joins. Looking at the updates, they were identical except for the timestamps. But it happens intermittently -- and not consistently even with the same input data. We switched to inner joins, and still saw them. Sometimes the updates had the same timestamp, but when they had different timestamps, one would have the timestamp from the corresponding left-table update, and the other would have the timestamp from the right-table update. So far I don't think this is causing bad data or any real problems. Just wanted to check if you had seen it before I keep digging. Thanks!
@thebearmayor I think this is fine: as we discussed in the KIP, duplicates may happen for the same join results, but they will happen intermittently, as you've observed. Since it is a KTable changelog stream, overwriting multiple times would not cause correctness issues. @vvcephei and @bellemare could chime in with more insights.
That also makes me think: should we make this an info-level entry rather than a warn, to not cause unnecessary panic, since it would not cause correctness issues anyway?
Interesting though -- mid to long term, I think we need to allow better handling of out-of-order data for
Maybe we should make the warning more flexible and only turn it on for
Hi, @mjsax, just to be clear I am getting the warning from a
Thanks for the clarification. Maybe we introduce more out-of-order records due to the round-trip via two repartition topics than we anticipated... But I am not 100% sure why -- each update to the left-hand side would send one message to the right-hand side and should receive zero or one response messages. An update to the right-hand side could send multiple messages to the left-hand side, however at most one per key. -- If we compute the join result timestamp as the maximum of both input timestamps, I don't see atm why we would introduce much out-of-order data. \cc @bellemare @vvcephei Thoughts?
@thebearmayor Do you have any other information on how common it is, or steps to reproduce? @mjsax I don't have any ideas off the top of my head, but I will take a look at the code again with this in mind...
@bellemare I'll tell you everything I can think of, most of which won't be relevant. I didn't mean to take up more of your time. I'll be offline until next week, and I mean to dig into it more then. I don't have any way yet to reproduce this in development. I've only seen it running in ec2 with 3 instances against large topics, with 16 partitions. I assume there is some timing component to the issue.
https://issues.apache.org/jira/browse/KAFKA-3705
Foreign Key Join:
Allows for a KTable to map its value to a given foreign key and join on another KTable keyed on that foreign key. Applies the joiner, then returns the tuples keyed on the original key. This supports updates from both sides of the join.
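The join semantics described above can be illustrated with a plain-Java sketch that has nothing to do with the Streams implementation: each left record extracts a foreign key from its value, looks up the right table by that key, and the joined result stays keyed on the left table's original key. All names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Toy model of foreign-key inner-join semantics over two in-memory "tables".
public class ForeignKeyJoinSemantics {
    public static <K, V, KO, VO, VR> Map<K, VR> join(
            final Map<K, V> left,
            final Map<KO, VO> right,
            final Function<V, KO> foreignKeyExtractor,
            final BiFunction<V, VO, VR> joiner) {
        final Map<K, VR> result = new HashMap<>();
        for (final Map.Entry<K, V> e : left.entrySet()) {
            final VO other = right.get(foreignKeyExtractor.apply(e.getValue()));
            if (other != null) { // inner-join semantics: drop non-matching rows
                result.put(e.getKey(), joiner.apply(e.getValue(), other));
            }
        }
        return result;
    }
}
```

The real implementation must additionally repartition by foreign key, react to updates on both sides, and resolve out-of-order arrivals, which is where the extra topology cost described below comes from.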
Design Philosophy:
The intent of this design was to build a totally encapsulated function that operates very similarly to the regular join function. No further work is required by the user to obtain their foreignKeyJoin results after calling the function. That being said, there is increased cost in some of the topology components, especially due to resolving out-of-order arrivals caused by foreign key changes. I would appreciate any and all feedback on this approach, as my understanding is that the Kafka Streams DSL should provide higher-level functionality without requiring users to know exactly what's going on under the hood.
Some points of note:
Requires an additional materialized State Store for the prefixScanning of the repartitioned CombinedKey events.
ReadOnlyKeyValueStore interface was modified to contain prefixScan. This requires that all implementations support this, but follows an existing precedent where some store functions are already stubbed out with exceptions.
Currently limited to Inner Join (can do more join logic in future - just limiting the focus of this KIP).
Application Reset does not seem to delete the new internal topics that I have added. (only tested with Kafka 1.0).
Only works with identical number of input partitions at the moment, though it may be possible to get it working with KTables of varying input partition count.
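The prefixScan mentioned in the first point relies on a simple property: if CombinedKeys sort with the foreign key first, all records for one foreign key are contiguous, so a range scan from the prefix up to the next possible prefix returns exactly that group. A self-contained sketch of the idea, with strings standing in for serialized CombinedKey bytes (not the actual store code):

```java
import java.util.Map;
import java.util.TreeMap;

// Prefix scan over a sorted map: the range [prefix, upperBound) covers
// every key that starts with the prefix, where upperBound is the prefix
// with its last character incremented.
public class PrefixScanSketch {
    public static Map<String, String> prefixScan(final TreeMap<String, String> store, final String prefix) {
        final String upperBound = prefix.substring(0, prefix.length() - 1)
                + (char) (prefix.charAt(prefix.length() - 1) + 1);
        return store.subMap(prefix, true, upperBound, false);
    }
}
```

This is also why caching has to be disabled on that store: a range scan against RocksDB alone would miss dirty entries sitting in the cache.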
Testing:
Testing is covered by two integration tests that exercise the foreign key join.
The first test exercises the out-of-order resolution and partitioning strategies by running three streams instances on three partitions. This demonstrates the scalability of the proposed solution.
Important: The second test (KTableKTableForeignKeyInnerJoinMultiIntegrationTest) attempts to join using a foreign key twice. This results in a NullPointerException regarding a missing task, and must be resolved before committing this.