KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (5/N)#21497
KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (5/N)#21497mjsax merged 12 commits intoapache:trunkfrom
Conversation
7d824e8 to
f9e684a
Compare
aliehsaeedii
left a comment
There was a problem hiding this comment.
Thanks @frankvicky . I made a pass.
| public TimestampedToHeadersWindowStoreAdapter(final WindowStore<Bytes, byte[]> store) { | ||
| if (!store.persistent()) { | ||
| throw new IllegalArgumentException("Provided store must be a persistent store, but it is not."); | ||
| } |
There was a problem hiding this comment.
also check if store is not of type timestamed, and then throw exception since we dont support upgrade from normal window store to ts-window-store-with-headers
|
|
||
| @SuppressWarnings("unchecked") | ||
| @Override | ||
| public <R> QueryResult<R> query(final Query<R> query, |
There was a problem hiding this comment.
we should throw unsupportedoperationexception here as we dont support IQv2 yet.
| } | ||
|
|
||
| @Override | ||
| public <R> QueryResult<R> query(final Query<R> query, |
There was a problem hiding this comment.
We don' t support IQv2. Please just throw exception here.
| @Override | ||
| public KeyValue<Windowed<Bytes>, byte[]> next() { | ||
| final KeyValue<Windowed<Bytes>, byte[]> timestampedKeyValue = innerIterator.next(); | ||
| if (timestampedKeyValue == null) { |
There was a problem hiding this comment.
This null check is missing in TimestampedToHeadersIteratorAdapter<K> -- not sure why.
Might be a bug we should address?
There was a problem hiding this comment.
Should we address in this PR or in another minor PR?
There was a problem hiding this comment.
Should we address in this PR or in another minor PR?
Not sure. Whatever you prefer.
| * Iterator adapter for WindowStoreIterator that converts timestamp-only values | ||
| * to timestamp-with-headers format by adding empty headers. | ||
| */ | ||
| private static class TimestampedToHeadersWindowStoreIteratorAdapter implements WindowStoreIterator<byte[]> { |
There was a problem hiding this comment.
It seems this one is very similar to KeyValueToTimestampedKeyValueIteratorAdapter just with the difference of the "converter function".
If we make the "converter function" a parameter, we would unify the code and just pass in different function.
Let's file a ticket to do this unification to reduce duplicated code. We should file this ticket not limited for this case, but look across all of the adaptor code, to see what we can unify. This might not be the only case of unnecessary code duplication.
There was a problem hiding this comment.
57562a0 to
3ce3c92
Compare
# Conflicts: # streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
ae5454c to
4f690f4
Compare
This PR implements the upgrade integration tests for `TimestampedWindowStateStoreWithHeaders` introduced in KIP-1271. The class should be reviewd: `HeadersStoreUpgradeIntegrationTest` This should not be merged before #21497 Reviewers: Matthias J. Sax <matthias@confluent.io>, Alieh Saeedi <asaeedi@confluent.io>
…e#21497) This PR adds required classes or modifies the existing ones to build the `TimestampedWindowStoreWithHeaders` introduced in KIP-1271. Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
…e#21581) This PR implements the upgrade integration tests for `TimestampedWindowStateStoreWithHeaders` introduced in KIP-1271. The class should be reviewd: `HeadersStoreUpgradeIntegrationTest` This should not be merged before apache#21497 Reviewers: Matthias J. Sax <matthias@confluent.io>, Alieh Saeedi <asaeedi@confluent.io>
…e#21497) This PR adds required classes or modifies the existing ones to build the `TimestampedWindowStoreWithHeaders` introduced in KIP-1271. Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
…e#21497) This PR adds required classes or modifies the existing ones to build the `TimestampedWindowStoreWithHeaders` introduced in KIP-1271. Reviewers: Alieh Saeedi <asaeedi@confluent.io>, Matthias J. Sax <matthias@confluent.io>
…e#21581) This PR implements the upgrade integration tests for `TimestampedWindowStateStoreWithHeaders` introduced in KIP-1271. The class should be reviewd: `HeadersStoreUpgradeIntegrationTest` This should not be merged before apache#21497 Reviewers: Matthias J. Sax <matthias@confluent.io>, Alieh Saeedi <asaeedi@confluent.io>
This PR adds required classes or modifies the existing ones to build the
TimestampedWindowStoreWithHeadersintroduced in KIP-1271.Reviewers: Alieh Saeedi asaeedi@confluent.io, Matthias J. Sax
matthias@confluent.io