Fix record validation in SeekableStreamIndexTaskRunner#7246
Fix record validation in SeekableStreamIndexTaskRunner#7246fjy merged 2 commits intoapache:masterfrom
Conversation
|
Oops, I noticed that the unit test for kinesis indexing is missing. I'll add it. |
|
Added a test for Kinesis index task. |
| // check exclusive starting sequence | ||
| if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) { | ||
| log.info("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId()); | ||
| log.warn("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId()); |
There was a problem hiding this comment.
I don't think this needs to be a warning. It looks like it happens by design in Kinesis for any task after the first one that first reads a particular partition.
There was a problem hiding this comment.
Hmm, I don't remember why I changed this.. Will revert.
| // Check only for the first record among the record batch. | ||
| if (initialOffsetsSnapshot.contains(record.getPartitionId())) { | ||
| final SequenceOffsetType currOffset = currOffsets.get(record.getPartitionId()); | ||
| if (currOffset != null) { |
There was a problem hiding this comment.
When is currOffset null? It seems to defeat the purpose of this check if we can get a record to check, and then don't check it because we don't know what the current offset is supposed to be.
There was a problem hiding this comment.
Hmm, maybe it's better to throw an error if it's null. Will raise a PR.
| ) | ||
| { | ||
| if (intialSequenceSnapshot.containsKey(record.getPartitionId())) { | ||
| if (record.getSequenceNumber().compareTo(intialSequenceSnapshot.get(record.getPartitionId())) < 0) { |
There was a problem hiding this comment.
What was the issue with using intialSequenceSnapshot in the original code? Did it have the wrong offsets for some reason (like, later offsets than we should be reading)?
There was a problem hiding this comment.
I think checking against intialSequenceSnapshot is wrong. Before this PR, intialSequenceSnapshot contained the start offsets of the current sequence. Comparing the offsets of the read record with intialSequenceSnapshot means that it would allow rewinding if the rewound offsets are still larger than intialSequenceSnapshot which I don't think it should be allowed.
The bug reported in #7239 happens while checkpointing with multiple replicas. During the checkpoint, the supervisor pauses all replica tasks and finds the max offsets of the current sequence, S. And then, it sets the max offsets to end offsets for all replicas. Here, if finish = false in setEndOffsets(), intialSequenceSnapshot was updated to the given end offsets which is the start offsets of the next sequence, S'. However, each replica can still consume some more offsets of the sequence S after being resumed until it reaches to the end offsets of S. This incurred an exception at here because the offset of the record is for the sequence S which should be smaller than start offsets of S'.
There was a problem hiding this comment.
Here, if finish = false in setEndOffsets(), intialSequenceSnapshot was updated to the given end offsets which is the start offsets of the next sequence, S'. However, each replica can still consume some more offsets of the sequence S after being resumed until it reaches to the end offsets of S. This incurred an exception at here because the offset of the record is for the sequence S which should be smaller than start offsets of S'.
It sounds like this part is the heart of the bug: the code didn't allow for continuing to read a few more messages of a prior sequence S before the messages for a new sequence S' started showing up. And it sounds like the fix is to compare against the currOffsets we think we should be reading right now, rather than the start of the sequence. Thanks for explaining.
* Fix record validation in SeekableStreamIndexTaskRunner * add kinesis test
Fix #7239.
In
verifyInitialRecordAndSkipExclusivePartition(),currOffsetis now used to verify the record offset. I also fixed a bug which uses the defaultcompareTomethod ofSequenceOffsetType.OrderedSequenceNumbershould be used instead. I also changedSequenceOffsetTypeto not extendComparableto prevent this problem in the future.