From 2b59c9cb03e411c3f8a1cb541cd9a3507253210b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 10 Jul 2018 19:10:34 -0700 Subject: [PATCH 1/6] Fix NPE while handling CheckpointNotice --- .../MaterializedViewSupervisor.java | 7 +- ...ementalPublishingKafkaIndexTaskRunner.java | 7 +- .../druid/indexing/kafka/KafkaIOConfig.java | 9 + .../kafka/supervisor/KafkaSupervisor.java | 179 +++++---- .../indexing/kafka/KafkaIndexTaskTest.java | 106 ++++-- .../kafka/supervisor/KafkaSupervisorTest.java | 352 ++++++++++++++---- .../CheckPointDataSourceMetadataAction.java | 27 +- .../supervisor/SupervisorManager.java | 8 +- .../java/util/emitter/EmittingLogger.java | 4 + .../supervisor/NoopSupervisorSpec.java | 6 +- .../overlord/supervisor/Supervisor.java | 9 +- .../ExceptionCapturingServiceEmitter.java | 70 ++++ 12 files changed, 568 insertions(+), 216 deletions(-) create mode 100644 server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java index 215237d62dba..b1767e7c607c 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -53,7 +53,6 @@ import org.joda.time.Duration; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -240,11 +239,7 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint( - @Nullable String sequenceName, - @Nullable DataSourceMetadata previousCheckPoint, - @Nullable DataSourceMetadata currentCheckPoint - ) + 
public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint) { // do nothing } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index dc0c4e3e0826..72ceefb944eb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -600,12 +600,13 @@ public void onFailure(Throwable t) sequences ); requestPause(); - if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction( + final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( task.getDataSource(), - ioConfig.getBaseSequenceName(), + ioConfig.getTaskGroupId(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) - ))) { + ); + if (!toolbox.getTaskActionClient().submit(checkpointAction)) { throw new ISE("Checkpoint request with offsets [%s] failed, dying", nextOffsets); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java index 5d48fe19405b..6d8c0a19731c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java @@ -33,6 +33,7 @@ public class KafkaIOConfig implements IOConfig private static final boolean DEFAULT_USE_TRANSACTION = true; private static final boolean 
DEFAULT_SKIP_OFFSET_GAPS = false; + private final int taskGroupId; private final String baseSequenceName; private final KafkaPartitions startPartitions; private final KafkaPartitions endPartitions; @@ -44,6 +45,7 @@ public class KafkaIOConfig implements IOConfig @JsonCreator public KafkaIOConfig( + @JsonProperty("taskGroupId") Integer taskGroupId, @JsonProperty("baseSequenceName") String baseSequenceName, @JsonProperty("startPartitions") KafkaPartitions startPartitions, @JsonProperty("endPartitions") KafkaPartitions endPartitions, @@ -54,6 +56,7 @@ public KafkaIOConfig( @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps ) { + this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId"); this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName"); this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions"); this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions"); @@ -83,6 +86,12 @@ public KafkaIOConfig( } } + @JsonProperty + public int getTaskGroupId() + { + return taskGroupId; + } + @JsonProperty public String getBaseSequenceName() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 39b19ee73719..1fec418fc8dc 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -159,7 +159,7 @@ static class TaskGroup DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action final TreeMap> sequenceOffsets = new TreeMap<>(); - public TaskGroup( + TaskGroup( ImmutableMap partitionOffsets, Optional minimumMessageTime, Optional maximumMessageTime @@ -171,7 +171,7 @@ 
public TaskGroup( this.sequenceOffsets.put(0, partitionOffsets); } - public int addNewCheckpoint(Map checkpoint) + int addNewCheckpoint(Map checkpoint) { sequenceOffsets.put(sequenceOffsets.lastKey() + 1, checkpoint); return sequenceOffsets.lastKey(); @@ -212,9 +212,6 @@ private static class TaskData private final ConcurrentHashMap> partitionGroups = new ConcurrentHashMap<>(); // -------------------------------------------------------- - // BaseSequenceName -> TaskGroup - private final ConcurrentHashMap sequenceTaskGroup = new ConcurrentHashMap<>(); - private final TaskStorage taskStorage; private final TaskMaster taskMaster; private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; @@ -513,13 +510,9 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint( - String sequenceName, - DataSourceMetadata previousCheckpoint, - DataSourceMetadata currentCheckpoint - ) + public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint) { - Preconditions.checkNotNull(sequenceName, "Cannot checkpoint without a sequence name"); + Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint"); Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null"); Preconditions.checkArgument( ioConfig.getTopic() @@ -530,12 +523,14 @@ public void checkpoint( ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic() ); - log.info("Checkpointing [%s] for sequence [%s]", currentCheckpoint, sequenceName); - notices.add(new CheckpointNotice( - sequenceName, - (KafkaDataSourceMetadata) previousCheckpoint, - (KafkaDataSourceMetadata) currentCheckpoint - )); + log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId); + notices.add( + new CheckpointNotice( + taskGroupId, + (KafkaDataSourceMetadata) previousCheckpoint, + (KafkaDataSourceMetadata) currentCheckpoint + ) + ); } public void 
possiblyRegisterListener() @@ -637,17 +632,17 @@ public void handle() private class CheckpointNotice implements Notice { - final String sequenceName; + final int taskGroupId; final KafkaDataSourceMetadata previousCheckpoint; final KafkaDataSourceMetadata currentCheckpoint; CheckpointNotice( - String sequenceName, + int taskGroupId, KafkaDataSourceMetadata previousCheckpoint, KafkaDataSourceMetadata currentCheckpoint ) { - this.sequenceName = sequenceName; + this.taskGroupId = taskGroupId; this.previousCheckpoint = previousCheckpoint; this.currentCheckpoint = currentCheckpoint; } @@ -658,17 +653,12 @@ public void handle() throws ExecutionException, InterruptedException // check for consistency // if already received request for this sequenceName and dataSourceMetadata combination then return - Preconditions.checkNotNull( - sequenceTaskGroup.get(sequenceName), - "WTH?! cannot find task group for this sequence [%s], sequencesTaskGroup map [%s], taskGroups [%s]", - sequenceName, - sequenceTaskGroup, - taskGroups - ); - final TreeMap> checkpoints = sequenceTaskGroup.get(sequenceName).sequenceOffsets; + final TaskGroup taskGroup = taskGroups.get(taskGroupId); + + if (isValidTaskGroup(taskGroup)) { + final TreeMap> checkpoints = taskGroup.sequenceOffsets; - // check validity of previousCheckpoint if it is not null - if (previousCheckpoint != null) { + // check validity of previousCheckpoint int index = checkpoints.size(); for (int sequenceId : checkpoints.descendingKeySet()) { Map checkpoint = checkpoints.get(sequenceId); @@ -685,26 +675,39 @@ public void handle() throws ExecutionException, InterruptedException log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue()); return; } - } else { - // There cannot be more than one checkpoint when previous checkpoint is null - // as when the task starts they are sent existing checkpoints - Preconditions.checkState( - checkpoints.size() <= 1, - "Got checkpoint request with null as previous check point, 
however found more than one checkpoints" + final int taskGroupId = getTaskGroupIdForPartition( + currentCheckpoint.getKafkaPartitions() + .getPartitionOffsetMap() + .keySet() + .iterator() + .next() ); - if (checkpoints.size() == 1) { - log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0)); - return; + final Map newCheckpoint = checkpointTaskGroup(taskGroupId, false).get(); + taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint); + log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId); + } + } + + private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup) + { + if (taskGroup == null) { + // taskGroup might be in pendingCompletionTaskGroups or partitionGroups + if (pendingCompletionTaskGroups.containsKey(taskGroupId)) { + log.warn( + "Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for " + + "publishing segments", + taskGroupId + ); + return false; + } else if (partitionGroups.containsKey(taskGroupId)) { + log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", taskGroupId); + return false; + } else { + throw new ISE("WTH?! 
cannot find taskGroup [%s] among all taskGroups [%s]", taskGroupId, taskGroups); } } - final int taskGroupId = getTaskGroupIdForPartition(currentCheckpoint.getKafkaPartitions() - .getPartitionOffsetMap() - .keySet() - .iterator() - .next()); - final Map newCheckpoint = checkpointTaskGroup(taskGroupId, false).get(); - sequenceTaskGroup.get(sequenceName).addNewCheckpoint(newCheckpoint); - log.info("Handled checkpoint notice, new checkpoint is [%s] for sequence [%s]", newCheckpoint, sequenceName); + + return true; } } @@ -718,7 +721,6 @@ void resetInternal(DataSourceMetadata dataSourceMetadata) taskGroups.values().forEach(this::killTasksInGroup); taskGroups.clear(); partitionGroups.clear(); - sequenceTaskGroup.clear(); } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) { throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass()); } else { @@ -778,8 +780,7 @@ void resetInternal(DataSourceMetadata dataSourceMetadata) resetKafkaMetadata.getKafkaPartitions().getPartitionOffsetMap().keySet().forEach(partition -> { final int groupId = getTaskGroupIdForPartition(partition); killTaskGroupForPartitions(ImmutableSet.of(partition)); - final TaskGroup removedGroup = taskGroups.remove(groupId); - sequenceTaskGroup.remove(generateSequenceName(removedGroup)); + taskGroups.remove(groupId); partitionGroups.get(groupId).replaceAll((partitionId, offset) -> NOT_SET); }); } else { @@ -955,9 +956,10 @@ private void updatePartitionDataFromKafka() for (int partition = 0; partition < numPartitions; partition++) { int taskGroupId = getTaskGroupIdForPartition(partition); - partitionGroups.putIfAbsent(taskGroupId, new ConcurrentHashMap()); - - ConcurrentHashMap partitionMap = partitionGroups.get(taskGroupId); + ConcurrentHashMap partitionMap = partitionGroups.computeIfAbsent( + taskGroupId, + k -> new ConcurrentHashMap<>() + ); // The starting offset for a new partition in [partitionGroups] is initially set to NOT_SET; when a new 
task group // is created and is assigned partitions, if the offset in [partitionGroups] is NOT_SET it will take the starting @@ -1087,23 +1089,18 @@ public Boolean apply(KafkaIndexTask.Status status) } return false; } else { - final TaskGroup taskGroup = new TaskGroup( - ImmutableMap.copyOf( - kafkaTask.getIOConfig() - .getStartPartitions() - .getPartitionOffsetMap() - ), kafkaTask.getIOConfig().getMinimumMessageTime(), - kafkaTask.getIOConfig().getMaximumMessageTime() - ); - if (taskGroups.putIfAbsent( + final TaskGroup taskGroup = taskGroups.computeIfAbsent( taskGroupId, - taskGroup - ) == null) { - sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(taskGroupId)); - log.info("Created new task group [%d]", taskGroupId); - } + k -> new TaskGroup( + ImmutableMap.copyOf( + kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap() + ), + kafkaTask.getIOConfig().getMinimumMessageTime(), + kafkaTask.getIOConfig().getMaximumMessageTime() + ) + ); taskGroupsToVerify.add(taskGroupId); - taskGroups.get(taskGroupId).tasks.putIfAbsent(taskId, new TaskData()); + taskGroup.tasks.putIfAbsent(taskId, new TaskData()); } } return true; @@ -1256,7 +1253,6 @@ public void onFailure(Throwable t) // killing all tasks or no task left in the group ? 
// clear state about the taskgroup so that get latest offset information is fetched from metadata store log.warn("Clearing task group [%d] information as no valid tasks left the group", groupId); - sequenceTaskGroup.remove(generateSequenceName(taskGroup)); taskGroups.remove(groupId); partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET); } @@ -1281,9 +1277,10 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups( Map startingPartitions ) { - pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.newCopyOnWriteArrayList()); - - CopyOnWriteArrayList taskGroupList = pendingCompletionTaskGroups.get(groupId); + final CopyOnWriteArrayList taskGroupList = pendingCompletionTaskGroups.computeIfAbsent( + groupId, + k -> new CopyOnWriteArrayList<>() + ); for (TaskGroup taskGroup : taskGroupList) { if (taskGroup.partitionOffsets.equals(startingPartitions)) { if (taskGroup.tasks.putIfAbsent(taskId, new TaskData()) == null) { @@ -1411,8 +1408,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException if (endOffsets != null) { // set a timeout and put this group in pendingCompletionTaskGroups so that it can be monitored for completion group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout()); - pendingCompletionTaskGroups.putIfAbsent(groupId, Lists.newCopyOnWriteArrayList()); - pendingCompletionTaskGroups.get(groupId).add(group); + pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group); // set endOffsets as the next startOffsets for (Map.Entry entry : endOffsets.entrySet()) { @@ -1432,7 +1428,6 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET); } - sequenceTaskGroup.remove(generateSequenceName(group)); // remove this task group from the list of current task groups now that it has been handled taskGroups.remove(groupId); } @@ -1456,7 +1451,8 @@ private 
ListenableFuture> checkpointTaskGroup(final int group // metadata store (which will have advanced if we succeeded in publishing and will remain the same if publishing // failed and we need to re-ingest) return Futures.transform( - stopTasksInGroup(taskGroup), new Function>() + stopTasksInGroup(taskGroup), + new Function>() { @Nullable @Override @@ -1625,15 +1621,15 @@ private void checkPendingCompletionTasks() throws ExecutionException, Interrupte log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", groupId); } else { log.makeAlert( - "No task in [%s] succeeded before the completion timeout elapsed [%s]!", + "No task in [%s] for taskGroup [%d] succeeded before the completion timeout elapsed [%s]!", group.taskIds(), + groupId, ioConfig.getCompletionTimeout() ).emit(); } // reset partitions offsets for this task group so that they will be re-read from metadata storage partitionGroups.get(groupId).replaceAll((partition, offset) -> NOT_SET); - sequenceTaskGroup.remove(generateSequenceName(group)); // kill all the tasks in this pending completion group killTasksInGroup(group); // set a flag so the other pending completion groups for this set of partitions will also stop @@ -1693,7 +1689,6 @@ private void checkCurrentTaskState() throws ExecutionException, InterruptedExcep // be recreated with the next set of offsets if (taskData.status.isSuccess()) { futures.add(stopTasksInGroup(taskGroup)); - sequenceTaskGroup.remove(generateSequenceName(taskGroup)); iTaskGroups.remove(); break; } @@ -1735,7 +1730,6 @@ void createNewTasks() throws JsonProcessingException groupId, taskGroup ); - sequenceTaskGroup.put(generateSequenceName(taskGroup), taskGroups.get(groupId)); } } @@ -1778,6 +1772,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc DateTime maximumMessageTime = taskGroups.get(groupId).maximumMessageTime.orNull(); KafkaIOConfig kafkaIOConfig = new KafkaIOConfig( + groupId, sequenceName, new 
KafkaPartitions(ioConfig.getTopic(), startPartitions), new KafkaPartitions(ioConfig.getTopic(), endPartitions), @@ -1944,7 +1939,7 @@ private boolean isTaskCurrent(int taskGroupId, String taskId) } } - private ListenableFuture stopTasksInGroup(TaskGroup taskGroup) + private ListenableFuture stopTasksInGroup(@Nullable TaskGroup taskGroup) { if (taskGroup == null) { return Futures.immediateFuture(null); @@ -2289,6 +2284,28 @@ private Map> getCurrentTotalStats() throws Interrupt return allStats; } + @VisibleForTesting + @Nullable + TaskGroup removeTaskGroup(int taskGroupId) + { + return taskGroups.remove(taskGroupId); + } + + @VisibleForTesting + void moveTaskGroupToPendingCompletion(int taskGroupId) + { + final TaskGroup taskGroup = taskGroups.remove(taskGroupId); + if (taskGroup != null) { + pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, k -> new CopyOnWriteArrayList<>()).add(taskGroup); + } + } + + @VisibleForTesting + int getNoticesQueueSize() + { + return notices.size(); + } + private static class StatsFromTaskResult { private final String groupId; diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 965651c19eb0..0cb88b05918b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -262,21 +262,21 @@ public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported) private static List> generateRecords(String topic) { return ImmutableList.of( - new ProducerRecord(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")), - new ProducerRecord(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")), - new ProducerRecord(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")), - new ProducerRecord(topic, 0, null, 
JB("2011", "d", "y", "10", "20.0", "1.0")), - new ProducerRecord(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")), - new ProducerRecord(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), - new ProducerRecord(topic, 0, null, StringUtils.toUtf8("unparseable")), - new ProducerRecord(topic, 0, null, StringUtils.toUtf8("unparseable2")), - new ProducerRecord(topic, 0, null, null), - new ProducerRecord(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")), - new ProducerRecord(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")), - new ProducerRecord(topic, 0, null, JB("2049", "f", "y", "10", "notanumber", "1.0")), - new ProducerRecord(topic, 0, null, JB("2049", "f", "y", "10", "20.0", "notanumber")), - new ProducerRecord(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")), - new ProducerRecord(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0")) + new ProducerRecord<>(topic, 0, null, JB("2008", "a", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2009", "b", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2010", "c", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2011", "d", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2011", "e", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")), + new ProducerRecord<>(topic, 0, null, null), + new ProducerRecord<>(topic, 0, null, JB("2013", "f", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "notanumber", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "notanumber", "1.0")), + new ProducerRecord<>(topic, 0, null, JB("2049", "f", "y", "10", "20.0", 
"notanumber")), + new ProducerRecord<>(topic, 1, null, JB("2012", "g", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 1, null, JB("2011", "h", "y", "10", "20.0", "1.0")) ); } @@ -377,6 +377,7 @@ public void testRunAfterDataInserted() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -418,6 +419,7 @@ public void testRunBeforeDataInserted() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -493,6 +495,7 @@ public void testIncrementalHandOff() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, baseSequenceName, startPartitions, endPartitions, @@ -514,14 +517,16 @@ public void testIncrementalHandOff() throws Exception Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(1, checkpointRequestsHash.size()); - Assert.assertTrue(checkpointRequestsHash.contains( - Objects.hash( - DATA_SCHEMA.getDataSource(), - baseSequenceName, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(startPartitions), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, currentOffsets)) + ) ) - )); + ); // Check metrics Assert.assertEquals(8, task.getRunner().getRowIngestionMeters().getProcessed()); @@ -581,6 +586,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, baseSequenceName, startPartitions, endPartitions, @@ -603,14 +609,16 @@ public void testTimeBasedIncrementalHandOff() throws Exception 
Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(1, checkpointRequestsHash.size()); - Assert.assertTrue(checkpointRequestsHash.contains( - Objects.hash( - DATA_SCHEMA.getDataSource(), - baseSequenceName, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap())) + Assert.assertTrue( + checkpointRequestsHash.contains( + Objects.hash( + DATA_SCHEMA.getDataSource(), + 0, + new KafkaDataSourceMetadata(startPartitions), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoint.getPartitionOffsetMap())) + ) ) - )); + ); // Check metrics Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getProcessed()); @@ -637,6 +645,7 @@ public void testRunWithMinimumMessageTime() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -690,6 +699,7 @@ public void testRunWithMaximumMessageTime() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -753,6 +763,7 @@ public void testRunWithTransformSpec() throws Exception ) ), new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -812,6 +823,7 @@ public void testRunOnNothing() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), @@ -852,6 +864,7 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new 
KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -903,6 +916,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -957,6 +971,7 @@ public void testReportParseExceptions() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 7L)), @@ -1000,6 +1015,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 13L)), @@ -1081,6 +1097,7 @@ public void testMultipleParseExceptionsFailure() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 10L)), @@ -1140,6 +1157,7 @@ public void testRunReplicas() throws Exception final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1153,6 +1171,7 @@ public void testRunReplicas() throws Exception final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1206,6 +1225,7 @@ public void testRunConflicting() throws Exception final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1219,6 +1239,7 @@ public void testRunConflicting() throws Exception 
final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 1, "sequence1", new KafkaPartitions(topic, ImmutableMap.of(0, 3L)), new KafkaPartitions(topic, ImmutableMap.of(0, 10L)), @@ -1273,6 +1294,7 @@ public void testRunConflictingWithoutTransactions() throws Exception final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1286,6 +1308,7 @@ public void testRunConflictingWithoutTransactions() throws Exception final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 1, "sequence1", new KafkaPartitions(topic, ImmutableMap.of(0, 3L)), new KafkaPartitions(topic, ImmutableMap.of(0, 10L)), @@ -1345,6 +1368,7 @@ public void testRunOneTaskTwoPartitions() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L, 1, 0L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)), @@ -1409,6 +1433,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1422,6 +1447,7 @@ public void testRunTwoTasksTwoPartitions() throws Exception final KafkaIndexTask task2 = createTask( null, new KafkaIOConfig( + 1, "sequence1", new KafkaPartitions(topic, ImmutableMap.of(1, 0L)), new KafkaPartitions(topic, ImmutableMap.of(1, 1L)), @@ -1477,6 +1503,7 @@ public void testRestore() throws Exception final KafkaIndexTask task1 = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1513,6 +1540,7 @@ public void testRestore() throws Exception final KafkaIndexTask task2 = createTask( task1.getId(), new KafkaIOConfig( + 0, "sequence0", new 
KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1564,6 +1592,7 @@ public void testRunWithPauseAndResume() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1647,6 +1676,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), @@ -1685,6 +1715,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", new KafkaPartitions(topic, ImmutableMap.of(0, 200L)), new KafkaPartitions(topic, ImmutableMap.of(0, 500L)), @@ -1737,6 +1768,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception final KafkaIndexTask task = createTask( null, new KafkaIOConfig( + 0, "sequence0", // task should ignore these and use sequence info sent in the context new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), @@ -2026,18 +2058,20 @@ private void makeToolboxFactory() throws IOException @Override public boolean checkPointDataSourceMetadata( String supervisorId, - @Nullable String sequenceName, + int taskGroupId, @Nullable DataSourceMetadata previousDataSourceMetadata, @Nullable DataSourceMetadata currentDataSourceMetadata ) { log.info("Adding checkpoint hash to the set"); - checkpointRequestsHash.add(Objects.hash( - supervisorId, - sequenceName, - previousDataSourceMetadata, - currentDataSourceMetadata - )); + checkpointRequestsHash.add( + Objects.hash( + supervisorId, + taskGroupId, + previousDataSourceMetadata, + currentDataSourceMetadata + ) + ); return true; } } diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 1d951de9d0b1..038560af84b0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -19,6 +19,7 @@ package io.druid.indexing.kafka.supervisor; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; @@ -61,6 +62,7 @@ import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.parsers.JSONPathFieldSpec; import io.druid.java.util.common.parsers.JSONPathSpec; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.TestHelper; @@ -70,6 +72,7 @@ import io.druid.segment.realtime.FireDepartment; import io.druid.server.metrics.DruidMonitorSchedulerConfig; import io.druid.server.metrics.NoopServiceEmitter; +import io.druid.server.metrics.ExceptionCapturingServiceEmitter; import org.apache.curator.test.TestingCluster; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -99,7 +102,9 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; import static org.easymock.EasyMock.anyBoolean; import static org.easymock.EasyMock.anyObject; @@ -141,6 +146,7 @@ public class KafkaSupervisorTest extends EasyMockSupport private TaskQueue taskQueue; private 
String topic; private RowIngestionMetersFactory rowIngestionMetersFactory; + private ExceptionCapturingServiceEmitter serviceEmitter; private static String getTopic() { @@ -213,6 +219,8 @@ public void setupTest() topic = getTopic(); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); + serviceEmitter = new ExceptionCapturingServiceEmitter(); + EmittingLogger.registerEmitter(serviceEmitter); } @After @@ -553,7 +561,7 @@ public void testKillIncompatibleTasks() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "index_kafka_testDS__some_other_sequenceName", + 1, new KafkaPartitions("topic", ImmutableMap.of(0, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, 10L)), null, @@ -564,7 +572,7 @@ public void testKillIncompatibleTasks() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 333L)), null, @@ -575,7 +583,7 @@ public void testKillIncompatibleTasks() throws Exception Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "index_kafka_testDS__some_other_sequenceName", + 1, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 1L)), new KafkaPartitions("topic", ImmutableMap.of(0, 333L, 1, 333L, 2, 330L)), null, @@ -586,7 +594,7 @@ public void testKillIncompatibleTasks() throws Exception Task id4 = createKafkaIndexTask( "id4", "other-datasource", - "index_kafka_testDS_d927edff33c4b3f", + 2, new KafkaPartitions("topic", ImmutableMap.of(0, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, 10L)), null, @@ -634,7 +642,9 @@ public void testKillIncompatibleTasks() throws Exception TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2); + 
expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(2); replayAll(); @@ -652,7 +662,7 @@ public void testKillBadPartitionAssignment() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -661,7 +671,7 @@ public void testKillBadPartitionAssignment() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-1", + 1, new KafkaPartitions("topic", ImmutableMap.of(1, 0L)), new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE)), null, @@ -670,7 +680,7 @@ public void testKillBadPartitionAssignment() throws Exception Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -679,7 +689,7 @@ public void testKillBadPartitionAssignment() throws Exception Task id4 = createKafkaIndexTask( "id4", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE)), null, @@ -688,7 +698,7 @@ public void testKillBadPartitionAssignment() throws Exception Task id5 = createKafkaIndexTask( "id5", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -727,8 +737,12 @@ public void testKillBadPartitionAssignment() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), 
anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); taskQueue.shutdown("id4"); @@ -765,10 +779,12 @@ public void testRequeueTaskWhenFailed() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .anyTimes(); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .anyTimes(); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .anyTimes(); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .anyTimes(); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -830,7 +846,7 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), now, @@ -857,7 +873,9 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception TreeMap> checkpoints = 
new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 0L, 2, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(2); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -878,9 +896,12 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception reset(taskClient); // for the newly created replica task - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(captured.getValue())).anyTimes(); expect(taskStorage.getStatus(iHaveFailed.getId())).andReturn(Optional.of(TaskStatus.failure(iHaveFailed.getId()))); @@ -953,10 +974,12 @@ public void testQueueNextTasksOnSuccess() throws Exception TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); // there would be 4 tasks, 2 for each task group - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + 
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); for (Task task : tasks) { @@ -1063,10 +1086,12 @@ public void testBeginPublishAndQueueNextTasks() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); replay(taskStorage, taskRunner, taskClient, taskQueue); @@ -1100,7 +1125,7 @@ public void testDiscoverExistingPublishingTask() throws Exception Task task = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1192,7 +1217,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() Task task = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1282,7 
+1307,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1292,7 +1317,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1330,7 +1355,9 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception // since id1 is publishing, so getCheckpoints wouldn't be called for it TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 1L, 1, 2L, 2, 3L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); replayAll(); @@ -1404,10 +1431,12 @@ public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + 
expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); for (Task task : tasks) { @@ -1463,10 +1492,12 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); captured = Capture.newInstance(CaptureType.ALL); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); @@ -1540,10 +1571,12 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception checkpoints1.put(0, ImmutableMap.of(0, 0L, 2, 0L)); TreeMap> checkpoints2 = new TreeMap<>(); checkpoints2.put(0, ImmutableMap.of(1, 0L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints1)) - .times(2); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints2)) - .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-0"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints1)) + .times(2); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("sequenceName-1"), 
anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints2)) + .times(2); captured = Capture.newInstance(CaptureType.ALL); expect(taskStorage.getActiveTasks()).andReturn(tasks).anyTimes(); @@ -1622,7 +1655,7 @@ public void testStopGracefully() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1632,7 +1665,7 @@ public void testStopGracefully() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1642,7 +1675,7 @@ public void testStopGracefully() throws Exception Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1678,8 +1711,12 @@ public void testStopGracefully() throws Exception // getCheckpoints will not be called for id1 as it is in publishing state TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); 
taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -1824,7 +1861,7 @@ public void testResetRunningTasks() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1834,7 +1871,7 @@ public void testResetRunningTasks() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1844,7 +1881,7 @@ public void testResetRunningTasks() throws Exception Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1879,8 +1916,12 @@ public void testResetRunningTasks() throws Exception TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -1908,7 +1949,7 @@ public void 
testNoDataIngestionTasks() throws Exception Task id1 = createKafkaIndexTask( "id1", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1918,7 +1959,7 @@ public void testNoDataIngestionTasks() throws Exception Task id2 = createKafkaIndexTask( "id2", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1928,7 +1969,7 @@ public void testNoDataIngestionTasks() throws Exception Task id3 = createKafkaIndexTask( "id3", DATASOURCE, - "sequenceName-0", + 0, new KafkaPartitions("topic", ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), new KafkaPartitions("topic", ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), null, @@ -1958,9 +1999,15 @@ public void testNoDataIngestionTasks() throws Exception TreeMap> checkpoints = new TreeMap<>(); checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); - expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())).andReturn(Futures.immediateFuture(checkpoints)).times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + 
.times(1); taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); replayAll(); @@ -1980,6 +2027,180 @@ public void testNoDataIngestionTasks() throws Exception verifyAll(); } + @Test(timeout = 60_000L) + public void testCheckpointForInactiveTaskGroup() + throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException + { + supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + //not adding any events + final Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + final Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + final Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + 
expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect( + indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) + ).anyTimes(); + expect(taskClient.getStatusAsync("id1")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStatusAsync("id2")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + expect(taskClient.getStatusAsync("id3")).andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)); + + final DateTime startTime = DateTimes.nowUtc(); + expect(taskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)); + expect(taskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)); + + final TreeMap> checkpoints = new TreeMap<>(); + checkpoints.put(0, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id1"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id2"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + expect(taskClient.getCheckpointsAsync(EasyMock.contains("id3"), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(1); + + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + + final Map fakeCheckpoints = Collections.emptyMap(); + supervisor.moveTaskGroupToPendingCompletion(0); + supervisor.checkpoint( + 0, + new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints)) + ); + + while (supervisor.getNoticesQueueSize() > 0) { + Thread.sleep(100); + } + + verifyAll(); + + 
Assert.assertNull(serviceEmitter.getStackTrace()); + Assert.assertNull(serviceEmitter.getExceptionMessage()); + Assert.assertNull(serviceEmitter.getExceptionClass()); + } + + @Test(timeout = 60_000L) + public void testCheckpointForUnknownTaskGroup() throws InterruptedException + { + supervisor = getSupervisor(2, 1, true, "PT1S", null, null, false); + //not adding any events + final Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L, 1, 0L, 2, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + final Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + final Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE, 1, Long.MAX_VALUE, 2, Long.MAX_VALUE)), + null, + null + ); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect( + 
indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) + ).anyTimes(); + + replayAll(); + + supervisor.start(); + + supervisor.checkpoint( + 0, + new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())) + ); + + while (supervisor.getNoticesQueueSize() > 0) { + Thread.sleep(100); + } + + verifyAll(); + + final String expectedStackTrace = "io.druid.java.util.common.ISE: WTH?! cannot find taskGroup [0] among all taskGroups [{}]\n" + + "\tat io.druid.indexing.kafka.supervisor.KafkaSupervisor$CheckpointNotice.isValidTaskGroup(KafkaSupervisor.java:706)\n" + + "\tat io.druid.indexing.kafka.supervisor.KafkaSupervisor$CheckpointNotice.handle(KafkaSupervisor.java:658)\n" + + "\tat io.druid.indexing.kafka.supervisor.KafkaSupervisor$2.run(KafkaSupervisor.java:367)\n" + + "\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n" + + "\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n" + + "\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n" + + "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n" + + "\tat java.lang.Thread.run(Thread.java:748)\n" + + ""; + final String expectedExceptionMessage = "WTH?! 
cannot find taskGroup [0] among all taskGroups [{}]"; + Assert.assertEquals(expectedStackTrace, serviceEmitter.getStackTrace()); + Assert.assertEquals(expectedExceptionMessage, serviceEmitter.getExceptionMessage()); + Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { @@ -2106,7 +2327,7 @@ private static DataSchema getDataSchema(String dataSource) private KafkaIndexTask createKafkaIndexTask( String id, String dataSource, - String sequenceName, + int taskGroupId, KafkaPartitions startPartitions, KafkaPartitions endPartitions, DateTime minimumMessageTime, @@ -2119,7 +2340,8 @@ private KafkaIndexTask createKafkaIndexTask( getDataSchema(dataSource), tuningConfig, new KafkaIOConfig( - sequenceName, + taskGroupId, + "sequenceName-" + taskGroupId, startPartitions, endPartitions, ImmutableMap.of(), @@ -2128,7 +2350,7 @@ private KafkaIndexTask createKafkaIndexTask( maximumMessageTime, false ), - ImmutableMap.of(), + Collections.emptyMap(), null, null, rowIngestionMetersFactory diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index 965d62bf6282..8532fb2ebf3b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -21,27 +21,28 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Preconditions; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.DataSourceMetadata; public class CheckPointDataSourceMetadataAction implements TaskAction { private final String 
supervisorId; - private final String sequenceName; + private final int taskGroupId; private final DataSourceMetadata previousCheckPoint; private final DataSourceMetadata currentCheckPoint; public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, - @JsonProperty("sequenceName") String sequenceName, + @JsonProperty("taskGroupId") Integer taskGroupId, @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint ) { - this.supervisorId = supervisorId; - this.sequenceName = sequenceName; - this.previousCheckPoint = previousCheckPoint; - this.currentCheckPoint = currentCheckPoint; + this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); + this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId"); + this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint"); + this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint"); } @JsonProperty @@ -51,9 +52,9 @@ public String getSupervisorId() } @JsonProperty - public String getSequenceName() + public int getTaskGroupId() { - return sequenceName; + return taskGroupId; } @JsonProperty @@ -81,8 +82,12 @@ public Boolean perform( Task task, TaskActionToolbox toolbox ) { - return toolbox.getSupervisorManager() - .checkPointDataSourceMetadata(supervisorId, sequenceName, previousCheckPoint, currentCheckPoint); + return toolbox.getSupervisorManager().checkPointDataSourceMetadata( + supervisorId, + taskGroupId, + previousCheckPoint, + currentCheckPoint + ); } @Override @@ -96,7 +101,7 @@ public String toString() { return "CheckPointDataSourceMetadataAction{" + "supervisorId='" + supervisorId + '\'' + - ", sequenceName='" + sequenceName + '\'' + + ", taskGroupId='" + taskGroupId + '\'' + ", previousCheckPoint=" + previousCheckPoint + ", currentCheckPoint=" + currentCheckPoint + '}'; diff --git 
a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index bc0dd7b0d290..a739aa42bdc1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -165,9 +165,9 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc public boolean checkPointDataSourceMetadata( String supervisorId, - @Nullable String sequenceName, - @Nullable DataSourceMetadata previousDataSourceMetadata, - @Nullable DataSourceMetadata currentDataSourceMetadata + int taskGroupId, + DataSourceMetadata previousDataSourceMetadata, + DataSourceMetadata currentDataSourceMetadata ) { try { @@ -178,7 +178,7 @@ public boolean checkPointDataSourceMetadata( Preconditions.checkNotNull(supervisor, "supervisor could not be found"); - supervisor.lhs.checkpoint(sequenceName, previousDataSourceMetadata, currentDataSourceMetadata); + supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata); return true; } catch (Exception e) { diff --git a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java index 81f4301b8c4c..4b2234466153 100644 --- a/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java +++ b/java-util/src/main/java/io/druid/java/util/emitter/EmittingLogger.java @@ -35,6 +35,10 @@ */ public class EmittingLogger extends Logger { + public static final String EXCEPTION_TYPE_KEY = "exceptionType"; + public static final String EXCEPTION_MESSAGE_KEY = "exceptionMessage"; + public static final String EXCEPTION_STACK_TRACE_KEY = "exceptionStackTrace"; + private static volatile ServiceEmitter emitter = null; private final String className; diff --git 
a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 380861d9fb8f..0ba0701e82d0 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -83,9 +83,9 @@ public void reset(DataSourceMetadata dataSourceMetadata) {} @Override public void checkpoint( - @Nullable String sequenceName, - @Nullable DataSourceMetadata previousCheckPoint, - @Nullable DataSourceMetadata currentCheckPoint + int taskGroupId, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint ) { diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index d5efd8e7444a..c43d8e6c999c 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableMap; import io.druid.indexing.overlord.DataSourceMetadata; -import javax.annotation.Nullable; import java.util.Map; public interface Supervisor @@ -52,13 +51,9 @@ default Map> getStats() * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data * represented by {@param currentCheckpoint} DataSourceMetadata * - * @param sequenceName unique Identifier to figure out for which sequence to do checkpointing + * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing * @param previousCheckPoint DataSourceMetadata checkpointed in previous call * @param currentCheckPoint current DataSourceMetadata to be checkpointed */ - void checkpoint( - @Nullable String sequenceName, - @Nullable DataSourceMetadata previousCheckPoint, - @Nullable DataSourceMetadata 
currentCheckPoint - ); + void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint); } diff --git a/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java new file mode 100644 index 000000000000..9575b063eb77 --- /dev/null +++ b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java @@ -0,0 +1,70 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.server.metrics; + +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.core.Event; +import io.druid.java.util.emitter.service.ServiceEmitter; + +import javax.annotation.Nullable; +import java.util.Map; + +public class ExceptionCapturingServiceEmitter extends ServiceEmitter +{ + private volatile Class exceptionClass; + private volatile String exceptionMessage; + private volatile String stackTrace; + + public ExceptionCapturingServiceEmitter() + { + super("", "", null); + } + + @Override + public void emit(Event event) { + //noinspection unchecked + final Map dataMap = (Map) event.toMap().get("data"); + final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY); + if (exceptionClass != null) { + final String exceptionMessage = (String) dataMap.get(EmittingLogger.EXCEPTION_MESSAGE_KEY); + final String stackTrace = (String) dataMap.get(EmittingLogger.EXCEPTION_STACK_TRACE_KEY); + this.exceptionClass = exceptionClass; + this.exceptionMessage = exceptionMessage; + this.stackTrace = stackTrace; + } + } + + @Nullable + public Class getExceptionClass() + { + return exceptionClass; + } + + @Nullable + public String getExceptionMessage() + { + return exceptionMessage; + } + + @Nullable + public String getStackTrace() + { + return stackTrace; + } +} From e8f5ab5695b212a3524521124ecb3bacbe02d2cf Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 11 Jul 2018 17:37:17 -0700 Subject: [PATCH 2/6] fix code style --- .../metrics/ExceptionCapturingServiceEmitter.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java index 9575b063eb77..cc217c6f5b05 100644 --- a/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java +++ 
b/server/src/test/java/io/druid/server/metrics/ExceptionCapturingServiceEmitter.java @@ -1,18 +1,18 @@ /* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file + * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the + * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
*/ @@ -37,7 +37,8 @@ public ExceptionCapturingServiceEmitter() } @Override - public void emit(Event event) { + public void emit(Event event) + { //noinspection unchecked final Map dataMap = (Map) event.toMap().get("data"); final Class exceptionClass = (Class) dataMap.get(EmittingLogger.EXCEPTION_TYPE_KEY); From ba247774508fdb04de6c03a916d4b2fe59394f1b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 11 Jul 2018 18:41:44 -0700 Subject: [PATCH 3/6] Fix test --- .../kafka/supervisor/KafkaSupervisor.java | 3 +- .../indexing/kafka/KafkaIOConfigTest.java | 32 +++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 8150b67d0223..9e9c5916d8e8 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -143,8 +143,7 @@ public class KafkaSupervisor implements Supervisor * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups] * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]). */ - @VisibleForTesting - static class TaskGroup + private static class TaskGroup { // This specifies the partitions and starting offsets for this task group. 
It is set on group creation from the data // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java index 3bc55e277cb7..94c9fedbb6ff 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java @@ -32,6 +32,8 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.io.IOException; + public class KafkaIOConfigTest { private final ObjectMapper mapper; @@ -50,6 +52,7 @@ public void testSerdeWithDefaults() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" @@ -82,6 +85,7 @@ public void testSerdeWithNonDefaults() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" @@ -118,6 +122,7 @@ public void testBaseSequenceNameRequired() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": 
{\"bootstrap.servers\":\"localhost:9092\"},\n" @@ -137,6 +142,7 @@ public void testStartPartitionsRequired() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" @@ -156,6 +162,7 @@ public void testEndPartitionsRequired() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" @@ -175,6 +182,7 @@ public void testConsumerPropertiesRequired() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" @@ -194,6 +202,7 @@ public void testStartAndEndTopicMatch() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" @@ -214,6 +223,7 @@ public void testStartAndEndPartitionSetMatch() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", 
\"partitionOffsetMap\" : {\"0\":15}},\n" @@ -234,6 +244,7 @@ public void testEndOffsetGreaterThanStart() throws Exception { String jsonStr = "{\n" + " \"type\": \"kafka\",\n" + + " \"taskGroupId\": 0,\n" + " \"baseSequenceName\": \"my-sequence-name\",\n" + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n" @@ -248,4 +259,25 @@ public void testEndOffsetGreaterThanStart() throws Exception exception.expectMessage(CoreMatchers.containsString("end offset must be >= start offset")); mapper.readValue(jsonStr, IOConfig.class); } + + @Test + public void testSerdeWithoutGroupId() throws IOException + { + final String jsonStr = + "{\n" + + " \"type\": \"kafka\",\n" + + " \"baseSequenceName\": \"my-sequence-name\",\n" + + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n" + + "}"; + + exception.expect(CoreMatchers.instanceOf(JsonMappingException.class)); + exception.expectCause(CoreMatchers.instanceOf(NullPointerException.class)); + exception.expectMessage("taskGroupId"); + mapper.readValue( + mapper.writeValueAsString(mapper.readValue(jsonStr, IOConfig.class)), + IOConfig.class + ); + } } From da2b22ca41d338e8d0eb462a2febad963d3d6e35 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 11 Jul 2018 20:56:20 -0700 Subject: [PATCH 4/6] fix test --- .../kafka/supervisor/KafkaSupervisorTest.java | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 
c055277d30f6..f0f6033eea38 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2185,19 +2185,11 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException verifyAll(); - final String expectedStackTrace = "io.druid.java.util.common.ISE: WTH?! cannot find taskGroup [0] among all taskGroups [{}]\n" - + "\tat io.druid.indexing.kafka.supervisor.KafkaSupervisor$CheckpointNotice.isValidTaskGroup(KafkaSupervisor.java:706)\n" - + "\tat io.druid.indexing.kafka.supervisor.KafkaSupervisor$CheckpointNotice.handle(KafkaSupervisor.java:658)\n" - + "\tat io.druid.indexing.kafka.supervisor.KafkaSupervisor$2.run(KafkaSupervisor.java:367)\n" - + "\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n" - + "\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n" - + "\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n" - + "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n" - + "\tat java.lang.Thread.run(Thread.java:748)\n" - + ""; - final String expectedExceptionMessage = "WTH?! cannot find taskGroup [0] among all taskGroups [{}]"; - Assert.assertEquals(expectedStackTrace, serviceEmitter.getStackTrace()); - Assert.assertEquals(expectedExceptionMessage, serviceEmitter.getExceptionMessage()); + Assert.assertNotNull(serviceEmitter.getStackTrace()); + Assert.assertEquals( + "WTH?! 
cannot find taskGroup [0] among all taskGroups [{}]", + serviceEmitter.getExceptionMessage() + ); Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); } From 632ab06be8c33db61eb5653a842fdb4fad51326f Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 13 Jul 2018 10:02:50 -0700 Subject: [PATCH 5/6] add a log for creating a new taskGroup --- .../kafka/supervisor/KafkaSupervisor.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 9e9c5916d8e8..ed287fa05919 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1090,13 +1090,16 @@ public Boolean apply(KafkaIndexTask.Status status) } else { final TaskGroup taskGroup = taskGroups.computeIfAbsent( taskGroupId, - k -> new TaskGroup( - ImmutableMap.copyOf( - kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap() - ), - kafkaTask.getIOConfig().getMinimumMessageTime(), - kafkaTask.getIOConfig().getMaximumMessageTime() - ) + k -> { + log.info("Creating a new task group for taskGroupId[%d]", taskGroupId); + return new TaskGroup( + ImmutableMap.copyOf( + kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap() + ), + kafkaTask.getIOConfig().getMinimumMessageTime(), + kafkaTask.getIOConfig().getMaximumMessageTime() + ); + } ); taskGroupsToVerify.add(taskGroupId); taskGroup.tasks.putIfAbsent(taskId, new TaskData()); From a5d743f322ef59b189c73f2f25b4f42b0da73e56 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 13 Jul 2018 12:08:06 -0700 Subject: [PATCH 6/6] fix backward compatibility in KafkaIOConfig --- 
.../druid/indexing/kafka/KafkaIOConfig.java | 14 +++++++---- .../indexing/kafka/KafkaIOConfigTest.java | 23 ------------------- 2 files changed, 9 insertions(+), 28 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java index 83686d922728..b6c1d765c958 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java @@ -26,6 +26,7 @@ import io.druid.segment.indexing.IOConfig; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.util.Map; public class KafkaIOConfig implements IOConfig @@ -33,7 +34,8 @@ public class KafkaIOConfig implements IOConfig private static final boolean DEFAULT_USE_TRANSACTION = true; private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false; - private final int taskGroupId; + @Nullable + private final Integer taskGroupId; private final String baseSequenceName; private final KafkaPartitions startPartitions; private final KafkaPartitions endPartitions; @@ -45,7 +47,7 @@ public class KafkaIOConfig implements IOConfig @JsonCreator public KafkaIOConfig( - @JsonProperty("taskGroupId") Integer taskGroupId, + @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // can be null for backward compatibility @JsonProperty("baseSequenceName") String baseSequenceName, @JsonProperty("startPartitions") KafkaPartitions startPartitions, @JsonProperty("endPartitions") KafkaPartitions endPartitions, @@ -56,7 +58,7 @@ public KafkaIOConfig( @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps ) { - this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId"); + this.taskGroupId = taskGroupId; this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName"); this.startPartitions = 
Preconditions.checkNotNull(startPartitions, "startPartitions"); this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions"); @@ -86,8 +88,9 @@ public KafkaIOConfig( } } + @Nullable @JsonProperty - public int getTaskGroupId() + public Integer getTaskGroupId() { return taskGroupId; } @@ -144,7 +147,8 @@ public boolean isSkipOffsetGaps() public String toString() { return "KafkaIOConfig{" + - "baseSequenceName='" + baseSequenceName + '\'' + + "taskGroupId=" + taskGroupId + + ", baseSequenceName='" + baseSequenceName + '\'' + ", startPartitions=" + startPartitions + ", endPartitions=" + endPartitions + ", consumerProperties=" + consumerProperties + diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java index 94c9fedbb6ff..050dba753b8e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java @@ -32,8 +32,6 @@ import org.junit.Test; import org.junit.rules.ExpectedException; -import java.io.IOException; - public class KafkaIOConfigTest { private final ObjectMapper mapper; @@ -259,25 +257,4 @@ public void testEndOffsetGreaterThanStart() throws Exception exception.expectMessage(CoreMatchers.containsString("end offset must be >= start offset")); mapper.readValue(jsonStr, IOConfig.class); } - - @Test - public void testSerdeWithoutGroupId() throws IOException - { - final String jsonStr = - "{\n" - + " \"type\": \"kafka\",\n" - + " \"baseSequenceName\": \"my-sequence-name\",\n" - + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" - + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" - + " \"consumerProperties\": 
{\"bootstrap.servers\":\"localhost:9092\"}\n" - + "}"; - - exception.expect(CoreMatchers.instanceOf(JsonMappingException.class)); - exception.expectCause(CoreMatchers.instanceOf(NullPointerException.class)); - exception.expectMessage("taskGroupId"); - mapper.readValue( - mapper.writeValueAsString(mapper.readValue(jsonStr, IOConfig.class)), - IOConfig.class - ); - } }