KAFKA-4863: Querying window store may return unwanted keys#2662
dguy wants to merge 2 commits into apache:trunk from
Refer to this link for build results (access rights to CI server needed):
@dguy thanks for the patch. I can't immediately tell which change fixed the problem since there is a lot of refactoring in there. Any chance you could point me at the change? It'd be super useful (for me) to have 2 commits for such PRs, one that shows the immediate fix and another for the refactoring (although that is the theory; in practice it might be a pain). Thanks.
Sorry @enothereska, the key part of the change is in
if (iterator.hasNext()) {
    final Bytes bytes = iterator.peekNextKey();
    final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
    return keyBytes.equals(binaryKey);
Should we also check the time range, as follows?
final long start = SessionKeySerde.extractStart(bytes.get());
final long end = SessionKeySerde.extractEnd(bytes.get());
return end >= from && start <= to;
Although the current impl of RocksDBWindowStore naturally checks that for us, there is no guarantee that all underlying store impls do the same.
Yeah makes sense to add the same check to the WindowKeySchema
assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
assertNull(entriesByKey.get(6));

windowStore.close();
Is this still needed?
private final File baseDir = TestUtils.tempDirectory("test");
private final MockProcessorContext context = new MockProcessorContext(baseDir, Serdes.ByteArray(), Serdes.ByteArray(), recordCollector, cache);
private WindowStore windowStore;
It seems all test cases create the store with the same key-value types, <Integer, String>; the only difference is in the constructor parameters. In that case, could we still declare it as typed and hence remove the @SuppressWarnings("unchecked")?
Not all tests. There is at least one that uses <String, Integer>, hence the change
Could you point me to the constructor of <String, Integer>? I can't find it in this file.
It is actually <String, String>, line 622
Merged to trunk. @dguy Could you file another PR against 0.10.2 branch? I tried to cherry-pick this one but found it too hard to resolve conflicts.
@guozhangwang sure - no probs
if (iterator.hasNext()) {
    final Bytes bytes = iterator.peekNextKey();
    final Bytes keyBytes = WindowStoreUtils.bytesKeyFromBinaryKey(bytes.get());
    if (!keyBytes.equals(binaryKey)) {
@guozhangwang @dguy
We cannot guarantee that all the entries for one key will necessarily precede the entries for the next key.
The following code still fails with this patch and only returns 0001 and 0003, since the key for ("a", "0005") will come after the key for ("aa", "0004"):
final RocksDBWindowStoreSupplier<String, String> supplier =
    new RocksDBWindowStoreSupplier<>(
        "window",
        0x7a00000000000000L, 2,
        true,
        Serdes.String(),
        Serdes.String(),
        0x7a00000000000000L,
        true,
        Collections.<String, String>emptyMap(),
        false);
windowStore = supplier.get();
windowStore.init(context, windowStore);
windowStore.put("a", "0001", 0);
windowStore.put("aa", "0002", 0);
windowStore.put("a", "0003", 1);
windowStore.put("aa", "0004", 1);
windowStore.put("a", "0005", 0x7a00000000000000L - 1);
final List expected = Utils.mkList("0001", "0003", "0005");
assertThat(toList(windowStore.fetch("a", 0, Long.MAX_VALUE)), equalTo(expected));
@xvrl Is this the same as this unit test we added here? https://github.com/apache/kafka/pull/2662/files#diff-b28e9353a9574fb2b6a33cfa6e560857R200
Note that WindowStore.fetch() should return a WindowStoreIterator whose key is a Long indicating the timestamp and whose value is the stored value.
This is a slightly different test. The window is much larger to ensure all the entries are in the same window, and the last timestamp for (a, 0005) is chosen so that the resulting composite key comes after the one for (aa, 0004).
I see, it is because the binary key becomes:
ay\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x00\x00\x00\x05
which is greater than the previous one:
aa\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x04
So the lexicographic comparator will return aa\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x04 first, and we stop iterating there, before reaching the entry for ("a", "0005").
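The ordering described above can be reproduced with a small standalone sketch. It assumes the composite key layout implied by the byte strings quoted in this thread (raw key bytes, then an 8-byte big-endian timestamp, then a 4-byte sequence number); the class and method names are hypothetical and are not taken from WindowStoreUtils.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: the layout (key bytes + 8-byte timestamp + 4-byte seqnum)
// is an assumption inferred from the byte strings quoted in the comment above.
final class CompositeKeyOrderSketch {

    // Build the assumed composite key for (key, timestamp, seqnum).
    static byte[] binaryKey(String key, long timestamp, int seqnum) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + 8 + 4)
                .put(keyBytes)
                .putLong(timestamp)   // big-endian by default
                .putInt(seqnum)
                .array();
    }

    // Unsigned lexicographic comparison, as a byte-wise store comparator would do.
    static int compare(byte[] left, byte[] right) {
        return Arrays.compareUnsigned(left, right);
    }

    public static void main(String[] args) {
        byte[] aLate = binaryKey("a", 0x7a00000000000000L - 1, 5);  // a, y, \xFF x 7, seqnum 5
        byte[] aaEarly = binaryKey("aa", 1L, 4);                    // a, a, \x00 x 7, \x01, seqnum 4

        // The second byte decides: 'a' (0x61) sorts before 'y' (0x79),
        // so the "aa" entry is visited before the late "a" entry.
        System.out.println(compare(aaEarly, aLate) < 0); // prints true
    }
}
```

The high byte of the timestamp (0x79, ASCII 'y') ends up in the same position as the second character of the longer key, which is why a variable-length key prefix alone does not keep one key's entries contiguous.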
Make sure that the iterator returned from WindowStore.fetch(..) only returns matching keys, rather than all keys that are a prefix match.
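One way to satisfy that requirement, given the interleaving reported in the review, is to skip non-matching entries inside the scanned range instead of stopping at the first one. The following is a minimal in-memory sketch of that idea, not the implementation in this PR: the TreeMap stands in for the underlying store, the key layout (key bytes + 8-byte timestamp + 4-byte sequence number) is an assumption, and all names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a fetch that filters rather than stops early.
final class FilteringFetchSketch {
    private static final int SUFFIX_SIZE = 8 + 4; // assumed: timestamp + seqnum
    private static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;

    // Sorted by unsigned lexicographic byte order, like a byte-wise store.
    private final TreeMap<byte[], String> store = new TreeMap<>(UNSIGNED);
    private int seqnum = 0;

    static byte[] binaryKey(String key, long timestamp, int seqnum) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(keyBytes.length + SUFFIX_SIZE)
                .put(keyBytes).putLong(timestamp).putInt(seqnum).array();
    }

    void put(String key, String value, long timestamp) {
        seqnum++; // every put gets a fresh sequence number (duplicates retained)
        store.put(binaryKey(key, timestamp, seqnum), value);
    }

    List<String> fetch(String key, long from, long to) {
        byte[] wanted = key.getBytes(StandardCharsets.UTF_8);
        byte[] lower = binaryKey(key, from, 0);
        byte[] upper = binaryKey(key, to, Integer.MAX_VALUE);
        List<String> result = new ArrayList<>();
        for (Map.Entry<byte[], String> entry : store.subMap(lower, true, upper, true).entrySet()) {
            byte[] composite = entry.getKey();
            byte[] keyPart = Arrays.copyOfRange(composite, 0, composite.length - SUFFIX_SIZE);
            long timestamp = ByteBuffer.wrap(composite, composite.length - SUFFIX_SIZE, 8).getLong();
            // Skip (do not stop at) entries for other keys or outside the time range.
            if (Arrays.equals(keyPart, wanted) && timestamp >= from && timestamp <= to) {
                result.add(entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        FilteringFetchSketch s = new FilteringFetchSketch();
        s.put("a", "0001", 0);
        s.put("aa", "0002", 0);
        s.put("a", "0003", 1);
        s.put("aa", "0004", 1);
        s.put("a", "0005", 0x7a00000000000000L - 1);
        System.out.println(s.fetch("a", 0, Long.MAX_VALUE)); // prints [0001, 0003, 0005]
    }
}
```

The trade-off in this sketch is that a fetch may scan entries belonging to other keys that happen to fall inside the byte range; it gives up the early exit in exchange for not missing later matches.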