We occasionally see KafkaIndexTask fail with the following stack trace:
2018-12-19T18:25:30,062 ERROR [publish-driver] io.druid.indexing.kafka.KafkaIndexTask - Error in publish thread, dying: {class=io.druid.indexing.kafka.KafkaIndexTask, exceptionType=class java.lang.IllegalArgumentException, exceptionMessage=fromIndex(0) > toIndex(-1)}
java.lang.IllegalArgumentException: fromIndex(0) > toIndex(-1)
at java.util.ArrayList.subListRangeCheck(ArrayList.java:1006) ~[?:1.8.0_112]
at java.util.ArrayList.subList(ArrayList.java:996) ~[?:1.8.0_112]
at io.druid.segment.realtime.appenderator.AppenderatorImpl.persistAll(AppenderatorImpl.java:408) ~[druid-server-0.12.1.jar:0.12.1]
at io.druid.segment.realtime.appenderator.AppenderatorImpl.push(AppenderatorImpl.java:519) ~[druid-server-0.12.1.jar:0.12.1]
at io.druid.segment.realtime.appenderator.BaseAppenderatorDriver.pushInBackground(BaseAppenderatorDriver.java:351) ~[druid-server-0.12.1.jar:0.12.1]
at io.druid.segment.realtime.appenderator.StreamAppenderatorDriver.publish(StreamAppenderatorDriver.java:268) ~[druid-server-0.12.1.jar:0.12.1]
at io.druid.indexing.kafka.KafkaIndexTask.lambda$createAndStartPublishExecutor$28(KafkaIndexTask.java:364) ~[?:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_112]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_112]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_112]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_112]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
I believe it's caused by the following race condition:
- the task-runner thread creates a new sink in getOrCreateSink(), but has not yet added any row to its index
- the publish thread calls persistAll(). The newly created sink doesn't contain any row yet, so hydrants.size() == 0, which leads to the IllegalArgumentException in the following code:
// related code in persistAll
final List<FireHydrant> hydrants = Lists.newArrayList(sink); // hydrants.size() == 0
final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size(); // limit == -1
for (FireHydrant hydrant : hydrants.subList(0, limit)) { // IAE
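The off-by-one above can be reproduced in isolation: with an empty list and a writable sink, limit == size() - 1 == -1, and subList(0, -1) throws. A minimal sketch, including a purely hypothetical defensive guard (not the fix actually proposed below, which is to address the race in KafkaIndexTask instead):

```java
import java.util.ArrayList;
import java.util.List;

public class SubListDemo {
    // Mirrors the limit computation in persistAll: a writable sink keeps
    // its last (in-progress) hydrant out of the persist list.
    static int limit(List<String> hydrants, boolean writable) {
        return writable ? hydrants.size() - 1 : hydrants.size();
    }

    public static void main(String[] args) {
        List<String> hydrants = new ArrayList<>(); // empty, like a freshly created sink

        boolean threw = false;
        try {
            // fromIndex(0) > toIndex(-1) -> IllegalArgumentException
            hydrants.subList(0, limit(hydrants, true));
        } catch (IllegalArgumentException e) {
            threw = true;
        }
        System.out.println(threw);

        // Hypothetical guard: clamp the limit so an empty sink yields an
        // empty sublist instead of throwing.
        int safeLimit = Math.max(limit(hydrants, true), 0);
        System.out.println(hydrants.subList(0, safeLimit).size());
    }
}
```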
#6454 reports a similar race, but I don't think it's a bug in AppenderatorImpl. The javadoc of Appenderator#add says:
* If committer is not provided, no metadata is persisted. If it's provided, {@link #add}, {@link #clear},
* {@link #persistAll}, and {@link #push} methods should all be called from the same thread to keep the metadata
* committed by Committer in sync.
However, KafkaIndexTask calls add and persistAll from different threads and thus violates the contract. So I think the right direction would be to fix the problem in KafkaIndexTask and add a same-thread check in AppenderatorImpl.
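The same-thread check could look something like the following sketch. This is only an illustration of the idea, not the actual AppenderatorImpl code: the first caller of add() is recorded as the owning thread, and later calls that are supposed to share its metadata view are rejected if they come from a different thread.

```java
public class ThreadCheckDemo {
    // Hypothetical appenderator stand-in with a same-thread check.
    static class Appenderator {
        private Thread owner;

        // Record the first calling thread; reject calls from any other thread.
        private synchronized void checkThread() {
            if (owner == null) {
                owner = Thread.currentThread();
            } else if (owner != Thread.currentThread()) {
                throw new IllegalStateException(
                    "add/persistAll/push must be called from the same thread");
            }
        }

        void add() { checkThread(); }
        void persistAll() { checkThread(); }
    }

    public static void main(String[] args) throws Exception {
        Appenderator app = new Appenderator();
        app.add();
        app.persistAll(); // same thread: fine

        // A second thread calling persistAll (as KafkaIndexTask's
        // publish thread does today) would now fail fast.
        final boolean[] rejected = {false};
        Thread publishThread = new Thread(() -> {
            try {
                app.persistAll();
            } catch (IllegalStateException e) {
                rejected[0] = true;
            }
        });
        publishThread.start();
        publishThread.join();
        System.out.println(rejected[0]);
    }
}
```

Failing fast like this would turn the silent race into an obvious contract violation at the call site.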