Skip to content

Fix KafkaRecordSupplier assign#7260

Merged
asdf2014 merged 2 commits intoapache:masterfrom
jon-wei:fix_assign
Mar 14, 2019
Merged

Fix KafkaRecordSupplier assign#7260
asdf2014 merged 2 commits intoapache:masterfrom
jon-wei:fix_assign

Conversation

@jon-wei
Copy link
Copy Markdown
Contributor

@jon-wei jon-wei commented Mar 13, 2019

KafkaRecordSupplier.assign() has an extraneous call to seekToEarliest(), which can result in situations where a record < startOffset is passed to the task.

If that call is not removed, the unit test added in this PR will fail with:

org.apache.druid.java.util.common.ISE: sequenceNumber of the start record[0] is smaller than current sequenceNumber[1] for partition[1]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.verifyInitialRecordAndSkipExclusivePartition(SeekableStreamIndexTaskRunner.java:1907) ~[classes/:?]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:516) ~[classes/:?]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:246) ~[classes/:?]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:167) ~[classes/:?]
	at org.apache.druid.indexing.kafka.KafkaIndexTaskTest.lambda$runTask$0(KafkaIndexTaskTest.java:2325) ~[test-classes/:?]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_162]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_162]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_162]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_162]

I believe the extra seekToEarliest call can also cause the issue seen in the following task log snippet:

2019-03-11T12:21:44,246 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Finished reading stream[clarity-cloud0], partition[28].
2019-03-11T12:21:44,247 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Finished reading stream[clarity-cloud0], partition[10].
2019-03-11T12:21:44,255 INFO [task-runner-0-priority-0] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=kafka-supervisor-ocjkminb] Resetting offset for partition clarity-cloud0-13 to offset 5934551751.
2019-03-11T12:21:44,266 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception in run() before persisting.
org.apache.druid.java.util.common.ISE: WTH?! cannot find any valid sequence for record with partition [13] and sequence [5934551751]. Current sequences: [SequenceMetadata{sequenceName='index_kafka_clarity-cloud0_ae14d839866dbea_6', sequenceId=6, startOffsets={16=6054920329, 1=103385424111, 19=6051311469, 4=6054446518, 22=6054520373, 7=6054520040, 25=6055024649, 10=6052006585, 28=6054978070, 13=6054977177}, endOffsets={16=6056265209, 1=103386768990, 19=6052656349, 4=6055791397, 22=6055865252, 7=6055864920, 25=6056369528, 10=6053351464, 28=6056322949, 13=6056322056}, assignments=[], sentinel=false, checkpointed=true}, SequenceMetadata{sequenceName='index_kafka_clarity-cloud0_ae14d839866dbea_7', sequenceId=7, startOffsets={16=6056265209, 1=103386768990, 19=6052656349, 4=6055791397, 22=6055865252, 7=6055864920, 25=6056369528, 10=6053351464, 28=6056322949, 13=6056322056}, endOffsets={16=6056430296, 1=103386934078, 19=6052821435, 4=6055956484, 22=6056030339, 7=6056030007, 25=6056534616, 10=6053516552, 28=6056488036, 13=6056487143}, assignments=[13], sentinel=false, checkpointed=true}]
    at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:557) [druid-indexing-service-0.14.0-iap-pre2.jar:0.14.0-iap-pre2]
    at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:246) [druid-indexing-service-0.14.0-iap-pre2.jar:0.14.0-iap-pre2]
    at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:166) [druid-indexing-service-0.14.0-iap-pre2.jar:0.14.0-iap-pre2]
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419) [druid-indexing-service-0.14.0-iap-pre2.jar:0.14.0-iap-pre2]
    at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391) [druid-indexing-service-0.14.0-iap-pre2.jar:0.14.0-iap-pre2]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_152]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_152]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_152]

@fjy
Copy link
Copy Markdown
Contributor

fjy commented Mar 14, 2019

@jon-wei I think thi sis failing UT

@jon-wei
Copy link
Copy Markdown
Contributor Author

jon-wei commented Mar 14, 2019

@fjy Ah, I was making some intermediate changes to the PR, it should be good now.

@jon-wei jon-wei removed the WIP label Mar 14, 2019
Copy link
Copy Markdown
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. There does not seem to be a reason to seek to the earliest offsets at that point in the code.

Copy link
Copy Markdown
Member

@asdf2014 asdf2014 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

@asdf2014 asdf2014 merged commit c020272 into apache:master Mar 14, 2019
clintropolis pushed a commit to clintropolis/druid that referenced this pull request Mar 14, 2019
* Fix KafkaRecordSupplier assign

* TeamCity fix
gianm pushed a commit to implydata/druid-public that referenced this pull request Mar 14, 2019
* Fix KafkaRecordSupplier assign

* TeamCity fix
jon-wei pushed a commit that referenced this pull request Mar 15, 2019
* Fix KafkaRecordSupplier assign

* TeamCity fix
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants