Fix NPE while handling CheckpointNotice in KafkaSupervisor #5996
jon-wei merged 7 commits into apache:master
    @Nullable DataSourceMetadata previousCheckPoint,
    @Nullable DataSourceMetadata currentCheckPoint
)
public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
@asdf2014 thanks for the review! Maybe you saw an old commit. I've resolved all code style issues.
Um, I am sure. It still exists in the latest version of the code. 😅
public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint)
should be
public void checkpoint(
    int taskGroupId,
    DataSourceMetadata previousCheckPoint,
    DataSourceMetadata currentCheckPoint
)
I should see the latest version of the code when I visit https://github.com/apache/incubator-druid/pull/5996/files, right?
Yes, you're seeing the latest one. That line doesn't exceed 120 columns.
    taskGroup
) == null) {
  sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId));
  log.info("Created new task group [%d]", taskGroupId);
Looks like this removes the logging event for new task groups, can we preserve that?
Sounds good. I realized this log can be useful to debug supervisor's behavior for a specific taskGroupId.
👍
* Fix NPE while handling CheckpointNotice
* fix code style
* Fix test
* fix test
* add a log for creating a new taskGroup
* fix backward compatibility in KafkaIOConfig
…pache#5996)" This reverts commit abe837a.
Fixes #5992.
In this PR, the supervisor has been changed to ignore an inactive taskGroup (one that is not in the `taskGroups` map) while processing a checkpointNotice, as long as it is a known taskGroup (that is, it is in either the `pendingCompletionTaskGroups` or the `partitionGroups` map). See `CheckpointNotice.isValidTaskGroup()`.

Additionally,

* Removed the `sequenceTaskGroup` map from `KafkaSupervisor`. In `KafkaSupervisor`, there is a `ConcurrentHashMap` called `taskGroups` which maps a taskGroupId to an active taskGroup. This is very similar to `sequenceTaskGroup`, which maps a baseSequenceName to an active taskGroup, because baseSequenceName is unique for a taskGroup. Also, `sequenceTaskGroup` was being changed along with `taskGroups`. We don't have to maintain two `ConcurrentHashMap`s, which can even cause a race condition.
* Since `sequenceTaskGroup` has been removed, the checkpoint request now contains a taskGroupId, which is assigned to `KafkaIndexTask`s via `KafkaIOConfig`.
* Removed the `Nullable` annotation in the `Supervisor.checkpoint()` method.
* Changed the `map.putIfAbsent()` & `map.get()` pattern to `map.computeIfAbsent()`.
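The two behavioural changes described above can be illustrated with a minimal sketch. This is not the code from the PR: the class `TaskGroupRegistrySketch`, the `String` placeholders used as map values, and the choice to throw on a completely unknown taskGroupId are all assumptions made for illustration. Only the three map names and the `putIfAbsent()`/`get()` to `computeIfAbsent()` change come from the description.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the supervisor's bookkeeping; not Druid's actual classes.
class TaskGroupRegistrySketch {
    // taskGroupId -> active task group (a String placeholder replaces the real TaskGroup type)
    final Map<Integer, String> taskGroups = new ConcurrentHashMap<>();
    // taskGroupId -> task groups waiting for completion (placeholder values)
    final Map<Integer, String> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
    // taskGroupId -> partitions assigned to that group (placeholder values)
    final Map<Integer, String> partitionGroups = new ConcurrentHashMap<>();

    // Single atomic lookup-or-create, replacing a putIfAbsent() followed by a separate get().
    // The log line for a newly created group is kept inside the mapping function.
    String getOrCreateTaskGroup(int taskGroupId) {
        return taskGroups.computeIfAbsent(taskGroupId, id -> {
            System.out.printf("Created new task group [%d]%n", id);
            return "group-" + id;
        });
    }

    // Returns true when the group is active and the checkpoint notice should be processed,
    // false when the group is known but inactive and the notice should be ignored.
    // Throwing for an unknown id is an assumption of this sketch.
    boolean isValidTaskGroup(int taskGroupId) {
        if (taskGroups.containsKey(taskGroupId)) {
            return true;
        }
        if (pendingCompletionTaskGroups.containsKey(taskGroupId)
            || partitionGroups.containsKey(taskGroupId)) {
            return false;
        }
        throw new IllegalStateException("Unknown task group [" + taskGroupId + "]");
    }
}
```

With a single map keyed by taskGroupId, a caller would check `isValidTaskGroup(id)` first and skip the notice when it returns false, instead of dereferencing a missing entry in `taskGroups`.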