From 988b6fcc669ad1ab78a700272ad2462e57a46b83 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 22 Jun 2018 14:16:51 -0700 Subject: [PATCH 1/2] Fix Kafka Indexing task pause forever (#5656) * Fix Nullpointer Exception in overlord if taskGroups does not contain the groupId * If the endOffset is same as startOffset, still let the task resume instead of returning endOffsets early which causes the tasks to pause forever and ultimately fail on timeout --- .../kafka/supervisor/KafkaSupervisor.java | 7 +- .../kafka/supervisor/KafkaSupervisorTest.java | 81 +++++++++++++++++++ 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 3ef27a85d6c4..dea8ec4fe1b5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -888,6 +888,10 @@ String generateSequenceName( @VisibleForTesting String generateSequenceName(int groupId) { + if (taskGroups.get(groupId) == null) { + log.warn("taskGroups does not contain groupId %s, returning null sequenceName", groupId); + return null; + } return generateSequenceName( taskGroups.get(groupId).partitionOffsets, taskGroups.get(groupId).minimumMessageTime, @@ -1524,12 +1528,11 @@ public Map apply(List> input) if (endOffsets.equals(taskGroup.sequenceOffsets.lastEntry().getValue())) { log.warn( - "Not adding checkpoint [%s] as its same as the start offsets [%s] of latest sequence for the task group [%d]", + "Checkpoint [%s] is same as the start offsets [%s] of latest sequence for the task group [%d]", endOffsets, taskGroup.sequenceOffsets.lastEntry().getValue(), groupId ); - return endOffsets; } log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 04861d0c6df3..d823b60b132e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -1905,6 +1905,87 @@ public void testResetRunningTasks() throws Exception verifyAll(); } + @Test + public void testNoDataIngestionTasks() throws Exception + { + final DateTime startTime = DateTimes.nowUtc(); + supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + //not adding any events + Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + "sequenceName-0", + new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + "sequenceName-0", + new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + "sequenceName-0", + new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)); + + TreeMap> checkpoints = new TreeMap<>(); + checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + reset(taskQueue, indexerMetadataStorageCoordinator); + expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); + taskQueue.shutdown("id1"); + taskQueue.shutdown("id2"); + taskQueue.shutdown("id3"); + replay(taskQueue, indexerMetadataStorageCoordinator); + + supervisor.resetInternal(null); + verifyAll(); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { From db0b96caba6af406a8ccc5bd8f5175c93e895ad0 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 22 Jun 2018 22:54:21 -0700 Subject: [PATCH 2/2] Address PR comment *Remove the null check and do not return null from generateSequenceName --- .../io/druid/indexing/kafka/supervisor/KafkaSupervisor.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index dea8ec4fe1b5..029a0332c6f1 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -43,8 +43,8 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.indexer.TaskLocation; -import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; @@ -888,10 +888,6 @@ String generateSequenceName( @VisibleForTesting String generateSequenceName(int groupId) { - if (taskGroups.get(groupId) == null) { - log.warn("taskGroups does not contain groupId %s, returning null sequenceName", groupId); - return null; - } return generateSequenceName( taskGroups.get(groupId).partitionOffsets, taskGroups.get(groupId).minimumMessageTime,