Race condition when killing kafka ingestion tasks with replication > 1 #13705

@IgorBerman

Description

There is a race condition when Kafka ingestion tasks finish. When the first task in a group finishes successfully, the second one should be stopped. In some cases the second one is killed instead and marked as FAILED, even though the first one finished successfully.
This can create false-positive alerts, e.g. when monitoring for failed tasks.

Affected Version

24.0.1

Description

Hi all,
I think there is a race condition between two threads that are finishing the second task in a task group after the first task has already finished successfully.
A bit of context: we are using 24.0.1, ingesting from a Kafka topic with 2 replicas, and using MySQL as the metadata store.

druid.coordinator.asOverlord.enabled=true
druid.indexer.runner.type=remote

Here are some log extracts from the coordinator (and overlord) log with my comments (the relevant parts will be attached in the thread).
index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_fnoejlpn finished successfully ->
which triggered stopping all tasks in the group (the other one is index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn):
Stopping all tasks in taskGroup[5] because: [Task [index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_fnoejlpn] completed successfully, stopping tasks [index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn, index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_fnoejlpn]]
and it seems that the second task also finished:

2023-01-15T11:36:01,679 INFO [Curator-PathChildrenCache-1] org.apache.druid.indexing.overlord.RemoteTaskRunner - Worker[taz-comp00674.taboolasyndication.com:8091] completed task[index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn] with status[SUCCESS]
2023-01-15T11:36:01,679 INFO [Curator-PathChildrenCache-1] org.apache.druid.indexing.overlord.RemoteTaskRunner - Worker[taz-comp00674.taboolasyndication.com:8091] wrote SUCCESS status for task [index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn] on [TaskLocation{host='taz-comp00674.taboolasyndication.com', port=8102, tlsPort=-1}]
2023-01-15T11:36:01,679 INFO [Curator-PathChildrenCache-1] org.apache.druid.indexing.overlord.TaskQueue - Received SUCCESS status for task: index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn

but then:
2023-01-15T11:36:01,680 INFO [IndexTaskClient-sp_campaigns_realtime_aggregation-3] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - Task [index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn] failed to stop in a timely manner, killing task
and then there is a race between two threads (Curator-PathChildrenCache-1 and IndexTaskClient-sp_campaigns_realtime_aggregation-3):

2023-01-15T11:36:01,684 INFO [Curator-PathChildrenCache-1] org.apache.druid.indexing.overlord.MetadataTaskStorage - Updating task index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn to status: TaskStatus{id=index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn, status=SUCCESS, duration=3727754, errorMsg=null}
2023-01-15T11:36:01,685 INFO [IndexTaskClient-sp_campaigns_realtime_aggregation-3] org.apache.druid.indexing.overlord.MetadataTaskStorage - Updating task index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn to status: TaskStatus{id=index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn, status=FAILED, duration=-1, errorMsg=Task [index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn] failed to stop in a ti...}
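The two log lines above look like a last-writer-wins update: the SUCCESS write lands at 11:36:01,684 and the FAILED write follows one millisecond later, and the task ends up FAILED. A minimal sketch of that pattern, and of one possible guard, assuming a simplified in-memory status map. `TaskStatusStoreSketch` and its methods are hypothetical illustrations, not Druid's `MetadataTaskStorage`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a task status table; NOT Druid's MetadataTaskStorage.
class TaskStatusStoreSketch {
  enum Status { RUNNING, SUCCESS, FAILED }

  private final Map<String, Status> statuses = new ConcurrentHashMap<>();

  void register(String taskId) {
    statuses.put(taskId, Status.RUNNING);
  }

  // Unguarded write: whichever thread writes last wins.
  void updateUnguarded(String taskId, Status newStatus) {
    statuses.put(taskId, newStatus);
  }

  // Guarded write: only a RUNNING task may move to a terminal state.
  // computeIfPresent is atomic per key on ConcurrentHashMap.
  // Returns true if this call performed the transition.
  boolean updateGuarded(String taskId, Status newStatus) {
    boolean[] applied = {false};
    statuses.computeIfPresent(taskId, (id, current) -> {
      if (current == Status.RUNNING) {
        applied[0] = true;
        return newStatus;
      }
      return current; // already terminal: keep the first terminal status
    });
    return applied[0];
  }

  Status get(String taskId) {
    return statuses.get(taskId);
  }

  public static void main(String[] args) {
    String id = "index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn";

    TaskStatusStoreSketch unguarded = new TaskStatusStoreSketch();
    unguarded.register(id);
    unguarded.updateUnguarded(id, Status.SUCCESS); // like Curator-PathChildrenCache-1 at 11:36:01,684
    unguarded.updateUnguarded(id, Status.FAILED);  // like IndexTaskClient-sp_campaigns_realtime_aggregation-3 at 11:36:01,685
    System.out.println("unguarded: " + unguarded.get(id)); // prints "unguarded: FAILED"

    TaskStatusStoreSketch guarded = new TaskStatusStoreSketch();
    guarded.register(id);
    guarded.updateGuarded(id, Status.SUCCESS);
    boolean lateFailedApplied = guarded.updateGuarded(id, Status.FAILED);
    // prints "guarded: SUCCESS, late FAILED applied: false"
    System.out.println("guarded: " + guarded.get(id) + ", late FAILED applied: " + lateFailedApplied);
  }
}
```

The guard only keeps whichever terminal status arrives first; it does not decide which status is correct if FAILED happens to arrive first. Against a real metadata store the same idea would presumably need an atomic conditional update on the stored status rather than an in-memory map.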

Looking at the logs of the index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn task itself, it finished before all of the above. The coordinator printed:
2023-01-15T11:34:00,802 INFO [IndexTaskClient-sp_campaigns_realtime_aggregation-3] org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient - Task [index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn] paused successfully
and in the task’s own log I see:
2023-01-15T11:36:00,754 INFO [main] org.apache.druid.java.util.common.lifecycle.Lifecycle - Stopping lifecycle [module] stage [INIT]
Finished peon task
index_kafka_sp_campaigns_realtime_aggregation_90e0b11d6585a1a_mhbgoepn.log.txt
coordinator.log.txt
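Putting the timestamps above side by side makes the ordering concrete. A small sketch that computes the gaps, assuming the overlord host and the peon host have synchronized clocks (if they do not, the numbers are not meaningful). `TimelineSketch` is a hypothetical helper; the timestamps are copied from the logs in this issue:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: milliseconds between two log timestamps in Druid's log format.
// Assumes both hosts' clocks are in sync.
class TimelineSketch {
  private static final DateTimeFormatter LOG_TS =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss,SSS");

  static long millisBetween(String from, String to) {
    return Duration.between(LocalDateTime.parse(from, LOG_TS), LocalDateTime.parse(to, LOG_TS)).toMillis();
  }

  public static void main(String[] args) {
    String paused = "2023-01-15T11:34:00,802";       // overlord: task paused successfully
    String peonStopping = "2023-01-15T11:36:00,754"; // peon: Stopping lifecycle [module] stage [INIT]
    String killDecision = "2023-01-15T11:36:01,680"; // overlord: failed to stop in a timely manner, killing task
    System.out.println("peon stopping -> kill decision: " + millisBetween(peonStopping, killDecision) + " ms"); // 926 ms
    System.out.println("pause -> kill decision: " + millisBetween(paused, killDecision) + " ms");             // 120878 ms
  }
}
```

Under that clock assumption, the peon was already stopping its lifecycle about 0.9 s before the overlord decided to kill it.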

Looking at the code that prints “failed to stop in a timely manner, killing task”, I traced it to a call to this function:
https://github.com/apache/druid/blob/cc89c661d0aebf95bb85f01bfbedf76251e2f307/inde[…]xing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java
which seems to return false either when there is an HTTP error or when the task is notAvailable (I'm not sure what that means).
In any case, once a task has already finished it can't respond to HTTP requests, so it's not surprising that stopping it "failed".
I think a synchronization piece is missing: the task should already have been marked as done/SUCCESS when it was paused in the first place. Alternatively, the race between the two threads above should be resolved in some way.
Any thoughts?
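A failed stop request is ambiguous in this situation, because a task that has already exited cannot answer either. A minimal sketch of a decision step that checks for an already-received terminal status before escalating to a kill. `StopOrKillSketch`, `decide`, and the `alreadyComplete` flag are hypothetical names, not the actual `SeekableStreamSupervisor` logic:

```java
// Hypothetical sketch; these names are illustrative and are NOT Druid APIs.
class StopOrKillSketch {
  enum Action { STOPPED, ALREADY_COMPLETE, KILL }

  /**
   * @param stopAcknowledged result of the stop request (false on an HTTP error or "not available")
   * @param alreadyComplete  whether a terminal status has already been received for the task
   */
  static Action decide(boolean stopAcknowledged, boolean alreadyComplete) {
    if (stopAcknowledged) {
      return Action.STOPPED;
    }
    // A task whose process already exited cannot answer HTTP, so a failed stop
    // is expected; check for a recorded terminal status before killing.
    if (alreadyComplete) {
      return Action.ALREADY_COMPLETE;
    }
    return Action.KILL;
  }

  public static void main(String[] args) {
    // The case from this issue: the stop "failed" but SUCCESS had already been received.
    System.out.println(decide(false, true));  // prints ALREADY_COMPLETE
    System.out.println(decide(false, false)); // prints KILL
    System.out.println(decide(true, false));  // prints STOPPED
  }
}
```

Such a check only narrows the window: a status that arrives just after the check would still race with the kill, so some atomic guard on the status update is still needed.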
