Logic adjustments to SeekableStreamIndexTaskRunner. #7267
clintropolis merged 3 commits into apache:master from
Conversation
Most of the bug fixes should only affect Kinesis, since they were in code that handled the possibility of inclusive end offsets, which the Kafka codepath doesn't use. I think the only Kafka-related issue fixed by this patch was the removal of the "continue" calls in the main read loop, which previously could have caused Kafka ingestion to get stuck.
A mix of simplifications and bug fixes. They are intermingled because some of the bugs were made difficult to fix, and also more likely to happen in the first place, by how the code was structured. I tried to keep restructuring to a minimum. The changes are:

- Remove "initialOffsetsSnapshot", which was used to determine when to skip start offsets. Replace it with "lastReadOffsets", which I hope is more intuitive. (There is a connection: start offsets must be skipped if and only if they have already been read, either by a previous task or by a previous sequence in the same task, post-restore.)
- Remove "isStartingSequenceOffsetsExclusive", because it should always be the opposite of isEndOffsetExclusive. The reason is that starts are exclusive exactly when the prior ends are inclusive: they must match up in that way for adjacent reads to link up properly.
- Don't call "seekToStartingSequence" after the initial seek. There is no reason to, since we expect to read continuous message streams throughout the task. And calling it makes offset-tracking logic trickier, so better to avoid the need for trickiness. I believe this call was causing a bug in Kinesis ingestion where a message might get double-read.
- Remove the "continue" calls in the main read loop. They are bad because they prevent keeping currOffsets and lastReadOffsets up to date, and prevent us from detecting that we have finished reading.
- Rework "verifyInitialRecordAndSkipExclusivePartition" into "verifyRecordInRange". It no longer has side effects. It does a sanity check on the message offset and also makes sure that the offset is not past the endOffsets.
- Rework "assignPartitions" to replace inline comparisons with "isRecordAlreadyRead" and "isMoreToReadBeforeReadingRecord" calls. I believe this fixes an off-by-one error with Kinesis where the last record would not get read. It also makes the logic easier to read.
- When doing the final publish, only adjust end offsets of the final sequence, rather than potentially adjusting any unpublished sequence. Adjusting sequences other than the last one is a mistake, since it will extend their endOffsets beyond what they actually read. (I'm not sure if this was an issue in practice, since I'm not sure if real-world situations would have more than one unpublished sequence.)
- Rename "isEndSequenceOffsetsExclusive" to "isEndOffsetExclusive". It's shorter and clearer, I think.
- Add equals/hashCode/toString methods to OrderedSequenceNumber.

Kafka test changes:

- Added a Kafka "testRestoreAtEndOffset" test to verify that restores at the very end of the task lifecycle still work properly.

Kinesis test changes:

- Renamed "testRunOnNothing" to "testRunOnSingletonRange". I think that, given Kinesis semantics, the right behavior when the start offset equals the end offset (and there are no exclusive partitions set) is to read that single offset, because both are meant to be treated as inclusive.
- Adjusted "testRestoreAfterPersistingSequences" to expect one more message read. I believe the old test was wrong; it expected the task not to read message number 5.
- Adjusted "testRunContextSequenceAheadOfStartingOffsets" to use a checkpoint starting from 1 rather than 2. I believe the old test was wrong here too; it expected the task to start reading from the checkpointed offset, but it actually should have started reading from one past the checkpointed offset.
- Adjusted "testIncrementalHandOffReadsThroughEndOffsets" to expect 11 messages read instead of 12. It starts at message 0 and reads up to 10, which is 11 messages.
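The start/end exclusivity invariant described above can be sketched as a tiny piece of illustrative Java. This is not the actual Druid code: the class and method names are hypothetical, and in the real runner the skip decision also depends on whether the start offset was actually read by a prior task or sequence (via lastReadOffsets).

```java
// Hypothetical sketch of the invariant: starts are exclusive exactly when
// the prior ends are inclusive, so one isEndOffsetExclusive flag can drive both.
public class OffsetRangeSketch
{
  // Kafka-style semantics: end offsets are exclusive, so starts are inclusive.
  public static final boolean KAFKA_END_EXCLUSIVE = true;
  // Kinesis-style semantics: end sequence numbers are inclusive, so the start
  // of the next adjacent read must be treated as exclusive.
  public static final boolean KINESIS_END_EXCLUSIVE = false;

  /**
   * First offset a new sequence should read, given its start offset, assuming
   * the previous reader stopped exactly at its own end offset.
   */
  public static long firstOffsetToRead(long startOffset, boolean isEndOffsetExclusive)
  {
    // If ends are inclusive, the previous reader already consumed startOffset,
    // so the start is exclusive and we skip one position.
    return isEndOffsetExclusive ? startOffset : startOffset + 1;
  }
}
```

With Kafka-style exclusive ends, a task handed start=5 reads offset 5 first; with Kinesis-style inclusive ends, an adjacent follow-on read handed start=5 begins at 6, so the two reads link up with no gap and no overlap.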
if (!restoredNextPartitions.getStream().equals(ioConfig.getStartPartitions().getStream())) {
  throw new ISE(
-     "WTF?! Restored stream[%s] but expected stream[%s]",
+     "WTF?! Restored topic[%s] but expected topic[%s]",
IIRC, the term stream was used intentionally in #6431 because the author thought it's a more generic term to represent both Kafka topic and Kinesis stream. This stream is used in other places in Druid too.
OK, I'll revert these changes, but I do think it's better w/ Kafkaesque terminology (I agree w/ #7267 (comment)). Especially because "sequence" already means something else in the context of seekable stream tasks (SequenceMetadata, sequenceName, etc) and so it is best to avoid. But this can be driven separately and doesn't need to be looped into this logic adjustment PR.
  private final Map<PartitionIdType, SequenceOffsetType> endOffsets;

+ // lastReadOffsets are the last offsets that were read and processed.
+ private final ConcurrentMap<PartitionIdType, SequenceOffsetType> lastReadOffsets = new ConcurrentHashMap<>();
Why is this a ConcurrentHashMap?
Good question. There is no reason. I changed it to a regular HashMap.
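A minimal sketch of the idea behind lastReadOffsets, as discussed above: a record is a duplicate exactly when its offset is at or below the last offset already read for that partition. The class and method names here are illustrative, not the actual SeekableStreamIndexTaskRunner API, and a plain HashMap suffices under the (stated above) assumption that only the single read thread touches it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: lastReadOffsets replaces the old initialOffsetsSnapshot
// as the way to decide whether an incoming record was already read.
public class LastReadOffsetsSketch
{
  // Only the read thread touches this, so no concurrent map is needed.
  private final Map<Integer, Long> lastReadOffsets = new HashMap<>();

  /** True if this record was already read, by this task or a predecessor. */
  public boolean isRecordAlreadyRead(int partition, long recordOffset)
  {
    final Long lastRead = lastReadOffsets.get(partition);
    return lastRead != null && recordOffset <= lastRead;
  }

  /** Record that we have read and processed up to recordOffset. */
  public void markRead(int partition, long recordOffset)
  {
    lastReadOffsets.put(partition, recordOffset);
  }
}
```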
  private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new ArrayList<>();
  private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new ArrayList<>();
- private final Set<PartitionIdType> initialOffsetsSnapshot = new HashSet<>();
  private final Set<PartitionIdType> exclusiveStartingPartitions = new HashSet<>();
Would you please remove this? It's not used anymore.
    final SequenceOffsetType recordOffset
)
{
  // Check only for the first record among the record batch.
It looks like this isn't true anymore.
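For context, the side-effect-free "verifyRecordInRange" described in the PR summary could look roughly like the following. This is a hypothetical sketch, not the actual method: names, the exception type, and the message format are illustrative.

```java
// Hypothetical sketch: verifyRecordInRange only validates, with no side
// effects. It checks that a record's offset is not past the end offset for
// its partition, honoring inclusive vs exclusive end-offset semantics.
public class VerifyRecordSketch
{
  public static void verifyRecordInRange(
      long recordOffset,
      long endOffset,
      boolean isEndOffsetExclusive
  )
  {
    // With inclusive ends (Kinesis-style), offset == endOffset is still valid.
    final boolean pastEnd = isEndOffsetExclusive
                            ? recordOffset >= endOffset
                            : recordOffset > endOffset;
    if (pastEnd) {
      throw new IllegalStateException(
          "Record offset[" + recordOffset + "] is past end offset[" + endOffset + "]"
      );
    }
  }
}
```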
log.trace(
-     "Got stream[%s] partition[%s] sequence[%s].",
+     "Got topic[%s] partition[%s] offset[%s], shouldProcess[%s].",
Same here. stream and sequence were used intentionally.
I personally prefer offset over sequence because the former is more obviously a position to me, but am indifferent about whether stream or topic.
sequenceMetadata.setEndOffsets(currOffsets);
sequenceMetadata.updateAssignments(this, currOffsets);
final boolean isLast = i == (sequences.size() - 1);
if (isLast) {
Does it make sense to add a sanity check that the endOffsets are properly set for non-last sequences?
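The final-publish change plus the suggested sanity check could be sketched like this. It is a hypothetical illustration of the logic described above, not the actual SequenceMetadata code: only the last sequence gets its end offsets set to the current offsets, and every earlier sequence is asserted to already have its end offsets fixed.

```java
import java.util.List;

// Hypothetical sketch: at final publish time, adjust only the last sequence's
// end offset; earlier sequences must already be closed, which we sanity-check.
public class FinalPublishSketch
{
  public static class Sequence
  {
    public Long endOffset;  // null means "not yet set"

    public Sequence(Long endOffset)
    {
      this.endOffset = endOffset;
    }
  }

  public static void setFinalEndOffsets(List<Sequence> sequences, long currOffset)
  {
    for (int i = 0; i < sequences.size(); i++) {
      final boolean isLast = i == sequences.size() - 1;
      if (isLast) {
        // Only the open, final sequence is clamped to what was actually read.
        sequences.get(i).endOffset = currOffset;
      } else if (sequences.get(i).endOffset == null) {
        // Sanity check: a non-last sequence should already have an end offset;
        // extending it here would claim data it never read.
        throw new IllegalStateException("Sequence[" + i + "] has no end offset set");
      }
    }
  }
}
```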
  )
);

final ListenableFuture<TaskStatus> future2 = runTask(task2);
Would you please add a comment about why task2 reads nothing?
The actual bug here was that if a task was given a 'bad' end offset that was a Kafka transactional topic control offset instead of a record, and it was right after the last good offset read, the task would get stuck in an infinite read loop due to the continue statements that were removed in this PR. I think this test should either be removed, since this shouldn't happen in practice, or renamed to something like testDoesntGetStuckWithTransactionOffset and slightly reworked and commented to clear this up.
I think this test should probably just be removed, since it's not testing a real scenario.
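The read-loop hazard described in this thread can be illustrated with a small sketch. This is not the actual runner loop: the method and its parameters are hypothetical, and the real bug involves Kafka transaction control offsets that never show up as records. The point it demonstrates is the one made above: skipping to the next record with "continue" would leave currOffsets stale, so always fall through to the bookkeeping.

```java
// Hypothetical sketch: process a batch without "continue", so the current
// offset advances even past records we decide not to process, and the
// done-reading check (currOffset reaching endOffset) can eventually fire.
public class ReadLoopSketch
{
  public static long processBatch(long[] offsets, long endOffset, long currOffset)
  {
    for (long offset : offsets) {
      final boolean shouldProcess = offset >= currOffset && offset < endOffset;
      // Instead of "if (!shouldProcess) continue;", always reach the
      // bookkeeping below so the current offset stays up to date.
      if (shouldProcess) {
        // ... parse and index the record here ...
      }
      currOffset = offset + 1;  // advance even for records we skipped
    }
    return currOffset;
  }
}
```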
  true
);
// Set end offsets to one past the checkpoint, simulating a replica that needs to catch up.
task.getRunner().setEndOffsets(ImmutableMap.of(shardId1, "10"), true);
FYI, I fixed this test to be more realistic in #7264.
It looks like that PR has enough approvals to commit. I'll do that and merge it into this one.
clintropolis left a comment:
Overall LGTM; it looks a lot clearer, thanks for doing this refactor 👍
I think we need to nail down the terminology a bit though; there's now a bit more of a mix of offset, 'sequence number', and 'sequence offset'. Offset was never totally removed, it appears, and SeekableStreamPartitions seems to support both terminologies, presumably for backwards compatibility; SequenceMetadata is using offset terminology, probably for the same reason. I'm not quite sure what else is using what yet.
My vote is for 'offset' though.
@Override
- protected Long getSequenceNumberToStoreAfterRead(@NotNull Long sequenceNumber)
+ protected Long getNextStartOffset(@NotNull Long sequenceNumber)
:+1: on switching to 'offset', I think it's more intuitive terminology, though maybe change the parameter variable name too?
I decided to revert this for now, but plan to try again later.
I think the best would be to use different terms for Kinesis and Kafka. They define their own terminologies, and this would be especially good for logging.

I'm not sure what you mean... I think that would only be good for logging? I am mostly concerned about what we call things in the shared common structure, to make it easy to follow and to avoid switching what we call things all the time, and there I find offset to be more intuitive. I guess the implementors of

I meant, there are people who prefer

We should ensure that whatever terminology we want to use is correct now, since this is being introduced with 0.14, at least for things like JSON that escape the source code; otherwise we are going to have a bad time later. I agree that it's not worth blocking over renaming variables.

I don't think it's worth blocking a release over variable names.

I agree; I wasn't thinking about variable names. I was talking about making sure we are happy with the things that end up in JSON that will be hard to change later, once this is in the wild. I think it is maybe OK from what I've looked through so far.

If we had to pick one set of terms, I would personally lean towards Kafka-based terminology like "topic" and "offset", since I view Kafka as more "archetypal" than Kinesis. This PR doesn't change spec properties, so I think it's fine in that respect.

I reverted the offset/topic naming changes. However, I also changed
jihoonson left a comment:
LGTM. It looks much easier to read. Thanks!
* Logic adjustments to SeekableStreamIndexTaskRunner.
* Changes from code review.