[Backport] Logic adjustments to SeekableStreamIndexTaskRunner.#7272
Merged
gianm merged 1 commit intoapache:0.14.0-incubatingfrom Mar 15, 2019
Merged
Conversation
* Logic adjustments to SeekableStreamIndexTaskRunner. A mix of simplifications and bug fixes. They are intermingled because some of the bugs were made difficult to fix, and also more likely to happen in the first place, by how the code was structured. I tried to keep restructuring to a minimum. The changes are: - Remove "initialOffsetsSnapshot", which was used to determine when to skip start offsets. Replace it with "lastReadOffsets", which I hope is more intuitive. (There is a connection: start offsets must be skipped if and only if they have already been read, either by a previous task or by a previous sequence in the same task, post-restoring.) - Remove "isStartingSequenceOffsetsExclusive", because it should always be the opposite of isEndOffsetExclusive. The reason is that starts are exclusive exactly when the prior ends are inclusive: they must match up in that way for adjacent reads to link up properly. - Don't call "seekToStartingSequence" after the initial seek. There is no reason to, since we expect to read continuous message streams throughout the task. And calling it makes offset-tracking logic trickier, so better to avoid the need for trickiness. I believe the call being here was causing a bug in Kinesis ingestion where a message might get double-read. - Remove the "continue" calls in the main read loop. They are bad because they prevent keeping currOffsets and lastReadOffsets up to date, and prevent us from detecting that we have finished reading. - Rework "verifyInitialRecordAndSkipExclusivePartition" into "verifyRecordInRange". It no longer has side effects. It does a sanity check on the message offset and also makes sure that it is not past the endOffsets. - Rework "assignPartitions" to replace inline comparisons with "isRecordAlreadyRead" and "isMoreToReadBeforeReadingRecord" calls. I believe this fixes an off-by-one error with Kinesis where the last record would not get read. It also makes the logic easier to read. - When doing the final publish, only adjust end offsets of the final sequence, rather than potentially adjusting any unpublished sequence. Adjusting sequences other than the last one is a mistake since it will extend their endOffsets beyond what they actually read. (I'm not sure if this was an issue in practice, since I'm not sure if real world situations would have more than one unpublished sequence.) - Rename "isEndSequenceOffsetsExclusive" to "isEndOffsetExclusive". It's shorter and more clear, I think. - Add equals/hashCode/toString methods to OrderedSequenceNumber. Kafka test changes: - Added a Kafka "testRestoreAtEndOffset" test to verify that restores at the very end of the task lifecycle still work properly. Kinesis test changes: - Renamed "testRunOnNothing" to "testRunOnSingletonRange". I think that given Kinesis semantics, the right behavior when start offset equals end offset (and there aren't exclusive partitions set) is to read that single offset. This is because they are both meant to be treated as inclusive. - Adjusted "testRestoreAfterPersistingSequences" to expect one more message read. I believe the old test was wrong; it expected the task not to read message number 5. - Adjusted "testRunContextSequenceAheadOfStartingOffsets" to use a checkpoint starting from 1 rather than 2. I believe the old test was wrong here too; it was expecting the task to start reading from the checkpointed offset, but it actually should have started reading from one past the checkpointed offset. - Adjusted "testIncrementalHandOffReadsThroughEndOffsets" to expect 11 messages read instead of 12. It's starting at message 0 and reading up to 10, which should be 11 messages. * Changes from code review.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Backport of #7267 to 0.14.0-incubating.