KAFKA-13211: add support for infinite range query for WindowStore#11227
KAFKA-13211: add support for infinite range query for WindowStore#11227guozhangwang merged 9 commits intoapache:trunkfrom
Conversation
There was a problem hiding this comment.
The null cacheKeyFrom and cacheKeyTo will use range query, which is already supported in KIP-763.
There was a problem hiding this comment.
side fix: the all case should be from == null && to == null. Otherwise, call range method, which is already supported null range query in KIP-763
There was a problem hiding this comment.
Isn't it sufficient to distinguish the forward and reverse cases and just call range(from, to) or reverseRange(from, to)?
There was a problem hiding this comment.
Good suggestion. Updated
|
@patrickstuedi @vvcephei , please take a look. Thank you. |
There was a problem hiding this comment.
Thanks for the work @showuon ! Just a few comments..
| files="StreamThread.java"/> | ||
|
|
||
| <suppress checks="BooleanExpressionComplexity" | ||
| files="InMemoryWindowStore.java"/> |
There was a problem hiding this comment.
I guess this is because of InMemoryWindowStore::isKeyWithinRange? Can we make that method more readable and at the same time avoid having to do this?
| * This iterator must be closed after use. | ||
| * | ||
| * @param keyFrom the first key in the range | ||
| * A null value indicates a starting position from the first element in the store. |
There was a problem hiding this comment.
Can you make those extra lines have the same indentation than the previous line so it can easily be seen that they belong together?
| } | ||
| } | ||
|
|
||
| private boolean isKeyWithinRange(final Bytes key) { |
There was a problem hiding this comment.
Per comment above, can you make this method more readable by splitting the statements?
| public void shouldThrowNullPointerExceptionOnRangeNullToKey() { | ||
| assertThrows(NullPointerException.class, () -> windowStore.fetch(1, null, ofEpochMilli(1L), ofEpochMilli(2L))); | ||
| } | ||
|
|
There was a problem hiding this comment.
Instead of deleting maybe you want to change the name and check that the the store returns the right values.
There was a problem hiding this comment.
Yes, but I've already tested these 2 test cases above (i.e. testFetchRange and testBackwardFetchRange). I don't think we should test them again here. What do you think?
There was a problem hiding this comment.
Ok, no, if you have them covered up there that's fine.
| iterator.next(), | ||
| new Windowed<>(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE)), | ||
| "a"); | ||
| assertFalse(iterator.hasNext()); |
There was a problem hiding this comment.
Did you try if you can consolidate that code into a common method? It seem it's the same verification for different value sets in each of the tests.
patrickstuedi
left a comment
There was a problem hiding this comment.
Just a few more comments..
There was a problem hiding this comment.
Isn't it sufficient to distinguish the forward and reverse cases and just call range(from, to) or reverseRange(from, to)?
|
@patrickstuedi , thanks for the comments. I've addressed all your comments and add test coverage. Please take a look again. Thank you. |
patrickstuedi
left a comment
There was a problem hiding this comment.
Thanks for the update!
One more thing, it might be worth adding a topologyDriver based tests and an integration test, to test all combinations of layered window stores. You can take a look at the two tests for the non-window stores:
-streams/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java
- streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
|
Integration tests added, but found a bug that will fail these tests. Will wait for the PR got merged and continue this PR. Thanks. #11292 |
|
@showuon Any new updates on this? |
|
@patrickstuedi , yes, the fix PR (#11292) is under reviewing (should be close). Thank you. |
e8d013f to
eb19d12
Compare
|
@patrickstuedi , the PR(#11292) is merged into trunk. I've rebased this PR, so it is good to review now. Thank you. |
patrickstuedi
left a comment
There was a problem hiding this comment.
Thanks for the work @showuon! Looking good to me.
guozhangwang
left a comment
There was a problem hiding this comment.
Thanks @showuon the PR LGTM. The only meta comment I have in mind is the rationale to add a separate integration test on top of all the unit tests. Usually we have integration test in order to test the interaction between multiple modules, which would be more complicated (and more likely to become flaky due to timing issues), and takes more time to run. I feel that for this functionality just the augmented unit tests are sufficient, but I might be wrong so please let me know if you feel it does bring additional coverage.
| } else if (keyFrom == null && key.compareTo(getKey(keyTo)) <= 0) { | ||
| // start from the beginning | ||
| isKeyInRange = true; | ||
| } else if (key.compareTo(getKey(keyFrom)) >= 0 && keyTo == null) { |
There was a problem hiding this comment.
nit: let's move keyTo == null up first so that if it does not satisfy, we do not need to trigger getKey anymore.
There was a problem hiding this comment.
Think about that a bit more, maybe we can make it simpler as:
if (keyFrom == null && keyTo == null) {
// fetch all
return true;
} else if (keyFrom == null) {
// start from the beginning
return key.compareTo(getKey(keyTo)) <= 0;
} else if (keyTo == null) {
// end to the last
return key.compareTo(getKey(keyFrom)) >= 0;
} else {
return key.compareTo(getKey(keyFrom)) >= 0 && key.compareTo(getKey(keyTo)) <= 0;
}
| @SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. | ||
| @RunWith(Parameterized.class) | ||
| @Category({IntegrationTest.class}) | ||
| public class RangeQueryForWindowStoreIntegrationTest { |
There was a problem hiding this comment.
Could we refactor it into a unit test instead of an integration test? Does integration environment bring any additional coverage here?
There was a problem hiding this comment.
Also, what additional coverage does this test provide on top of the augmented unit tests (which is great, btw!) below?
There was a problem hiding this comment.
The reason we have an integration test here is because we use TopologyTestDriver in the unit test, without real brokers. But on second thought, I think we can remove the integration test, because that's why TopologyTestDriver exists, to simulate the streaming running. Especially this test case doesn't involve interaction between multiple modules. So I removed it now. Thank you.
|
LGTM! Merged to trunk. |
…ache#11227) Add support for infinite range query for WindowStore. Story JIRA: https://issues.apache.org/jira/browse/KAFKA-13210 Reviewers: Patrick Stuedi <pstuedi@gmail.com>, Guozhang Wang <wangguoz@gmail.com>
Add support for infinite range query for WindowStore.
Story JIRA: https://issues.apache.org/jira/browse/KAFKA-13210
Committer Checklist (excluded from commit message)