Kafka tasks fail after resuming for incremental handoff #7239

@jihoonson

Affected Version

0.14.0 (This bug was introduced in #6431).

Description

Here is the stack trace.

2019-03-12T02:59:51,878 INFO [appenderator_persist_0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Persist completed with metadata [AppenderatorDriverMetadata{segments={index_kafka_clarity-cloud0_d756771c2863b20_0=[SegmentWithState{segmentIdentifier=clarity-cloud0_2019-03-12T02:00:00.000Z_2019-03-12T03:00:00.000Z_2019-03-12T01:56:11.119Z_10, state=APPENDING}]}, lastSegmentIds={index_kafka_clarity-cloud0_d756771c2863b20_0=clarity-cloud0_2019-03-12T02:00:00.000Z_2019-03-12T03:00:00.000Z_2019-03-12T01:56:11.119Z_10}, callerMetadata={nextPartitions=SeekableStreamPartitions{stream/topic='clarity-cloud0', partitionSequenceNumberMap/partitionOffsetMap={1=103431503737, 4=6100519619, 7=6100600785, 10=6098083460, 13=6101049368, 16=6101000251, 19=6097385400, 22=6100601015, 25=6101101786, 28=6101055354}}}}]
2019-03-12T02:59:52,005 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Pausing ingestion until resumed
2019-03-12T02:59:52,070 INFO [qtp606061176-123] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Persisting Sequences Metadata [[SequenceMetadata{sequenceName='index_kafka_clarity-cloud0_d756771c2863b20_0', sequenceId=0, startOffsets={16=6100335393, 1=103430839175, 19=6096726535, 4=6099861583, 22=6099935437, 7=6099935104, 25=6100439713, 10=6097421649, 28=6100393134, 13=6100392241}, endOffsets={16=6101282427, 1=103431786274, 19=6097668960, 4=6100807623, 22=6100884284, 7=6100884039, 25=6101382663, 10=6098354147, 28=6101340178, 13=6101339869}, assignments=[16, 1, 19, 4, 22, 7, 25, 10, 28, 13], sentinel=false, checkpointed=true}, SequenceMetadata{sequenceName='index_kafka_clarity-cloud0_d756771c2863b20_1', sequenceId=1, startOffsets={16=6101282427, 1=103431786274, 19=6097668960, 4=6100807623, 22=6100884284, 7=6100884039, 25=6101382663, 10=6098354147, 28=6101340178, 13=6101339869}, endOffsets={16=9223372036854775807, 1=9223372036854775807, 19=9223372036854775807, 4=9223372036854775807, 22=9223372036854775807, 7=9223372036854775807, 25=9223372036854775807, 10=9223372036854775807, 28=9223372036854775807, 13=9223372036854775807}, assignments=[16, 1, 19, 4, 22, 7, 25, 10, 28, 13], sentinel=false, checkpointed=false}]]
2019-03-12T02:59:52,086 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Ingestion loop resumed
2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[25] to sequence[6101110684].
2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[10] to sequence[6098092421].
2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[4] to sequence[6100527087].
2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[7] to sequence[6100609294].
2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[22] to sequence[6100609476].
2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[1] to sequence[103431512339].
2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[16] to sequence[6101007019].
2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[19] to sequence[6097392961].
2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[13] to sequence[6101057094].
2019-03-12T02:59:52,087 INFO [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Seeking partition[28] to sequence[6101057589].
2019-03-12T02:59:52,091 ERROR [task-runner-0-priority-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Encountered exception in run() before persisting.
org.apache.druid.java.util.common.ISE: Starting sequenceNumber [6101007019] does not match expected [6101282427] for partition [16]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.verifyInitialRecordAndSkipExclusivePartition(SeekableStreamIndexTaskRunner.java:1895) ~[druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:517) [druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:246) [druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
	at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:166) [druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419) [druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
	at org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391) [druid-indexing-service-0.14.0-iap-pre3.jar:0.14.0-iap-pre3]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_163]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_163]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_163]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_163]

This happens only when setEndOffsets() is called with finish = false, a case our unit tests do not cover. When finish is false, setEndOffsets() updates initialOffsetsSnapshot to the start offsets of the new sequence. However, if the task still has offsets to consume from the current sequence before the new one starts, verifyInitialRecordAndSkipExclusivePartition() can throw, because the offset of the record just read can be smaller than the offset stored in the snapshot (passed to the method as intialSequenceSnapshot). In the log above, for example, the task resumes partition 16 at 6101007019, while the snapshot expects 6101282427, the start offset of sequence 1. The failing check is:

```java
  private boolean verifyInitialRecordAndSkipExclusivePartition(
      final OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record,
      final Map<PartitionIdType, SequenceOffsetType> intialSequenceSnapshot
  )
  {
    if (intialSequenceSnapshot.containsKey(record.getPartitionId())) {
      if (record.getSequenceNumber().compareTo(intialSequenceSnapshot.get(record.getPartitionId())) < 0) {
        throw new ISE(
            "Starting sequenceNumber [%s] does not match expected [%s] for partition [%s]",
            record.getSequenceNumber(),
            intialSequenceSnapshot.get(record.getPartitionId()),
            record.getPartitionId()
        );
      }
      // ... (rest of the method omitted)
    }
    // ...
  }
```
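The failing comparison can be reproduced outside Druid with a minimal sketch. It assumes plain `Integer` partitions and `Long` offsets in place of Druid's generic `PartitionIdType`/`SequenceOffsetType`, and uses a plain `IllegalStateException` in place of Druid's `ISE`; `SnapshotCheckSketch` and `verifyInitialRecord` are hypothetical names. Plugging in the partition 16 values from the log yields the same message as the stack trace:

```java
import java.util.HashMap;
import java.util.Map;

public class SnapshotCheckSketch {

  // Hypothetical, simplified stand-in for the check above: a record whose
  // offset is smaller than the snapshot entry for its partition is rejected.
  static void verifyInitialRecord(int partition, long recordOffset, Map<Integer, Long> initialSnapshot) {
    Long expected = initialSnapshot.get(partition);
    if (expected != null && recordOffset < expected) {
      throw new IllegalStateException(String.format(
          "Starting sequenceNumber [%d] does not match expected [%d] for partition [%d]",
          recordOffset, expected, partition));
    }
  }

  public static void main(String[] args) {
    Map<Integer, Long> snapshot = new HashMap<>();
    // After setEndOffsets(..., finish = false) the snapshot holds the start
    // offset of sequence 1 for partition 16 (value taken from the log).
    snapshot.put(16, 6101282427L);

    // On resume the task seeks partition 16 to where it stopped reading,
    // which is still inside sequence 0, so the check fails.
    try {
      verifyInitialRecord(16, 6101007019L, snapshot);
      System.out.println("no error");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Running `main` prints the message `Starting sequenceNumber [6101007019] does not match expected [6101282427] for partition [16]`, matching the ISE in the log.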

This error is reproducible in KafkaIndexTaskTest.testIncrementalHandOffReadsThroughEndOffsets() if you change the test to call setEndOffsets() with finish set to false.
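Why only finish = false triggers the failure can be illustrated with a hypothetical, heavily simplified model of the runner state. `HandoffSketch` and its methods are illustrative names, not Druid's classes. The model assumes, as described above, that setEndOffsets() moves the snapshot to the new sequence's start offsets only when finish is false, and (an assumption of this sketch) that only the first record read per partition afterwards is checked against the snapshot:

```java
import java.util.HashMap;
import java.util.Map;

public class HandoffSketch {
  // Partition -> offset the first record read must not precede.
  private final Map<Integer, Long> initialOffsetsSnapshot = new HashMap<>();
  // Partition -> next offset the consumer will return (where the task paused).
  private final Map<Integer, Long> nextOffsets = new HashMap<>();

  HandoffSketch(Map<Integer, Long> pausedAt) {
    nextOffsets.putAll(pausedAt);
  }

  // Models the behavior described in the issue: with finish == false a new
  // sequence starts at endOffsets and the snapshot is moved to those offsets.
  void setEndOffsets(Map<Integer, Long> endOffsets, boolean finish) {
    if (!finish) {
      initialOffsetsSnapshot.putAll(endOffsets);
    }
  }

  // Reads one record; the first record per partition is checked against the
  // snapshot, and the snapshot entry is then dropped.
  long readNext(int partition) {
    long offset = nextOffsets.get(partition);
    Long expected = initialOffsetsSnapshot.remove(partition);
    if (expected != null && offset < expected) {
      throw new IllegalStateException(
          "Starting sequenceNumber [" + offset + "] does not match expected [" + expected + "]");
    }
    nextOffsets.put(partition, offset + 1);
    return offset;
  }

  // Pauses at the logged position, sets end offsets, resumes and reads once.
  static boolean resumeSucceeds(boolean finish) {
    Map<Integer, Long> pausedAt = new HashMap<>();
    pausedAt.put(16, 6101007019L);   // seek position after resume (from the log)
    Map<Integer, Long> endOffsets = new HashMap<>();
    endOffsets.put(16, 6101282427L); // end of sequence 0 = start of sequence 1
    HandoffSketch runner = new HandoffSketch(pausedAt);
    runner.setEndOffsets(endOffsets, finish);
    try {
      runner.readNext(16);
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("finish = true:  resume ok = " + resumeSucceeds(true));
    System.out.println("finish = false: resume ok = " + resumeSucceeds(false));
  }
}
```

With finish = true the first record after resuming (6101007019) is read normally; with finish = false it is compared against 6101282427 and rejected, mirroring the stack trace.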

I'll raise a PR to fix this bug soon.
