KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables#13522
KAFKA-14834: [5/N] Drop out-of-order records from FK join with versioned tables#13522mjsax merged 3 commits intoapache:trunkfrom
Conversation
There was a problem hiding this comment.
Extracting the common code from process() into this helper method is purely a refactor, unrelated to the changes in this PR. I had to make the refactor in order to avoid a CyclomaticComplexity checkstyle failure upon adding a clause into process() for dropping out-of-order records.
There was a problem hiding this comment.
I've added a new test class which extends the existing FK join integration test class, rather than adding leftVersioned and rightVersioned into the parameters matrix for the existing class directly, in order to keep the number of test cases manageable. The existing test class already runs 96 tests (16 parameter combinations for each of 6 tests). Adding leftVersioned and rightVersioned into the parameter matrix would multiply that number by another factor of 4. By adding a new test class instead, and not testing for topology optimizations or rejoins in this new class, this new class only adds 108 new tests (16 parameter combinations for each of 7 tests) instead of three times that many.
c789e25 to
8decdda
Compare
| right.pipeInput("rhs1", "rhsValue1"); | ||
| right.pipeInput("rhs2", "rhsValue2"); | ||
| right.pipeInput("rhs3", "rhsValue3"); // this unreferenced FK won't show up in any results | ||
| right.pipeInput("rhs1", "rhsValue1", baseTimestamp); |
There was a problem hiding this comment.
Should we use "auto advance" instead of passing in timestamps expliclity? Could also be an advantage to keep it explicit for readability?
There was a problem hiding this comment.
I'll leave it as is for readability, since I believe auto-advance would be per-topic and the test behavior is clearer this way since it's explicit that records from the two input topics do not overlap/interleave in their timestamps.
| ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs1,null)"), | ||
| mkEntry("lhs2", "(lhsValue2|rhs2,null)"), | ||
| mkEntry("lhs3", "(lhsValue3|rhs1,null)")) | ||
| : emptyMap() |
There was a problem hiding this comment.
nit: avoid unnecessary reformatting (seems to be an IDE setting that triggered?)
There was a problem hiding this comment.
Ack, sorry about that. I tried looking to see if I have an auto-reformat setting on my IDE but everything looks to be disabled. I'll be on the lookout for this in the future; hopefully it won't be an issue again.
| } | ||
|
|
||
| @Test | ||
| public void shouldIgnoreOutOfOrderRecordsIffVersioned() { |
There was a problem hiding this comment.
Iff -> If (or is it if and only if)?
There was a problem hiding this comment.
Ah yeah I meant for this to mean "if and only if." It does look like a typo at first glance haha.
This PR updates foreign-key table-table join processors to ignore out-of-order records from versioned tables, as specified in KIP-914.
Committer Checklist (excluded from commit message)