Conversation
|
@enothereska @mjsax @guozhangwang. This follows the design as proposed in the KIP. Thanks |
enothereska
left a comment
There was a problem hiding this comment.
Initial pass on just high level APIs and high-level implementation only.
| /** | ||
| * Combine values of this stream by key into {@link SessionWindows} | ||
| * The resulting {@link KTable} will be materialized in a local state | ||
| * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog" |
There was a problem hiding this comment.
StoreName is not defined in this API.
|
|
||
| /** | ||
| * Aggregate values of this stream by key into {@link SessionWindows}. | ||
| * The resulting {@link KTable} will be materialized in a local state |
There was a problem hiding this comment.
Same here with storename.
| /** | ||
| * Count number of records of this stream by key into {@link SessionWindows}. | ||
| * The resulting {@link KTable} will be materialized in a local state | ||
| * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog" |
There was a problem hiding this comment.
Same here with storename.
| * +-----------+-------------+------------+ | ||
| * | ||
| * The previous 2 sessions would be merged into a single session with start time 10 and end time 20. | ||
| * The aggregate value for this session would be the result of aggregating all 4 values. |
| import org.apache.kafka.streams.kstream.Windowed; | ||
|
|
||
| import java.nio.ByteBuffer; | ||
|
|
There was a problem hiding this comment.
Consider adding a line or two on what this class is meant for.
There was a problem hiding this comment.
This is a general thing we lack for many internal classes... It's hard to get started if it's not clear what a class does and even more important why.
The comment added is actually quite good!
| @@ -25,21 +25,24 @@ | |||
| class TupleForwarder<K, V> { | |||
There was a problem hiding this comment.
Can we add a line to describe what this class is for?
| 8 + // offset | ||
| 4 + // partition | ||
| topic.length(); | ||
| (topic == null ? 0 : topic.length()); |
| } | ||
| entries.add(new ThreadCache.DirtyEntry(key, node.entry.value, node.entry)); | ||
| node.entry.markClean(); | ||
| if (node.entry.value == null) { |
There was a problem hiding this comment.
So if value==null we automatically delete?
There was a problem hiding this comment.
Yes - automatically delete from the cache.
| * @param aggTwo the second aggregate | ||
| * @return the new aggregate value | ||
| */ | ||
| V apply(K aggKey, V aggOne, V aggTwo); |
There was a problem hiding this comment.
It is consistent with Aggregator - also, why not? Maybe a developer wants to do something with the key.
There was a problem hiding this comment.
It's a matter of taste I guess. For example ValueJoiner has no key argument either (there was a discussion about adding the key though...). The API is nor really consistent right now. Should we try to make it consisted adding key parameter everywhere?
\cc @enothereska @miguno @guozhangwang
|
|
||
|
|
||
| /** | ||
| * A session based window specification uses for aggregating events into sessions. |
| * then a new session will be created. | ||
| * | ||
| * For example, If we have a session gap of 5 and the following data arrives: | ||
| * +--------------------------------------+ |
There was a problem hiding this comment.
nit: this will not render nicely as compiled JavaDoc. Can you add some markup?
| * @return a new SessionWindows with the provided inactivity gap | ||
| * and default maintain duration | ||
| */ | ||
| public static SessionWindows inactivityGap(final long gapMs) { |
There was a problem hiding this comment.
What about with(final long inactivityGap) instead of inactivityGap(final long gapMs)? This would be closer to TimeWindows#of(long size) ?
| final SessionMerger<K, V> sessionMerger = new SessionMerger<K, V>() { | ||
| @Override | ||
| public V apply(final K aggKey, final V aggOne, final V aggTwo) { | ||
| return aggregator.apply(aggKey, aggTwo, aggOne); |
There was a problem hiding this comment.
We have to document that the aggregator is used a SessionMerger.
| * @see org.apache.kafka.streams.state.Stores#create(String) | ||
| */ | ||
| public class RocksDBStore<K, V> implements KeyValueStore<K, V> { | ||
| class RocksDBStore<K, V> implements KeyValueStore<K, V> { |
There was a problem hiding this comment.
Why not allow developers to use it?
There was a problem hiding this comment.
It is in the internals package and i believe in making internal code as private as possible. If they want to use the RocksDBStore they can already create one via the various suppliers etc.
All that said, though, this could break anyone that has sub-classed this. So i'll change it back.
| new Properties()), | ||
| t1); | ||
|
|
||
| final long t2 = t1 + (sessionGap / 2); |
There was a problem hiding this comment.
final long t2 = t1 + sessionGap;
to test the corner case
There was a problem hiding this comment.
nah - that is what i wanted. There are further tests in KStreamSessionWindowAggregateProcessorTest
|
|
||
| } | ||
|
|
||
|
|
There was a problem hiding this comment.
Could you add a test, that does merge two session into a single one, because a late record arrives (ie late record is in between both session) ?
There was a problem hiding this comment.
Is already covered in KStreamSessionWindowAggregateProcessorTest
| StringSerializer.class, | ||
| new Properties()), | ||
| t2); | ||
| final long t3 = t1 + sessionGap; |
There was a problem hiding this comment.
final long t3 = t1 + sessionGap + 1; -- otherwise I would assume that the sessions get merged.
| @Override | ||
| public void schedule(long interval) { | ||
| throw new UnsupportedOperationException("schedule() not supported."); | ||
| // throw new UnsupportedOperationException("schedule() not supported."); |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
| final String storeName); | ||
|
|
||
| /** | ||
| * Combine values of this stream by the grouped key into {@link SessionWindows}. |
There was a problem hiding this comment.
Do we need repeated Javadoc everywhere? Not sure what's best practices here. Just there is a lot of repetition and perhaps one aggregate can have the Javadoc and others can point to it?
There was a problem hiding this comment.
I know - it is consistent with the rest of the class.
There was a problem hiding this comment.
Agree that it is a problem, but if you use an IDE and hoover over a class, it should display it -- and not just a link to another method... I would be awesome if JavaDoc would allow to factor common stuff out for overloaded methods. But I don't know a good way to do it either -- there is only c&p... :(
| * | ||
| * @return itself | ||
| */ | ||
| public SessionWindows until(long durationMs) { |
There was a problem hiding this comment.
Should duration be final?
| * Fetch any session aggregates with the matching key and the sessions end is >= earliestEndTime and the sessions | ||
| * start is <= latestStartTime | ||
| */ | ||
| KeyValueIterator<Windowed<K>, AGG> findSessionsToMerge(K key, long earliestSessionEndTime, long latestSessionStartTime); |
There was a problem hiding this comment.
Should parameters be final for consistency with your other code?
| * where each table contains records with unmodified keys and values | ||
| * that represent the latest (rolling) count (i.e., number of records) for each key within that window | ||
| */ | ||
| KTable<Windowed<K>, Long> count(final SessionWindows sessionWindows, final String storeName); |
There was a problem hiding this comment.
This probably doesn't belong to this PR, but I wonder why we need count at all, it's just an aggregate.
There was a problem hiding this comment.
I guess, it's THE aggregate everyone wants. Syntactic sugar.
| while (iterator.hasNext()) { | ||
| final KeyValue<Windowed<K>, T> next = iterator.next(); | ||
| merged.add(next); | ||
| agg = sessionMerger.apply(key, agg, next.value); |
There was a problem hiding this comment.
This would have probably better been part of the KIP discussion, but there will be some aggregates, like Average for which this kind of merging won't work because they will need more than just the previous aggregate values. How do we catch such cases and warn the user?
There was a problem hiding this comment.
This is the same for non SessionWindows based aggregates. I.e, if you aggregate an int into an Average you cant just apply the new int to the currently aggregated Average.
| if (!mergedWindow.equals(newTimeWindow)) { | ||
| for (final KeyValue<Windowed<K>, T> session : merged) { | ||
| store.remove(session.key); | ||
| tupleForwarder.maybeForward(session.key, null, session.value); |
There was a problem hiding this comment.
Don't we want to first invalidate previous values, then send new values, i.e., flip the order of this forwarding with the previous one? I can't immediately find the Javadoc where we tell the user the order.
| }; | ||
| } | ||
|
|
||
| static class SessionStoreIterator<K, AGG> implements KeyValueIterator<Windowed<K>, AGG> { |
There was a problem hiding this comment.
Why is this part of RocksDbSessionStore? Isn't it useful for stores other than RocksDb-based?
There was a problem hiding this comment.
It doesn't strictly need to be part of RocksDbSessionStore, but it is only used here as of now. If it is needed elsewhere later, then we should move it later.
| class CachingSessionStore<K, AGG> implements SessionStore<K, AGG>, CachedStateStore<Windowed<K>, AGG> { | ||
|
|
||
| private final SegmentedBytesStore bytesStore; | ||
| private final RocksDBSessionStore.SessionKeySchema keySchema; |
There was a problem hiding this comment.
If we use a different store, not RocksDb, will this variable still be good?
There was a problem hiding this comment.
Yes - it applies to a SegmentedBytesStore - which is not RocksDB specific. The class could be moved out of RocksDBSessionStore, but I don't really see the need to at the moment.
| public void apply(final List<ThreadCache.DirtyEntry> entries) { | ||
| for (ThreadCache.DirtyEntry entry : entries) { | ||
| final Bytes binaryKey = entry.key(); | ||
| final RecordContext current = context.recordContext(); |
There was a problem hiding this comment.
In previous stores, like CachingKeyValueStore, you put the code below in a separate method called putAndMaybeForward that I thought was a neat way to break the code into chunks.
| } | ||
|
|
||
| public void close() { | ||
| bytesStore.close(); |
There was a problem hiding this comment.
Do we want to flush and close the cache too?
There was a problem hiding this comment.
Yeah. The closing of the cache was dependent on another PR that has now been merged. flush doesn't matter so much as we always flush and then close, but i'll add it anyway
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
| Objects.requireNonNull(sessionMerger, "sessionMerger can't be null"); | ||
| Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); | ||
|
|
||
| return (KTable<Windowed<K>, T>) doAggregate( |
There was a problem hiding this comment.
Is the casting needed?
There was a problem hiding this comment.
It is consistent with what is already there. Also, i can't think of another way of doing it without duplicating doAggregate, which i'd rather not do
|
Refer to this link for build results (access rights to CI server needed): |
|
@dguy I had a look over:
Apart from the above comments I don't have anything else. Thanks. |
|
Refer to this link for build results (access rights to CI server needed): |
|
@guozhangwang i've addressed your comments. |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
I understand... Just afraid such tech debt will keep growing and eventually come back and bite us as unstable and hard-to-debug code. Well how about merging this PR as is and see if we can still push it as a separate PR before the release deadline? |
|
@guozhangwang - that is fine with me. I agree with the tech-debt and i think we should really focus the next release on bug fixing, tech-debt, usability etc. |
|
@dguy Cool, will merge it as is. Note that I have a couple follow-up comments in the previous round, but I think it would be easier to address them as a new PR than this 5000+ LOC one. |
This is a refactoring follow-up of #2166. Main refactoring changes: 1. Extract `InMemoryKeyValueStore` out of `InMemoryKeyValueStoreSupplier` and remove its duplicates in test package. 2. Add two abstract classes `AbstractKeyValueIterator` and `AbstractKeyValueStore` to collapse common functional logics. 3. Added specialized `BytesXXStore` to accommodate cases where key value types are Bytes / byte[] so that we can save calling the dummy serdes. 4. Make the key type in `ThreadCache` from byte[] to Bytes, as SessionStore / WindowStore's result serialized bytes are in the form of Bytes anyways, so that we can save unnecessary `Bytes.get()` and `Bytes.wrap(bytes)`. Each of these should arguably be a separate PR and I apologize for the mess, this is because this branch was extracted from a rather large diff that has multiple refactoring mingled together and dguy and myself have already put lots of efforts to break it down to a few separate PRs, and this is the only left-over work. Such PR won't happen in the future. Ping dguy enothereska mjsax for reviews Author: Guozhang Wang <wangguoz@gmail.com> Reviewers: Damian Guy, Matthias J. Sax, Jun Rao Closes #2333 from guozhangwang/K3452-followup-state-store-refactor
Add support for SessionWindows based on design detailed in https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows. This includes refactoring of the RocksDBWindowStore such that functionality common with the RocksDBSessionStore isn't duplicated. Author: Damian Guy <damian.guy@gmail.com> Reviewers: Eno Thereska <eno.thereska@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com> Closes apache#2166 from dguy/kafka-3452-session-merge
This is a follow up of apache#2166 - refactoring the store hierarchies as requested Author: Damian Guy <damian.guy@gmail.com> Reviewers: Guozhang Wang <wangguoz@gmail.com> Closes apache#2360 from dguy/state-store-refactor
This is a refactoring follow-up of apache#2166. Main refactoring changes: 1. Extract `InMemoryKeyValueStore` out of `InMemoryKeyValueStoreSupplier` and remove its duplicates in test package. 2. Add two abstract classes `AbstractKeyValueIterator` and `AbstractKeyValueStore` to collapse common functional logics. 3. Added specialized `BytesXXStore` to accommodate cases where key value types are Bytes / byte[] so that we can save calling the dummy serdes. 4. Make the key type in `ThreadCache` from byte[] to Bytes, as SessionStore / WindowStore's result serialized bytes are in the form of Bytes anyways, so that we can save unnecessary `Bytes.get()` and `Bytes.wrap(bytes)`. Each of these should arguably be a separate PR and I apologize for the mess, this is because this branch was extracted from a rather large diff that has multiple refactoring mingled together and dguy and myself have already put lots of efforts to break it down to a few separate PRs, and this is the only left-over work. Such PR won't happen in the future. Ping dguy enothereska mjsax for reviews Author: Guozhang Wang <wangguoz@gmail.com> Reviewers: Damian Guy, Matthias J. Sax, Jun Rao Closes apache#2333 from guozhangwang/K3452-followup-state-store-refactor
|
Hello, i'm having an issue trying to use SessionWindows This exception is thrown:
Can you please help me with this? Thanks. |
|
@MarcoAbi Could you please report this at the Kafka mailing list: http://kafka.apache.org/contact Thanks. |
This is a follow up of apache/kafka#2166 - refactoring the store hierarchies as requested Author: Damian Guy <damian.guy@gmail.com> Reviewers: Guozhang Wang <wangguoz@gmail.com> Closes #2360 from dguy/state-store-refactor
This is a refactoring follow-up of apache/kafka#2166. Main refactoring changes: 1. Extract `InMemoryKeyValueStore` out of `InMemoryKeyValueStoreSupplier` and remove its duplicates in test package. 2. Add two abstract classes `AbstractKeyValueIterator` and `AbstractKeyValueStore` to collapse common functional logics. 3. Added specialized `BytesXXStore` to accommodate cases where key value types are Bytes / byte[] so that we can save calling the dummy serdes. 4. Make the key type in `ThreadCache` from byte[] to Bytes, as SessionStore / WindowStore's result serialized bytes are in the form of Bytes anyways, so that we can save unnecessary `Bytes.get()` and `Bytes.wrap(bytes)`. Each of these should arguably be a separate PR and I apologize for the mess, this is because this branch was extracted from a rather large diff that has multiple refactoring mingled together and dguy and myself have already put lots of efforts to break it down to a few separate PRs, and this is the only left-over work. Such PR won't happen in the future. Ping dguy enothereska mjsax for reviews Author: Guozhang Wang <wangguoz@gmail.com> Reviewers: Damian Guy, Matthias J. Sax, Jun Rao Closes #2333 from guozhangwang/K3452-followup-state-store-refactor
Add support for SessionWindows based on design detailed in https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows.
This includes refactoring of the RocksDBWindowStore such that functionality common with the RocksDBSessionStore isn't duplicated.