From c46d4681c334709caa5cddbd0ce0c67a7d22eaad Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 21 Aug 2018 15:39:00 -0700 Subject: [PATCH 1/3] Fix NPE in KafkaSupervisor.checkpointTaskGroup --- .../kafka/supervisor/KafkaSupervisor.java | 158 ++++++++++++------ .../kafka/supervisor/TaskReportData.java | 2 +- 2 files changed, 104 insertions(+), 56 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 8c9bb599ada4..ac7ba795e689 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -146,6 +146,8 @@ public class KafkaSupervisor implements Supervisor */ private class TaskGroup { + final int groupId; + // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in // this task group has completed successfully, at which point this will be destroyed and a new task group will be @@ -161,11 +163,13 @@ private class TaskGroup final String baseSequenceName; TaskGroup( + int groupId, ImmutableMap partitionOffsets, Optional minimumMessageTime, Optional maximumMessageTime ) { + this.groupId = groupId; this.partitionOffsets = partitionOffsets; this.minimumMessageTime = minimumMessageTime; this.maximumMessageTime = maximumMessageTime; @@ -187,9 +191,21 @@ Set taskIds() private static class TaskData { + @Nullable volatile TaskStatus status; + @Nullable volatile DateTime startTime; volatile Map currentOffsets = new HashMap<>(); + + @Override + public String toString() + { + return "TaskData{" + + "status=" + status + + ", startTime=" + startTime + + ", currentOffsets=" + currentOffsets + + '}'; + } } // Map<{group ID}, {actively reading task group}>; see documentation for TaskGroup class @@ -718,8 +734,8 @@ public void handle() throws ExecutionException, InterruptedException log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue()); return; } - final Map newCheckpoint = checkpointTaskGroup(taskGroupId, false).get(); - taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint); + final Map newCheckpoint = checkpointTaskGroup(taskGroup, false).get(); + taskGroup.addNewCheckpoint(newCheckpoint); log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId); } } @@ -785,10 +801,13 @@ void resetInternal(DataSourceMetadata dataSourceMetadata) : currentMetadata.getKafkaPartitions() .getPartitionOffsetMap() .get(resetPartitionOffset.getKey()); - final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey())); - if (partitionOffsetInMetadataStore != null || - (partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey()) - .equals(resetPartitionOffset.getValue()))) { + final TaskGroup partitionTaskGroup = taskGroups.get( + getTaskGroupIdForPartition(resetPartitionOffset.getKey()) + ); + final boolean isSameOffset = partitionTaskGroup != null + && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey()) + .equals(resetPartitionOffset.getValue()); + if (partitionOffsetInMetadataStore != null || isSameOffset) { doReset = true; break; } @@ -1012,7 +1031,7 @@ private void discoverTasks() throws ExecutionException, InterruptedException, Ti List futureTaskIds = Lists.newArrayList(); List> futures = Lists.newArrayList(); List tasks = taskStorage.getActiveTasks(); - final Set taskGroupsToVerify = new HashSet<>(); + final Map taskGroupsToVerify = new HashMap<>(); for (Task task : tasks) { if (!(task instanceof KafkaIndexTask) || !dataSource.equals(task.getDataSource())) { @@ -1119,6 +1138,7 @@ public Boolean apply(KafkaIndexTask.Status status) k -> { log.info("Creating a new task group for taskGroupId[%d]", taskGroupId); return new TaskGroup( + taskGroupId, ImmutableMap.copyOf( kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap() ), @@ -1127,8 +1147,15 @@ public Boolean apply(KafkaIndexTask.Status status) ); } ); - taskGroupsToVerify.add(taskGroupId); - taskGroup.tasks.putIfAbsent(taskId, new TaskData()); + taskGroupsToVerify.put(taskGroupId, taskGroup); + final TaskData prevTaskGroup = taskGroup.tasks.putIfAbsent(taskId, new TaskData()); + if (prevTaskGroup != null) { + throw new ISE( + "WTH? a taskGroup[%s] already exists for new task[%s]", + prevTaskGroup, + taskId + ); + } } } return true; @@ -1156,7 +1183,7 @@ public Boolean apply(KafkaIndexTask.Status status) log.debug("Found [%d] Kafka indexing tasks for dataSource [%s]", taskCount, dataSource); // make sure the checkpoints are consistent with each other and with the metadata store - taskGroupsToVerify.forEach(this::verifyAndMergeCheckpoints); + taskGroupsToVerify.values().forEach(this::verifyAndMergeCheckpoints); } /** @@ -1166,10 +1193,9 @@ public Boolean apply(KafkaIndexTask.Status status) * 2. truncates the checkpoints in the taskGroup corresponding to which segments have been published, so that any newly * created tasks for the taskGroup start indexing from after the latest published offsets. */ - private void verifyAndMergeCheckpoints(final Integer groupId) + private void verifyAndMergeCheckpoints(final TaskGroup taskGroup) { - final TaskGroup taskGroup = taskGroups.get(groupId); - + final int groupId = taskGroup.groupId; // List {SequenceId, Checkpoints}> final List>>> taskSequences = new CopyOnWriteArrayList<>(); final List>>> futures = new ArrayList<>(); @@ -1330,6 +1356,7 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups( // reading the minimumMessageTime & maximumMessageTime from the publishing task and setting it here is not necessary as this task cannot // change to a state where it will read any more events TaskGroup newTaskGroup = new TaskGroup( + groupId, ImmutableMap.copyOf(startingPartitions), Optional.absent(), Optional.absent() @@ -1367,8 +1394,8 @@ public Boolean apply(@Nullable DateTime startTime) } taskData.startTime = startTime; - long millisRemaining = ioConfig.getTaskDuration().getMillis() - (System.currentTimeMillis() - - taskData.startTime.getMillis()); + long millisRemaining = ioConfig.getTaskDuration().getMillis() - + (System.currentTimeMillis() - taskData.startTime.getMillis()); if (millisRemaining > 0) { scheduledExec.schedule( buildRunTask(), @@ -1421,7 +1448,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException // find the longest running task from this group DateTime earliestTaskStart = DateTimes.nowUtc(); for (TaskData taskData : group.tasks.values()) { - if (earliestTaskStart.isAfter(taskData.startTime)) { + // startTime can be null if kafkaSupervisor is stopped gracefully before processing any runNotice + if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) { earliestTaskStart = taskData.startTime; } } @@ -1430,7 +1458,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); futureGroupIds.add(groupId); - futures.add(checkpointTaskGroup(groupId, true)); + futures.add(checkpointTaskGroup(group, true)); } } @@ -1468,10 +1496,8 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException } } - private ListenableFuture> checkpointTaskGroup(final int groupId, final boolean finalize) + private ListenableFuture> checkpointTaskGroup(final TaskGroup taskGroup, final boolean finalize) { - final TaskGroup taskGroup = taskGroups.get(groupId); - if (finalize) { // 1) Check if any task completed (in which case we're done) and kill unassigned tasks Iterator> i = taskGroup.tasks.entrySet().iterator(); @@ -1480,31 +1506,38 @@ private ListenableFuture> checkpointTaskGroup(final int group String taskId = taskEntry.getKey(); TaskData task = taskEntry.getValue(); - if (task.status.isSuccess()) { - // If any task in this group has already completed, stop the rest of the tasks in the group and return. - // This will cause us to create a new set of tasks next cycle that will start from the offsets in - // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing - // failed and we need to re-ingest) - return Futures.transform( - stopTasksInGroup(taskGroup), - new Function>() - { - @Nullable - @Override - public Map apply(@Nullable Object input) + // task.status can be null if any runNotice is processed before kafkaSupervisor is stopped gracefully. + if (task.status != null) { + if (task.status.isSuccess()) { + // If any task in this group has already completed, stop the rest of the tasks in the group and return. + // This will cause us to create a new set of tasks next cycle that will start from the offsets in + // metadata store (which will have advanced if we succeeded in publishing and will remain the same if + // publishing failed and we need to re-ingest) + return Futures.transform( + stopTasksInGroup(taskGroup), + new Function>() { - return null; + @Nullable + @Override + public Map apply(@Nullable Object input) + { + return null; + } } - } - ); - } + ); + } - if (task.status.isRunnable()) { - if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) { - log.info("Killing task [%s] which hasn't been assigned to a worker", taskId); - killTask(taskId); - i.remove(); + if (task.status.isRunnable()) { + if (taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) { + log.info("Killing task [%s] which hasn't been assigned to a worker", taskId); + killTask(taskId); + i.remove(); + } } + } else { + log.info("Killing task [%s] of unknown status", taskId); + killTask(taskId); + i.remove(); } } } @@ -1550,7 +1583,7 @@ public Map apply(List> input) final List setEndOffsetTaskIds = ImmutableList.copyOf(taskGroup.taskIds()); if (setEndOffsetTaskIds.isEmpty()) { - log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId); + log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId); return null; } @@ -1561,11 +1594,15 @@ public Map apply(List> input) "Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]", endOffsets, taskGroup.sequenceOffsets.lastEntry().getValue(), - groupId + taskGroup.groupId ); } - log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets); + log.info( + "Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", + taskGroup.groupId, + endOffsets + ); for (final String taskId : setEndOffsetTaskIds) { setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize)); } @@ -1587,7 +1624,7 @@ public Map apply(List> input) } if (taskGroup.tasks.isEmpty()) { - log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", groupId); + log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", taskGroup.groupId); return null; } @@ -1627,11 +1664,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte continue; } - Iterator> iTask = group.tasks.entrySet().iterator(); + Iterator> iTask = group.tasks.entrySet().iterator(); while (iTask.hasNext()) { - Map.Entry task = iTask.next(); + final Entry entry = iTask.next(); + final String taskId = entry.getKey(); + final TaskData taskData = entry.getValue(); + + Preconditions.checkNotNull(taskData.status, "task[%s] has a null status", taskId); - if (task.getValue().status.isFailure()) { + if (taskData.status.isFailure()) { iTask.remove(); // remove failed task if (group.tasks.isEmpty()) { // if all tasks in the group have failed, just nuke all task groups with this partition set and restart @@ -1640,10 +1681,10 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte } } - if (task.getValue().status.isSuccess()) { + if (taskData.status.isSuccess()) { // If one of the pending completion tasks was successful, stop the rest of the tasks in the group as // we no longer need them to publish their segment. - log.info("Task [%s] completed successfully, stopping tasks %s", task.getKey(), group.taskIds()); + log.info("Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds()); futures.add(stopTasksInGroup(group)); foundSuccess = true; toRemove.add(group); // remove the TaskGroup from the list of pending completion task groups @@ -1714,6 +1755,8 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep continue; } + Preconditions.checkNotNull(taskData.status, "task[%s] has a null status", taskId); + // remove failed tasks if (taskData.status.isFailure()) { iTasks.remove(); @@ -1741,7 +1784,7 @@ void createNewTasks() throws JsonProcessingException taskGroups.entrySet() .stream() .filter(taskGroup -> taskGroup.getValue().tasks.size() < ioConfig.getReplicas()) - .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getKey())); + .forEach(taskGroup -> verifyAndMergeCheckpoints(taskGroup.getValue())); // check that there is a current task group for each group of partitions in [partitionGroups] for (Integer groupId : partitionGroups.keySet()) { @@ -1757,6 +1800,7 @@ void createNewTasks() throws JsonProcessingException ) : Optional.absent()); final TaskGroup taskGroup = new TaskGroup( + groupId, generateStartingOffsetsForPartitionGroup(groupId), minimumMessageTime, maximumMessageTime @@ -1984,8 +2028,12 @@ private ListenableFuture stopTasksInGroup(@Nullable TaskGroup taskGroup) final List> futures = Lists.newArrayList(); for (Map.Entry entry : taskGroup.tasks.entrySet()) { - if (!entry.getValue().status.isComplete()) { - futures.add(stopTask(entry.getKey(), false)); + final String taskId = entry.getKey(); + final TaskData taskData = entry.getValue(); + if (taskData.status == null) { + killTask(taskId); + } else if (!taskData.status.isComplete()) { + futures.add(stopTask(taskId, false)); } } @@ -2066,7 +2114,7 @@ private SupervisorReport generateReport(boolean in for (TaskGroup taskGroup : taskGroups.values()) { for (Map.Entry entry : taskGroup.tasks.entrySet()) { String taskId = entry.getKey(); - DateTime startTime = entry.getValue().startTime; + @Nullable DateTime startTime = entry.getValue().startTime; Map currentOffsets = entry.getValue().currentOffsets; Long remainingSeconds = null; if (startTime != null) { @@ -2093,7 +2141,7 @@ private SupervisorReport generateReport(boolean in for (TaskGroup taskGroup : taskGroups) { for (Map.Entry entry : taskGroup.tasks.entrySet()) { String taskId = entry.getKey(); - DateTime startTime = entry.getValue().startTime; + @Nullable DateTime startTime = entry.getValue().startTime; Map currentOffsets = entry.getValue().currentOffsets; Long remainingSeconds = null; if (taskGroup.completionTimeout != null) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java index b10304247926..6abc3b02f6a3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/TaskReportData.java @@ -45,7 +45,7 @@ public TaskReportData( String id, @Nullable Map startingOffsets, @Nullable Map currentOffsets, - DateTime startTime, + @Nullable DateTime startTime, Long remainingSeconds, TaskType type, @Nullable Map lag From d2cfbd309e6a249918ff7aecaafa887bf2821f52 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 22 Aug 2018 20:31:48 -0700 Subject: [PATCH 2/3] address comments --- .../io/druid/indexing/kafka/supervisor/KafkaSupervisor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index ac7ba795e689..69746e00fff3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1506,7 +1506,7 @@ private ListenableFuture> checkpointTaskGroup(final TaskGroup String taskId = taskEntry.getKey(); TaskData task = taskEntry.getValue(); - // task.status can be null if any runNotice is processed before kafkaSupervisor is stopped gracefully. + // task.status can be null if kafkaSupervisor is stopped gracefully before processing any runNotice. if (task.status != null) { if (task.status.isSuccess()) { // If any task in this group has already completed, stop the rest of the tasks in the group and return. @@ -1670,7 +1670,7 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte final String taskId = entry.getKey(); final TaskData taskData = entry.getValue(); - Preconditions.checkNotNull(taskData.status, "task[%s] has a null status", taskId); + Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId); if (taskData.status.isFailure()) { iTask.remove(); // remove failed task @@ -1755,7 +1755,7 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep continue; } - Preconditions.checkNotNull(taskData.status, "task[%s] has a null status", taskId); + Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", taskId); // remove failed tasks if (taskData.status.isFailure()) { From dc22664a1ad15fd7fc25c10bb55541b45ad9894e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 25 Aug 2018 23:00:20 -0700 Subject: [PATCH 3/3] address comment --- .../io/druid/indexing/kafka/supervisor/KafkaSupervisor.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 69746e00fff3..8e12f591461a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1534,10 +1534,6 @@ public Map apply(@Nullable Object input) i.remove(); } } - } else { - log.info("Killing task [%s] of unknown status", taskId); - killTask(taskId); - i.remove(); } } }