KAFKA-6813: Remove deprecated APIs in KIP-182, Part I #4919
guozhangwang merged 17 commits into apache:trunk
| } | ||
| }; | ||
| final Initializer<V> reduceInitializer = new Initializer<V>() { |
This is moved from TimeWindowedStreamImpl, just to be consistent with the other const functions.
| final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal | ||
| = new MaterializedInternal<>(materialized, builder, REDUCE_NAME); | ||
| = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); | ||
| if (materializedInternal.keySerde() == null) { |
This is for 2.b), ditto elsewhere.
| true); | ||
| final String name = builder.newProcessorName(FILTER_NAME); | ||
| // only materialize if the state store is queryable |
This is for 2.a), ditto for mapValues.
| Objects.requireNonNull(initializer, "initializer can't be null"); | ||
| Objects.requireNonNull(aggregator, "aggregator can't be null"); | ||
| Objects.requireNonNull(sessionMerger, "sessionMerger can't be null"); | ||
| return doAggregate(initializer, aggregator, sessionMerger, (Serde<T>) valSerde); |
This is for 2.c), ditto elsewhere.
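To illustrate why a cast like the `(Serde<T>) valSerde` in the snippet above is risky, here is a minimal plain-Java sketch. `Serializer` is a hypothetical stand-in interface, not the Kafka `Serde` API. The point is only that such a cast compiles because generics are erased, and the mismatch shows up later when a value is actually serialized.

```java
import java.nio.charset.StandardCharsets;

/** Plain-Java sketch of the unchecked-cast problem; not Kafka Streams code. */
final class UncheckedSerdeCastSketch {

    /** Hypothetical stand-in for a serializer; not the Kafka Serde interface. */
    interface Serializer<T> {
        byte[] serialize(T value);
    }

    static final Serializer<String> STRING_SERIALIZER =
        value -> value.getBytes(StandardCharsets.UTF_8);

    /** Mirrors the removed pattern: reuse the input serializer for an unrelated result type. */
    @SuppressWarnings("unchecked")
    static <V, VR> Serializer<VR> unsafeReuse(Serializer<V> inputSerializer) {
        return (Serializer<VR>) inputSerializer; // compiles because generics are erased
    }

    /** Returns true if serializing an aggregate of the wrong type fails at runtime. */
    static boolean failsAtRuntime() {
        Serializer<Long> forAggregate = unsafeReuse(STRING_SERIALIZER);
        try {
            forAggregate.serialize(42L); // a Long reaches code that expects a String
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("fails at runtime: " + failsAtRuntime());
    }
}
```

Requiring the caller to supply the value serde for the aggregate result moves this failure from runtime to an explicit API decision.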
| materializedInternal.withKeySerde(keySerde); | ||
| } | ||
| if (materializedInternal.valueSerde() == null) { | ||
| materialized.withValueSerde(Serdes.Long()); |
This is for 2.d), ditto elsewhere.
| AGGREGATE_NAME, | ||
| windowStoreBuilder(storeName, serde), | ||
| false); | ||
| return aggregate(initializer, aggregator, Materialized.<K, VR, WindowStore<Bytes, byte[]>>with(keySerde, null)); |
This is a meta explanation about the serde inheritance across multiple internal impl classes:
- reduce: inherit the key and value serdes from the parent XXImpl class.
- count: inherit the key serdes, enforce setting the Serdes.Long() for value serdes.
- aggregate: inherit the key serdes, do not set for value serdes internally (line 92 here is for this case).
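The three rules above can be modelled in a few lines of plain Java. This is only a sketch: the enum, the method names, and the use of strings in place of serde objects are all hypothetical, and the "user-supplied serde wins" branch is an assumption based on the `== null` checks visible in the diff excerpts.

```java
import java.util.Optional;

/** Plain-Java model of the inheritance rules; none of these types are Kafka Streams classes. */
final class SerdeInheritanceSketch {

    enum Op { REDUCE, COUNT, AGGREGATE }

    /**
     * Returns the name of the value serde an operator ends up with.
     * An empty result means "fall back to the configured default serde".
     */
    static Optional<String> valueSerde(Op op, String parentValueSerde, String userValueSerde) {
        if (userValueSerde != null) {
            return Optional.of(userValueSerde);               // assumed: explicit user choice wins
        }
        switch (op) {
            case REDUCE:
                return Optional.ofNullable(parentValueSerde); // result type equals input type
            case COUNT:
                return Optional.of("Serdes.Long()");          // counts are always Long
            default:
                return Optional.empty();                      // aggregate: result type unknown
        }
    }

    /** The key type never changes, so every operator can inherit the parent's key serde. */
    static Optional<String> keySerde(String parentKeySerde, String userKeySerde) {
        return Optional.ofNullable(userKeySerde != null ? userKeySerde : parentKeySerde);
    }

    public static void main(String[] args) {
        for (Op op : Op.values()) {
            System.out.println(op + " -> " + valueSerde(op, "Serdes.String()", null));
        }
    }
}
```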
bbejeck left a comment
Just one minor comment, otherwise LGTM.
| .groupBy(MockMapper.selectKeyKeyValueMapper()) | ||
| .count(); | ||
| System.out.print(builder.build().describe()); |
No :P Will remove it.
Failure unrelated, exceeded rate limit. Retest this please.
mjsax left a comment
Pretty hard to review... Overall looks good. Couple of questions.
Also one meta comment about Serde inheritance: Isn't it inconsistent to do this for groupBy-aggregate only but not for other operators? For example a builder.stream("", Consumed.with(...)).filter.to("", Produced.with()) could forward the Serdes from the source via the filter to the sink making the Produced.with() unnecessary. I like Serde inheritance, but should we do it consistently instead of picking a "random" pair of operators that support it?
| * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); | ||
| * String key = "some-key"; | ||
| * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances) | ||
| * }</pre> | ||
| * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to | ||
| * query the value of the key on a parallel running instance of your Kafka Streams application. | ||
| * <p> |
Hmm.. good point, I will add them back.
| * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view) | ||
| * provided by the given {@code storeSupplier}. | ||
| * that can be queried using the provided {@code queryableStoreName}. |
| * <pre>{@code | ||
| * KafkaStreams streams = ... // compute sum | ||
| * String queryableStoreName = storeSupplier.name(); | ||
| * KafkaStreams streams = ... // some aggregation on value type double | ||
| * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore()); |
in the example above,
String queryableStoreName = "storeName" // the queryableStoreName should be the name of the store as defined by the Materialized instance
was inserted. should we do the same here?
| * The key of the result record is the same as for both joining input records. | ||
| * Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided | ||
| * {@link ValueJoiner} will be called with a {@code null} value for this/other stream, respectively. | ||
| * {@link ValueJoiner} will be called with a {@code null} value for the this/other stream, respectively. |
| * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. | ||
| * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is | ||
| * user-specified in {@link StreamsConfig} via parameter | ||
| * user-specified in {@link StreamsConfig} via parameter |
| Objects.requireNonNull(materialized, "materialized can't be null"); | ||
| final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal | ||
| = new MaterializedInternal<>(materialized, builder, REDUCE_NAME); | ||
| = new MaterializedInternal<>(materialized, builder, AGGREGATE_NAME); |
Why the change to AGGREGATE_NAME ?
This is a good one: while working on this class I realized that we mistakenly used REDUCE_NAME for the prefix, while before this we used AGGREGATE_NAME, and in count and other reduce places we use AGGREGATE_NAME as well.
So technically I think this is the right fix, but arguably it will result in different store names if users are on this version already.
we mistakenly used REDUCE_NAME for the prefix
Why "mistakenly" -- it's a reduce(...) operator? Because count() is a special case of aggregation() it seems to be fine to use AGGREGATE_NAME there?
There are three places where reduce was called: KGroupedStream, TimeWindowedKStream, and SessionWindowedKStream. What I've observed is that in the first two we use REDUCE_NAME as both processor name and store name prefixes, and in the last one we use AGGREGATE_NAME for both processor name and store name prefixes. I thought the latter was right and the former was not. But if people can correct me that it is the other way around I'm happy to change it the other way (in either case, there is a compatibility concern).
I agree about the compatibility concern -- if we change the name, it might break the upgrade path. To me, REDUCE_NAME seems to be correct and thus, if we really want to align all of them (not sure if it's worth it), we should update SessionWindowedKStream#reduce().
Sounds good, will leave a note on the upgrade path as well.
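A rough sketch of why the prefix choice is a compatibility concern. The constant values and the `prefix + "STATE-STORE-" + zero-padded index` scheme below are assumptions modelled on the names discussed in this thread, not code copied from the PR. The changelog topic pattern `<application.id>-<store name>-changelog` is the usual Kafka Streams convention.

```java
/** Plain-Java sketch of generated store names; constants and scheme are assumptions. */
final class StoreNameSketch {

    // Assumed values for the two prefixes discussed in the thread.
    static final String REDUCE_NAME = "KSTREAM-REDUCE-";
    static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";

    /** Assumed naming scheme: prefix + "STATE-STORE-" + zero-padded index. */
    static String storeName(String prefix, int index) {
        return prefix + "STATE-STORE-" + String.format("%010d", index);
    }

    /** Changelog topics are derived from the application id and the store name. */
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String before = storeName(REDUCE_NAME, 2);
        String after = storeName(AGGREGATE_NAME, 2);
        // Same topology position, different prefix: the upgraded app looks for a different topic.
        System.out.println(changelogTopic("my-app", before));
        System.out.println(changelogTopic("my-app", after));
    }
}
```

Under these assumptions, an application that relies on generated names would look for a differently named store and changelog topic after upgrading, which is why the rename needs a note on the upgrade path.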
| builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); | ||
| return new KTableImpl<>(builder, |
might be better to move the return after the if-then-else to unify code and just use a boolean variable for the last parameter?
Ack. Will try to do the same for mapValues as well.
| // only materialize if the state store is queryable | ||
| KTableProcessorSupplier<K, V, V> processorSupplier; | ||
| if (materialized != null && materialized.isQueryable()) { |
As this is a private method, we could simplify by never calling with null?
I thought about that, but this would require the other callers to pass in a dummy Materialized, which I think is not worth the code cleanliness. I've refactored the part according to your comment above, LMK what you think.
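For readers following the thread, the shape being discussed (null allowed, one boolean, a single return after the if/else) might look roughly like this. It is a plain-Java sketch with hypothetical stand-in types, not the actual KTableImpl code.

```java
/** Plain-Java sketch of the suggested shape; all types here are hypothetical stand-ins. */
final class FilterMaterializationSketch {

    /** Stand-in for a Materialized-like argument; only the queryable flag matters here. */
    static final class StoreSpec {
        final boolean queryable;

        StoreSpec(boolean queryable) {
            this.queryable = queryable;
        }
    }

    /** Stand-in for the resulting table: which processor was built, and whether a store backs it. */
    static final class TableSketch {
        final String processor;
        final boolean materialized;

        TableSketch(String processor, boolean materialized) {
            this.processor = processor;
            this.materialized = materialized;
        }
    }

    static TableSketch filter(StoreSpec storeSpec) {
        // Only materialize if a store was given and it is queryable; null means "no store given".
        final boolean shouldMaterialize = storeSpec != null && storeSpec.queryable;

        final String processor;
        if (shouldMaterialize) {
            processor = "filter-with-store";
        } else {
            processor = "filter-only";
        }

        // Single return after the if/else; the boolean is the only argument that differs.
        return new TableSketch(processor, shouldMaterialize);
    }

    public static void main(String[] args) {
        System.out.println(filter(null).materialized);
        System.out.println(filter(new StoreSpec(true)).materialized);
    }
}
```

Keeping the null check inside the private method avoids forcing every caller to construct a dummy argument, which is the trade-off described in the reply above.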
| /** | ||
| * Similar to KStreamAggregationIntegrationTest but with dedupping enabled | ||
| * Similar to KStreamAggregationIntegrationTest but with de dupping enabled |
@mjsax I understand this is a large PR... I've tried hard to split it into multiple smaller ones (I'll probably have two other PRs of similar size), and the hope is that for this PR most of the removals are straightforward except the fixes mentioned in 2). Regarding your question about inheritance, I did this for the following reasons: a) Before KIP-182 we were actually already doing something about the inheritance, but only for count and reduce; when we added KIP-182 we mistakenly dropped some of those.
| * This class allows to access the {@link InternalTopologyBuilder} a {@link Topology} object. | ||
| * | ||
| */ | ||
| public class TopologyWrapper extends Topology { |
Was this added on purpose in the last commit? Seems to be unused.
Not on purpose, it was added for a follow-up PR. Will revert for now.
mjsax left a comment
Feel free to merge. Please don't forget to update the docs -- can be done in follow up PR, too.
I'm breaking KAFKA-6813 into a couple of "smaller" PRs and this is the first one. It focuses on:
1. Remove deprecated APIs in KStream, KTable, KGroupedStream, KGroupedTable, SessionWindowedKStream, TimeWindowedKStream.
2. Also found a couple of overlooked bugs while working on them:
2.a) In KTable.filter / mapValues without the additional parameter indicating the materialized stores, originally we would not materialize the store. After KIP-182 we mistakenly diverged the semantics: for KTable.mapValues it is still the case, but for KTable.filter we always materialize.
2.b) In XXStream/Table.reduce/count, we used to try to reuse the serdes since their types are known in advance (for reduce the key and value types are unchanged; for count the key type is unchanged and the value type is Long). This was somehow lost in the past refactoring.
2.c) We were force-casting a Serde<V> to Serde<VR> for XXStream / Table.aggregate, for which the returned value type is NOT known, so the forced cast should not be applied and we should require users to provide the value serde if they believe the default ones are not applicable.
2.d) Whenever we create a new MaterializedInternal we effectively increment the suffix index for the store / processor-node names. However, in some places this MaterializedInternal is only used for validation, so the resulting processor-node / store suffix is not monotonic.
Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>
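The index problem in 2.d) can be reproduced with a small plain-Java model. `Builder` and `EagerWrapper` are hypothetical stand-ins for a name-generating builder and a wrapper that reserves a name in its constructor. They are not the actual InternalStreamsBuilder or MaterializedInternal classes, and the "STORE-" prefix is made up.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-Java model of the index problem in 2.d); these are not Kafka Streams classes. */
final class NameIndexSketch {

    /** Hypothetical builder that hands out names with a shared, increasing suffix. */
    static final class Builder {
        private final AtomicInteger index = new AtomicInteger(0);

        String newStoreName(String prefix) {
            return prefix + String.format("%010d", index.getAndIncrement());
        }
    }

    /** Hypothetical wrapper that reserves a store name as a side effect of construction. */
    static final class EagerWrapper {
        final String storeName;

        EagerWrapper(Builder builder, String prefix) {
            this.storeName = builder.newStoreName(prefix);
        }
    }

    /** Buggy pattern: one wrapper is built only to validate arguments, then thrown away. */
    static String withThrowawayWrapper() {
        Builder builder = new Builder();
        new EagerWrapper(builder, "STORE-"); // validation only, but it consumes index 0
        return new EagerWrapper(builder, "STORE-").storeName;
    }

    /** Fixed pattern: validate without constructing the wrapper, then build it once. */
    static String withSingleWrapper() {
        Builder builder = new Builder();
        return new EagerWrapper(builder, "STORE-").storeName;
    }

    public static void main(String[] args) {
        System.out.println(withThrowawayWrapper()); // STORE-0000000001
        System.out.println(withSingleWrapper());    // STORE-0000000000
    }
}
```

In this model the throwaway instance shifts every later name by one, which is the kind of gap the description refers to.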
…ache#5066) In apache#4919 we propagate the SerDes for each of these aggregation operators. As @guozhangwang mentioned in that PR:
```
reduce: inherit the key and value serdes from the parent XXImpl class.
count: inherit the key serdes, enforce setting the Serdes.Long() for value serdes.
aggregate: inherit the key serdes, do not set for value serdes internally.
```
Although it's all good for reduce and count, it is quite unsafe to have aggregate without Materialized given. In fact I don't see why we would not give a Materialized for the aggregate since the result type will always be different (otherwise use reduce) and also the value Serde is simply not propagated. This has been discussed previously in a broader PR before but I believe for aggregate we could pass implicitly a Materialized the same way we pass a Joined, just to avoid the stupid case. Then if the user wants to specialize, he can give his own Materialized. Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>
…e#5075) apache#4919 unintentionally changed the topology naming scheme. This change returns to the prior scheme. Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>