
Failed to publish segments when all kafka records of previous task were filtered out by transformSpec #8765

@JasonChoi27

Affected Version

master

Description

Ingest Schema

{
  "type":"kafka",
  "dataSchema":{
      "dataSource":"growth_short_links",
      "parser":{
          ...
      },
      "metricsSpec":[
          ...
      ],
      "granularitySpec":{
          ...
      },
      "transformSpec":{
          "filter":{
              "type":"in",
              "dimension":"appId",
              "values":[
                  "5d30228e45a44304f83c8f94",
                  "5d3022a94b7215a8ca5f2f59",
                  "5d3022b14b7215a8ca5f2f5a"
              ]
          }
      }
  },
  "tuningConfig":{
       ...
  },
  "ioConfig":{
      ...
  }
}

Reproduce

  • Start the first task and make sure every record is filtered out by the transformSpec condition from the very beginning, then wait until the task duration elapses. The task finishes successfully.
  • After the second task is submitted, make sure some records pass the filter. This task fails before publishing segments, with the exception below.
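The "in" filter in the transformSpec above keeps only records whose appId matches one of the listed values. A minimal Python sketch of the filter semantics (the record dicts are hypothetical; this is not Druid's actual implementation), showing the first-task scenario where nothing survives:

```python
# Values from the transformSpec's "in" filter on the appId dimension.
ALLOWED_APP_IDS = {
    "5d30228e45a44304f83c8f94",
    "5d3022a94b7215a8ca5f2f59",
    "5d3022b14b7215a8ca5f2f5a",
}

def passes_filter(record):
    """Return True if the record survives the 'in' filter on appId."""
    return record.get("appId") in ALLOWED_APP_IDS

# First-task scenario: every consumed record is filtered out,
# so the task has no segment to publish.
batch = [{"appId": "other-app", "value": 1}, {"appId": "another", "value": 2}]
surviving = [r for r in batch if passes_filter(r)]
# surviving == []  ->  task ends with nothing to publish
```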

Exception

2019-10-28T19:02:48,360 ERROR [publish-0] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner - Error while publishing segments for sequenceNumber[SequenceMetadata{sequenceId=0, sequenceName='index_kafka_growth_short_links_149a86e99f195dd_0', assignments=[], startOffsets={0=1997372995}, exclusiveStartPartitions=[], endOffsets={0=1997606975}, sentinel=false, checkpointed=true}]
org.apache.druid.java.util.common.ISE: Failed to publish segments because of [java.lang.RuntimeException: Aborting transaction!].
	at org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver.lambda$publishInBackground$8(BaseAppenderatorDriver.java:605) ~[druid-server-0.16.0-incubating.jar:0.16.0-incubating]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]

The first task only checkpointed its metadata and did not save it to the DB, because it had no segment to publish. After the first task finished, the offset in the DB therefore remained at 1995845112, while the second task's partitionSequenceNumber was larger than 1995845112 because the first task had already consumed some records.

When the second task was ready to publish its segments, it sent a SegmentTransactionalInsertAction to the overlord to save the metadata to the DB. The overlord first compares the metadata in the DB with the committed metadata, and this comparison failed. The overlord logged:

2019-10-28T19:02:48,264 ERROR [qtp577628476-146] org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator - Not updating metadata, existing state[KafkaDataSourceMetadata{SeekableStreamStartSequenceNumbers=SeekableStreamEndSequenceNumbers{stream='logs.creditcard.ofa.analytics.event', partitionSequenceNumberMap={0=1995845112}}}] in metadata store doesn't match to the new start state[KafkaDataSourceMetadata{SeekableStreamStartSequenceNumbers=SeekableStreamStartSequenceNumbers{stream='logs.creditcard.ofa.analytics.event', partitionSequenceNumberMap={0=1997372995}, exclusivePartitions=[]}}].
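The log above shows a compare-and-swap precondition: the commit is rejected because the start offsets of the new commit do not match the end offsets stored in the DB. A simplified Python sketch of that check (an illustration of the semantics, not Druid's actual code in IndexerSQLMetadataStorageCoordinator):

```python
# Simplified sketch of the overlord's metadata precondition: a commit is
# allowed only if its start offsets pick up exactly where the offsets
# already stored in the metadata DB left off.
def metadata_matches(stored_end_offsets, new_start_offsets):
    """Compare stored end offsets with the new commit's start offsets."""
    return stored_end_offsets == new_start_offsets

stored = {0: 1995845112}     # offset the DB still holds (first task never committed)
new_start = {0: 1997372995}  # second task's start offset, advanced by the first
                             # task's uncommitted consumption

# metadata_matches(stored, new_start) is False -> "Aborting transaction!"
```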

Of course the task will recover on the next submission and start ingesting from the offset saved in the DB. But I think it would be better to advance the Kafka offset regardless of whether the task has segments to publish.
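The proposal above can be sketched as follows: commit the consumed offsets even when the segment set is empty, so the next task's start offsets always match the stored end offsets. This is a hypothetical illustration of the desired behavior (publish, metadata_store, and the dict layout are all made-up names, not Druid's API):

```python
# Hypothetical sketch of the proposed behavior: offsets advance on every
# successful task, even one whose records were all filtered out.
def publish(segments, start_offsets, end_offsets, metadata_store):
    """Transactionally commit segments and offsets to the metadata store."""
    if metadata_store["offsets"] != start_offsets:
        raise RuntimeError("Aborting transaction!")
    # Advance offsets unconditionally, even with an empty segment set.
    metadata_store["offsets"] = end_offsets
    metadata_store["segments"].extend(segments)

store = {"offsets": {0: 1995845112}, "segments": []}

# First task: everything filtered out, no segments, but offsets still advance.
publish([], {0: 1995845112}, {0: 1997372995}, store)

# Second task now starts where the store says, so its publish succeeds.
publish(["segment-1"], {0: 1997372995}, {0: 1997606975}, store)
```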
