KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (N/N)#21581
KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (N/N)#21581frankvicky merged 10 commits intoapache:trunkfrom
Conversation
e5ecbba to
62aefce
Compare
aliehsaeedii
left a comment
There was a problem hiding this comment.
Thanks @frankvicky.
| public void put(final K key, final ValueTimestampHeaders<V> value, final long windowStartTimestamp) { | ||
| Objects.requireNonNull(key, "key cannot be null"); | ||
| final Headers headers = value.headers() == null ? new RecordHeaders() : value.headers(); | ||
| final Headers headers = value == null || value.headers() == null ? new RecordHeaders() : value.headers(); |
There was a problem hiding this comment.
Should we do that in other metered classes as well? Did you @frankvicky notice any issue when testing here?
There was a problem hiding this comment.
I introduced this change when I was writing migration test. iterator will fail due to value is null.
| * Iterator adapter for WindowStoreIterator that converts timestamp-only values | ||
| * to timestamp-with-headers format by adding empty headers. | ||
| */ | ||
| private static class TimestampedToHeadersWindowStoreIteratorAdapter implements WindowStoreIterator<byte[]> { |
There was a problem hiding this comment.
naming: I 'm not sure but maybe TimestampedWindowToHeadersWindowStoreIteratorAdapter?
this name is longer but more descriptive.
There was a problem hiding this comment.
If you compare this iterator with WindowToTimestampedWindowIteratorAdapter, should it extend TimestampedToHeadersIteratorAdapter?
| @Override | ||
| public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final long timeFrom, final long timeTo) { | ||
| return new TimestampedToHeadersIteratorAdapter<>(store.backwardFetchAll(timeFrom, timeTo)); | ||
| } |
There was a problem hiding this comment.
What about public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {...}?
There was a problem hiding this comment.
Please verify that all relevant methods have Instant-based overloads, consistent with WindowToTimestampedWindowByteStoreAdapter.java.
| public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) { | ||
| return new TimestampedToHeadersIteratorAdapter<>(store.fetchAll(timeFrom, timeTo)); | ||
| } | ||
|
|
There was a problem hiding this comment.
What about public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final Instant timeFrom, final Instant timeTo) throws IllegalArgumentException {...}?
| } | ||
|
|
||
| /** | ||
| * Tests migration from TimestampedWindowStore (v2) to TimestampedWindowStoreWithHeaders (v3). |
| * This is a true migration where both supplier and builder are upgraded. | ||
| */ | ||
| private void shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final boolean persistentStore) throws Exception { | ||
| // Phase 1: Run with old v2 store |
| final StreamsBuilder newBuilder = new StreamsBuilder(); | ||
| newBuilder.addStateStore( | ||
| Stores.timestampedWindowStoreWithHeadersBuilder( | ||
| Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME, Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false), // v2 supplier! |
| kafkaStreams.close(); | ||
| kafkaStreams = null; | ||
|
|
||
| // Restart with v3 builder BUT v2 supplier (proxy/adapter mode) |
There was a problem hiding this comment.
I'm not sure if we ever had this terminology (v2, v3`).
|
|
||
| @Test | ||
| public void shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception { | ||
| // Phase 1: Run with TimestampedWindowStore (v2) |
There was a problem hiding this comment.
Didn't we check this upgrade in HeadersUpgradeIntegrationTest?
| store.put(record.key(), ValueTimestampHeaders.make(record.value(), record.timestamp(), record.headers())); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Should we add downgrade tests as well?
f989100 to
c6ebefa
Compare
| } | ||
|
|
||
| @Test | ||
| public void shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore() throws Exception { |
There was a problem hiding this comment.
Did you look into StoreUpgradIntegrationTest#shouldFailDowngradeFromTimestampedToRegularKeyValueStore
Might be simpler to follow this pattern?
There was a problem hiding this comment.
IIUC, we could not follow the same pattern since the exception has been swallowed.
There was a problem hiding this comment.
😲 -- But it this correct to just swallow the exception???
There was a problem hiding this comment.
Did some digging and it's like this since 2016, when this code was added... (#2166 -> add Segments.java -> was later refactored and the code was move into AbstractSegments.java)
There was a problem hiding this comment.
Should we raise a ticket for this?
There was a problem hiding this comment.
I suppose we should throw the ProcessorStateException here. In this way, we could align the behavior between kv store and window store and also simpify the downgrade test case.
c6ebefa to
4b6a1a8
Compare
| } | ||
| // Otherwise (null or corrupted data), the downgrade failed as expected | ||
| } catch (final Exception e) { | ||
| // Exception during read is expected - indicates format mismatch |
There was a problem hiding this comment.
We should verify the expected error message, to avoid that we catch some other exception which is unrelated to the test.
| final ValueAndTimestamp<String> result = store.fetch("key1", windowStart); | ||
|
|
||
| // If we can read the data correctly, the test should fail | ||
| if (result != null && result.value().equals("value1") && result.timestamp() == (baseTime + 100)) { |
There was a problem hiding this comment.
It seems we have two code path that make the test pass? We reach this point, but "Fail" to verify the data, vs we crash with an Exception. Wondering why we need both? Would we not expect the test to either always crash with and exception (and passed), or always not crash but reach this code and just fail to verify?
If not, what is the race condition which influence which code path this test takes?
|
There is failing test -- was the change to |
|
I looked into this, and think the tests are actually incorrect -- but it was never detected because we swallow the exception... Did PR to change this: #21636 |
|
We can rebase this PR -- feel free to merge if CI passes after rebasing. |
# Conflicts: # streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java
283fec5 to
96aca35
Compare
…e#21581) This PR implements the upgrade integration tests for `TimestampedWindowStateStoreWithHeaders` introduced in KIP-1271. The class should be reviewd: `HeadersStoreUpgradeIntegrationTest` This should not be merged before apache#21497 Reviewers: Matthias J. Sax <matthias@confluent.io>, Alieh Saeedi <asaeedi@confluent.io>
…e#21581) This PR implements the upgrade integration tests for `TimestampedWindowStateStoreWithHeaders` introduced in KIP-1271. The class should be reviewd: `HeadersStoreUpgradeIntegrationTest` This should not be merged before apache#21497 Reviewers: Matthias J. Sax <matthias@confluent.io>, Alieh Saeedi <asaeedi@confluent.io>
This PR implements the upgrade integration tests for
TimestampedWindowStateStoreWithHeadersintroduced in KIP-1271.The class should be reviewd:
HeadersStoreUpgradeIntegrationTestThis should not be merged before #21497
Reviewers: Matthias J. Sax matthias@confluent.io, Alieh Saeedi
asaeedi@confluent.io