
KafkaSupervisor NPE in checkPendingCompletionTasks when a group times out #5900

@gianm

Description

What happens is that task groups get moved from taskGroups to pendingCompletionTaskGroups in the checkTaskDuration() method. "Moved" here means they are deleted from taskGroups and added to pendingCompletionTaskGroups. Then, in the checkPendingCompletionTasks() method, the task groups in pendingCompletionTaskGroups are examined to see whether they have completed. For groups that time out, the method resets their partition offsets, removes their sequence from sequenceTaskGroup, and kills their tasks. That cleanup includes a generateSequenceName(groupId) call that is guaranteed to NPE: it tries to retrieve groupId from taskGroups, but the group has already been removed from taskGroups.
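The move-then-lookup pattern can be reproduced in isolation. The sketch below is a hypothetical, heavily simplified model: `PendingCompletionNpeDemo`, its `TaskGroup`, and `moveToPendingCompletion` are illustrative stand-ins, not Druid's classes. It only shows why looking a group up in `taskGroups` after it has been moved returns null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, heavily simplified model of the supervisor's bookkeeping.
// These are NOT Druid's real classes; they only mirror the shape of
// taskGroups and pendingCompletionTaskGroups as described in this issue.
public class PendingCompletionNpeDemo {
  static final class TaskGroup {
    final Map<Integer, Long> partitionOffsets;

    TaskGroup(Map<Integer, Long> partitionOffsets) {
      this.partitionOffsets = partitionOffsets;
    }
  }

  final Map<Integer, TaskGroup> taskGroups = new HashMap<>();
  final Map<Integer, List<TaskGroup>> pendingCompletionTaskGroups = new HashMap<>();

  // Like checkTaskDuration(): delete the group from taskGroups and append it
  // to the pending-completion list for the same groupId.
  void moveToPendingCompletion(int groupId) {
    TaskGroup group = taskGroups.remove(groupId);
    pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new ArrayList<>()).add(group);
  }

  // Like generateSequenceName(int): it only consults taskGroups, so
  // taskGroups.get(groupId) returns null once the group has been moved.
  String generateSequenceName(int groupId) {
    return "seq_" + taskGroups.get(groupId).partitionOffsets;
  }

  public static void main(String[] args) {
    PendingCompletionNpeDemo supervisor = new PendingCompletionNpeDemo();
    supervisor.taskGroups.put(0, new TaskGroup(Map.of(0, 42L)));
    supervisor.moveToPendingCompletion(0);
    try {
      supervisor.generateSequenceName(0);
    } catch (NullPointerException e) {
      System.out.println("NPE: group 0 is no longer in taskGroups");
    }
  }
}
```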

The relevant code in checkPendingCompletionTasks() is:

        if ((!foundSuccess && group.completionTimeout.isBeforeNow()) || entireTaskGroupFailed) {
          if (entireTaskGroupFailed) {
            log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", groupId);
          } else {
            log.makeAlert(
                "No task in [%s] succeeded before the completion timeout elapsed [%s]!",
                group.taskIds(),
                ioConfig.getCompletionTimeout()
            ).emit();
          }

          // reset partitions offsets for this task group so that they will be re-read from metadata storage
          partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET);
          sequenceTaskGroup.remove(generateSequenceName(groupId));

          // kill all the tasks in this pending completion group
          killTasksInGroup(group);
          // set a flag so the other pending completion groups for this set of partitions will also stop
          stopTasksInTaskGroup = true;

          // kill all the tasks in the currently reading task group and remove the bad task group
          killTasksInGroup(taskGroups.remove(groupId));
          toRemove.add(group);
        }

And generateSequenceName:

  String generateSequenceName(int groupId)
  {
    return generateSequenceName(
        taskGroups.get(groupId).partitionOffsets,
        taskGroups.get(groupId).minimumMessageTime,
        taskGroups.get(groupId).maximumMessageTime
    );
  }
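One possible direction for a fix, sketched under assumptions: the pending `TaskGroup` already carries the fields that `generateSequenceName(int)` reads (`partitionOffsets`, `minimumMessageTime`, `maximumMessageTime`), so the name could be derived from the group object itself instead of from `taskGroups`. In the sketch below, `SequenceNameFromGroup`, the name format, and the use of `java.time.Instant` in place of Joda `DateTime` are all hypothetical; it is not Druid's implementation.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch, not Druid's code: build the sequence name from the
// TaskGroup being processed rather than re-fetching it from taskGroups.
// java.time.Instant stands in for Joda DateTime; the name format is made up.
public class SequenceNameFromGroup {
  static final class TaskGroup {
    final Map<Integer, Long> partitionOffsets;
    final Optional<Instant> minimumMessageTime;
    final Optional<Instant> maximumMessageTime;

    TaskGroup(Map<Integer, Long> partitionOffsets, Optional<Instant> min, Optional<Instant> max) {
      this.partitionOffsets = partitionOffsets;
      this.minimumMessageTime = min;
      this.maximumMessageTime = max;
    }
  }

  // Stand-in for the three-argument overload that generateSequenceName(int) delegates to.
  static String generateSequenceName(
      Map<Integer, Long> partitionOffsets,
      Optional<Instant> minimumMessageTime,
      Optional<Instant> maximumMessageTime
  ) {
    // Sort by partition so the name does not depend on map iteration order.
    String offsets = new TreeMap<>(partitionOffsets).toString();
    return "seq_" + Integer.toHexString(Objects.hash(offsets, minimumMessageTime, maximumMessageTime));
  }

  // Never touches taskGroups, so it is safe for groups that have already
  // been moved to pendingCompletionTaskGroups.
  static String generateSequenceName(TaskGroup group) {
    return generateSequenceName(group.partitionOffsets, group.minimumMessageTime, group.maximumMessageTime);
  }

  public static void main(String[] args) {
    TaskGroup pending = new TaskGroup(Map.of(0, 100L, 1, 200L), Optional.empty(), Optional.empty());
    System.out.println(generateSequenceName(pending));
  }
}
```

With an overload like this, the timeout branch could call `sequenceTaskGroup.remove(generateSequenceName(group))` on the pending group it is already iterating over, so it would no longer depend on `taskGroups` still holding an entry for `groupId`.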

Related: #5656 and #5666, which experienced task timeouts due to a different bug (#5899) and then NPEd due to this bug.

This could be tested in KafkaSupervisorTest by adding a test that exercises the relevant code path in checkPendingCompletionTasks(), for example one that simulates tasks that never exit after being told to finish.
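The scenario such a test needs can be outlined with a small self-contained model. Everything below (`CompletionTimeoutScenario`, the `killed` list, passing `now` explicitly) is hypothetical and uses none of Druid's test utilities. It shows the sequence of steps: move a group to pending completion, never mark any task as succeeded, advance the clock past the completion timeout, and check that the timeout branch removes and kills the group without dereferencing a missing `taskGroups` entry.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the scenario a KafkaSupervisorTest case would need to
// reproduce; it is NOT the real test and uses none of Druid's test utilities.
public class CompletionTimeoutScenario {
  static final class TaskGroup {
    Instant completionTimeout;        // set when the group enters pending completion
    boolean anyTaskSucceeded = false; // tasks "never exit", so this stays false
  }

  final Map<Integer, TaskGroup> taskGroups = new HashMap<>();
  final Map<Integer, List<TaskGroup>> pendingCompletionTaskGroups = new HashMap<>();
  final List<TaskGroup> killed = new ArrayList<>();

  // Like checkTaskDuration(): tell the group to finish and start its completion clock.
  void moveToPendingCompletion(int groupId, Instant completionTimeout) {
    TaskGroup group = taskGroups.remove(groupId);
    group.completionTimeout = completionTimeout;
    pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new ArrayList<>()).add(group);
  }

  // Simplified timeout branch of checkPendingCompletionTasks(). It works only
  // with the pending group object and tolerates a missing entry in taskGroups.
  void checkPendingCompletionTasks(Instant now) {
    for (Map.Entry<Integer, List<TaskGroup>> entry : pendingCompletionTaskGroups.entrySet()) {
      List<TaskGroup> toRemove = new ArrayList<>();
      for (TaskGroup group : entry.getValue()) {
        if (!group.anyTaskSucceeded && group.completionTimeout.isBefore(now)) {
          killed.add(group);
          // Mirrors killTasksInGroup(taskGroups.remove(groupId)); may be null here.
          TaskGroup reading = taskGroups.remove(entry.getKey());
          if (reading != null) {
            killed.add(reading);
          }
          toRemove.add(group);
        }
      }
      entry.getValue().removeAll(toRemove);
    }
  }

  public static void main(String[] args) {
    CompletionTimeoutScenario supervisor = new CompletionTimeoutScenario();
    Instant start = Instant.parse("2018-07-01T00:00:00Z");
    supervisor.taskGroups.put(0, new TaskGroup());
    supervisor.moveToPendingCompletion(0, start.plusSeconds(30));
    // Advance the clock past the completion timeout without any task succeeding.
    supervisor.checkPendingCompletionTasks(start.plusSeconds(31));
    System.out.println("killed groups: " + supervisor.killed.size());
  }
}
```

Passing the current time in as a parameter keeps the model deterministic. A real test would need some equivalent way to make the completion timeout elapse while the mocked tasks keep reporting that they are still running.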

/cc @surekhasaharan
