From 5f99650714c9fc20485e4c0073a4bf26c00a97ba Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 13 Aug 2018 11:38:12 -0700 Subject: [PATCH 1/6] Fix NPE for taskGroupId --- .../MaterializedViewSupervisor.java | 8 +- ...ementalPublishingKafkaIndexTaskRunner.java | 1 + .../kafka/supervisor/KafkaSupervisor.java | 31 ++++-- .../indexing/kafka/KafkaIndexTaskTest.java | 3 +- .../kafka/supervisor/KafkaSupervisorTest.java | 105 +++++++++++++++++- .../CheckPointDataSourceMetadataAction.java | 23 +++- .../supervisor/SupervisorManager.java | 5 +- .../supervisor/NoopSupervisorSpec.java | 3 +- .../overlord/supervisor/Supervisor.java | 9 +- 9 files changed, 169 insertions(+), 19 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java index fedda092c4d6..d90d99fb2f3e 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -53,6 +53,7 @@ import org.joda.time.Duration; import org.joda.time.Interval; +import javax.annotation.Nullable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -239,7 +240,12 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint) + public void checkpoint( + String taskId, + @Nullable Integer taskGroupId, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ) { // do nothing } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index a93fde611c5c..91341f48d654 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -602,6 +602,7 @@ public void onFailure(Throwable t) requestPause(); final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( task.getDataSource(), + task.getId(), ioConfig.getTaskGroupId(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index e511b00dcd74..03b84375360f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -509,23 +509,38 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint) + public void checkpoint( + String taskId, + @Nullable Integer nullableTaskGroupId, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ) { - Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint"); - Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null"); + Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint"); + Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null"); Preconditions.checkArgument( - ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic()), + ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()), "Supervisor topic [%s] and topic in checkpoint [%s] does not match", ioConfig.getTopic(), - ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic() + ((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic() ); - log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId); + final int taskGroupId; + if (nullableTaskGroupId == null) { + taskGroupId = taskGroups.entrySet().stream().filter(entry -> { + final TaskGroup taskGroup = entry.getValue(); + return taskGroup.taskIds().contains(taskId); + }).findAny().orElseThrow(() -> new ISE("Cannot find taskGroup for taskId[%s]", taskId)).getKey(); + } else { + taskGroupId = nullableTaskGroupId; + } + + log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId); notices.add( new CheckpointNotice( taskGroupId, - (KafkaDataSourceMetadata) previousCheckpoint, - (KafkaDataSourceMetadata) currentCheckpoint + (KafkaDataSourceMetadata) previousCheckPoint, + (KafkaDataSourceMetadata) currentCheckPoint ) ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 1929bb451321..fa836abdfa43 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2060,7 +2060,8 @@ private void makeToolboxFactory() throws IOException @Override public boolean checkPointDataSourceMetadata( String supervisorId, - int taskGroupId, + String taskId, + @Nullable Integer taskGroupId, @Nullable DataSourceMetadata previousDataSourceMetadata, @Nullable DataSourceMetadata currentDataSourceMetadata ) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 36f3d733d6f6..f5bdeb79f30e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2103,6 +2103,7 @@ public void testCheckpointForInactiveTaskGroup() final Map fakeCheckpoints = Collections.emptyMap(); supervisor.moveTaskGroupToPendingCompletion(0); supervisor.checkpoint( + id1.getId(), // should be fine with any taskId because taskGroupId is not null 0, new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints)) @@ -2172,6 +2173,7 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException supervisor.start(); supervisor.checkpoint( + id1.getId(), // should be fine with any taskId because taskGroupId is not null 0, new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())) @@ -2195,13 +2197,114 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); } + @Test(timeout = 60_000L) + public void testCheckpointWithNullTaskGroupId() + throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException + { + supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false); + //not adding any events + final Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + final Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + final Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect( + indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) + ).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + expect(taskClient.getStatusAsync(anyString())) + .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)) + .anyTimes(); + final TreeMap> checkpoints = new TreeMap<>(); + checkpoints.put(0, ImmutableMap.of(0, 0L)); + expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(3); + expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes(); + expect(taskClient.pauseAsync(anyString())) + .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L))) + .anyTimes(); + expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean())) + .andReturn(Futures.immediateFuture(true)) + .anyTimes(); +// taskQueue.shutdown("id1"); +// taskQueue.shutdown("id2"); +// taskQueue.shutdown("id3"); + + replayAll(); + + supervisor.start(); + + supervisor.runInternal(); + + final TreeMap> newCheckpoints = new TreeMap<>(); + newCheckpoints.put(0, ImmutableMap.of(0, 10L)); + supervisor.checkpoint( + id1.getId(), + null, + new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0))) + ); + + while (supervisor.getNoticesQueueSize() > 0) { + Thread.sleep(100); + } + + verifyAll(); + +// while (serviceEmitter.getStackTrace() == null) { +// Thread.sleep(100); +// } + +// Assert.assertTrue(serviceEmitter.getStackTrace().startsWith("io.druid.java.util.common.ISE: WTH?! cannot find")); +// Assert.assertEquals( +// "WTH?! cannot find taskGroup [0] among all taskGroups [{}]", +// serviceEmitter.getExceptionMessage() +// ); +// Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { for (int i = 0; i < NUM_PARTITIONS; i++) { for (int j = 0; j < numEventsPerPartition; j++) { kafkaProducer.send( - new ProducerRecord( + new ProducerRecord<>( topic, i, null, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index f1d11deb4eaf..e1133f9bf180 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -25,22 +25,28 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.DataSourceMetadata; +import javax.annotation.Nullable; + public class CheckPointDataSourceMetadataAction implements TaskAction { private final String supervisorId; - private final int taskGroupId; + private final String taskId; + @Nullable + private final Integer taskGroupId; private final DataSourceMetadata previousCheckPoint; private final DataSourceMetadata currentCheckPoint; public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, - @JsonProperty("taskGroupId") Integer taskGroupId, + @JsonProperty("taskId") String taskId, // used when taskGroupId is null + @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint ) { this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); - this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId"); + this.taskId = Preconditions.checkNotNull(taskId, "taskId"); + this.taskGroupId = taskGroupId; this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint"); this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint"); } @@ -52,7 +58,14 @@ public String getSupervisorId() } @JsonProperty - public int getTaskGroupId() + public String getTaskId() + { + return taskId; + } + + @Nullable + @JsonProperty + public Integer getTaskGroupId() { return taskGroupId; } @@ -84,6 +97,7 @@ public Boolean perform( { return toolbox.getSupervisorManager().checkPointDataSourceMetadata( supervisorId, + taskId, taskGroupId, previousCheckPoint, currentCheckPoint @@ -101,6 +115,7 @@ public String toString() { return "CheckPointDataSourceMetadataAction{" + "supervisorId='" + supervisorId + '\'' + + ", taskId='" + taskId + '\'' + ", taskGroupId='" + taskGroupId + '\'' + ", previousCheckPoint=" + previousCheckPoint + ", currentCheckPoint=" + currentCheckPoint + diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 355465cec5d5..9707df36b2b3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -165,7 +165,8 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc public boolean checkPointDataSourceMetadata( String supervisorId, - int taskGroupId, + @Nullable String taskId, + Integer taskGroupId, DataSourceMetadata previousDataSourceMetadata, DataSourceMetadata currentDataSourceMetadata ) @@ -178,7 +179,7 @@ public boolean checkPointDataSourceMetadata( Preconditions.checkNotNull(supervisor, "supervisor could not be found"); - supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata); + supervisor.lhs.checkpoint(taskId, taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata); return true; } catch (Exception e) { diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 0408104cde83..400a70f0a5c3 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -83,7 +83,8 @@ public void reset(DataSourceMetadata dataSourceMetadata) {} @Override public void checkpoint( - int taskGroupId, + String taskId, + @Nullable Integer taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint ) diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index 04afac7aea6b..bbdcaea73e95 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import io.druid.indexing.overlord.DataSourceMetadata; +import javax.annotation.Nullable; import java.util.Map; public interface Supervisor @@ -51,9 +52,15 @@ default Map> getStats() * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data * represented by {@param currentCheckpoint} DataSourceMetadata * + * @param taskId task identifier * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing * @param previousCheckPoint DataSourceMetadata checkpointed in previous call * @param currentCheckPoint current DataSourceMetadata to be checkpointed */ - void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint); + void checkpoint( + String taskId, + @Nullable Integer taskGroupId, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ); } From a7cb2abafd88788c4210e34dff5a2d965437cd44 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 13 Aug 2018 11:40:05 -0700 Subject: [PATCH 2/6] missing changes --- .../kafka/supervisor/KafkaSupervisorTest.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index f5bdeb79f30e..6783113a245e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2261,9 +2261,6 @@ public void testCheckpointWithNullTaskGroupId() expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean())) .andReturn(Futures.immediateFuture(true)) .anyTimes(); -// taskQueue.shutdown("id1"); -// taskQueue.shutdown("id2"); -// taskQueue.shutdown("id3"); replayAll(); @@ -2285,17 +2282,6 @@ public void testCheckpointWithNullTaskGroupId() } verifyAll(); - -// while (serviceEmitter.getStackTrace() == null) { -// Thread.sleep(100); -// } - -// Assert.assertTrue(serviceEmitter.getStackTrace().startsWith("io.druid.java.util.common.ISE: WTH?! cannot find")); -// Assert.assertEquals( -// "WTH?! cannot find taskGroup [0] among all taskGroups [{}]", -// serviceEmitter.getExceptionMessage() -// ); -// Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); } private void addSomeEvents(int numEventsPerPartition) throws Exception From 206cdc606b4340f9a534ce8b29d9da7d2f0047ac Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 13 Aug 2018 12:45:54 -0700 Subject: [PATCH 3/6] fix wrong annotation --- .../druid/indexing/overlord/supervisor/SupervisorManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 9707df36b2b3..34a4a619a441 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -165,8 +165,8 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc public boolean checkPointDataSourceMetadata( String supervisorId, - @Nullable String taskId, - Integer taskGroupId, + String taskId, + @Nullable Integer taskGroupId, DataSourceMetadata previousDataSourceMetadata, DataSourceMetadata currentDataSourceMetadata ) From 7edc323927fa6383dc2a0e3d8da8144bf625b98b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 13 Aug 2018 14:29:32 -0700 Subject: [PATCH 4/6] fix potential race --- .../kafka/supervisor/KafkaSupervisor.java | 72 ++++++++++++------- 1 file changed, 46 insertions(+), 26 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 03b84375360f..d12c0b912c08 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -92,6 +92,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Random; import java.util.Set; @@ -511,7 +512,7 @@ public void reset(DataSourceMetadata dataSourceMetadata) @Override public void checkpoint( String taskId, - @Nullable Integer nullableTaskGroupId, + @Nullable Integer taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint ) @@ -525,19 +526,10 @@ public void checkpoint( ((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic() ); - final int taskGroupId; - if (nullableTaskGroupId == null) { - taskGroupId = taskGroups.entrySet().stream().filter(entry -> { - final TaskGroup taskGroup = entry.getValue(); - return taskGroup.taskIds().contains(taskId); - }).findAny().orElseThrow(() -> new ISE("Cannot find taskGroup for taskId[%s]", taskId)).getKey(); - } else { - taskGroupId = nullableTaskGroupId; - } - log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId); notices.add( new CheckpointNotice( + taskId, taskGroupId, (KafkaDataSourceMetadata) previousCheckPoint, (KafkaDataSourceMetadata) currentCheckPoint @@ -644,17 +636,20 @@ public void handle() private class CheckpointNotice implements Notice { - final int taskGroupId; - final KafkaDataSourceMetadata previousCheckpoint; - final KafkaDataSourceMetadata currentCheckpoint; + private final String taskId; + @Nullable private final Integer nullableTaskGroupId; + private final KafkaDataSourceMetadata previousCheckpoint; + private final KafkaDataSourceMetadata currentCheckpoint; CheckpointNotice( - int taskGroupId, + String taskId, + @Nullable Integer nullableTaskGroupId, KafkaDataSourceMetadata previousCheckpoint, KafkaDataSourceMetadata currentCheckpoint ) { - this.taskGroupId = taskGroupId; + this.taskId = taskId; + this.nullableTaskGroupId = nullableTaskGroupId; this.previousCheckpoint = previousCheckpoint; this.currentCheckpoint = currentCheckpoint; } @@ -662,12 +657,44 @@ private class CheckpointNotice implements Notice @Override public void handle() throws ExecutionException, InterruptedException { + // Find taskGroupId using taskId if it's null. It can be null while rolling update. + final int taskGroupId; + if (nullableTaskGroupId == null) { + // We search taskId in taskGroups and pendingCompletionTaskGroups sequentially. This should be fine because + // 1) a taskGroup can be moved from taskGroups to pendingCompletionTaskGroups in RunNotice + // (see checkTaskDuration()). + // 2) Notices are proceesed by a single thread. So, CheckpointNotice and RunNotice cannot be processed at the + // same time. + final java.util.Optional maybeGroupId = taskGroups + .entrySet() + .stream() + .filter(entry -> { + final TaskGroup taskGroup = entry.getValue(); + return taskGroup.taskIds().contains(taskId); + }) + .findAny() + .map(Entry::getKey); + taskGroupId = maybeGroupId.orElse( + pendingCompletionTaskGroups + .entrySet() + .stream() + .filter(entry -> { + final List taskGroups = entry.getValue(); + return taskGroups.stream().anyMatch(group -> group.taskIds().contains(taskId)); + }) + .findAny() + .orElseThrow(() -> new ISE("Cannot find taskGroup for taskId[%s]", taskId)) + .getKey() + ); + } else { + taskGroupId = nullableTaskGroupId; + } + // check for consistency // if already received request for this sequenceName and dataSourceMetadata combination then return - final TaskGroup taskGroup = taskGroups.get(taskGroupId); - if (isValidTaskGroup(taskGroup)) { + if (isValidTaskGroup(taskGroupId, taskGroup)) { final TreeMap> checkpoints = taskGroup.sequenceOffsets; // check validity of previousCheckpoint @@ -689,20 +716,13 @@ public void handle() throws ExecutionException, InterruptedException log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue()); return; } - final int taskGroupId = getTaskGroupIdForPartition( - currentCheckpoint.getKafkaPartitions() - .getPartitionOffsetMap() - .keySet() - .iterator() - .next() - ); final Map newCheckpoint = checkpointTaskGroup(taskGroupId, false).get(); taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint); log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId); } } - private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup) + private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup) { if (taskGroup == null) { // taskGroup might be in pendingCompletionTaskGroups or partitionGroups From 31ba33ffcb46109d7b379bab7905d310cdf15509 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 13 Aug 2018 18:18:33 -0700 Subject: [PATCH 5/6] keep baseSequenceName --- .../MaterializedViewSupervisor.java | 2 +- ...ementalPublishingKafkaIndexTaskRunner.java | 2 +- .../kafka/supervisor/KafkaSupervisor.java | 43 ++++++++----------- .../indexing/kafka/KafkaIndexTaskTest.java | 2 +- .../kafka/supervisor/KafkaSupervisorTest.java | 6 +-- .../CheckPointDataSourceMetadataAction.java | 18 ++++---- .../supervisor/SupervisorManager.java | 4 +- .../supervisor/NoopSupervisorSpec.java | 2 +- .../overlord/supervisor/Supervisor.java | 4 +- 9 files changed, 38 insertions(+), 45 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java index d90d99fb2f3e..d499b4f0e8d2 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/io/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -241,8 +241,8 @@ public void reset(DataSourceMetadata dataSourceMetadata) @Override public void checkpoint( - String taskId, @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint ) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index 91341f48d654..3a385072c300 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -602,8 +602,8 @@ public void onFailure(Throwable t) requestPause(); final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( task.getDataSource(), - task.getId(), ioConfig.getTaskGroupId(), + task.getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index d12c0b912c08..ae60dca05050 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -144,7 +144,7 @@ public class KafkaSupervisor implements Supervisor * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups] * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]). */ - private static class TaskGroup + private class TaskGroup { // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in @@ -158,6 +158,7 @@ private static class TaskGroup final Optional maximumMessageTime; DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action final TreeMap> sequenceOffsets = new TreeMap<>(); + final String baseSequenceName; TaskGroup( ImmutableMap partitionOffsets, @@ -169,6 +170,7 @@ private static class TaskGroup this.minimumMessageTime = minimumMessageTime; this.maximumMessageTime = maximumMessageTime; this.sequenceOffsets.put(0, partitionOffsets); + this.baseSequenceName = generateSequenceName(partitionOffsets, minimumMessageTime, maximumMessageTime); } int addNewCheckpoint(Map checkpoint) @@ -511,8 +513,8 @@ public void reset(DataSourceMetadata dataSourceMetadata) @Override public void checkpoint( - String taskId, @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint ) @@ -529,8 +531,8 @@ public void checkpoint( log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId); notices.add( new CheckpointNotice( - taskId, taskGroupId, + baseSequenceName, (KafkaDataSourceMetadata) previousCheckPoint, (KafkaDataSourceMetadata) currentCheckPoint ) @@ -636,19 +638,19 @@ public void handle() private class CheckpointNotice implements Notice { - private final String taskId; + private final String baseSequenceName; @Nullable private final Integer nullableTaskGroupId; private final KafkaDataSourceMetadata previousCheckpoint; private final KafkaDataSourceMetadata currentCheckpoint; CheckpointNotice( - String taskId, @Nullable Integer nullableTaskGroupId, + String baseSequenceName, KafkaDataSourceMetadata previousCheckpoint, KafkaDataSourceMetadata currentCheckpoint ) { - this.taskId = taskId; + this.baseSequenceName = baseSequenceName; this.nullableTaskGroupId = nullableTaskGroupId; this.previousCheckpoint = previousCheckpoint; this.currentCheckpoint = currentCheckpoint; @@ -670,7 +672,7 @@ public void handle() throws ExecutionException, InterruptedException .stream() .filter(entry -> { final TaskGroup taskGroup = entry.getValue(); - return taskGroup.taskIds().contains(taskId); + return taskGroup.baseSequenceName.equals(baseSequenceName); }) .findAny() .map(Entry::getKey); @@ -680,10 +682,10 @@ public void handle() throws ExecutionException, InterruptedException .stream() .filter(entry -> { final List taskGroups = entry.getValue(); - return taskGroups.stream().anyMatch(group -> group.taskIds().contains(taskId)); + return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName)); }) .findAny() - .orElseThrow(() -> new ISE("Cannot find taskGroup for taskId[%s]", taskId)) + .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName)) .getKey() ); } else { @@ -921,17 +923,6 @@ String generateSequenceName( return Joiner.on("_").join("index_kafka", dataSource, hashCode); } - @VisibleForTesting - String generateSequenceName(TaskGroup taskGroup) - { - Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null"); - return generateSequenceName( - taskGroup.partitionOffsets, - taskGroup.minimumMessageTime, - taskGroup.maximumMessageTime - ); - } - private static String getRandomId() { final StringBuilder suffix = new StringBuilder(8); @@ -1809,7 +1800,6 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc endPartitions.put(partition, Long.MAX_VALUE); } TaskGroup group = taskGroups.get(groupId); - String sequenceName = generateSequenceName(group); Map consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties()); DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull(); @@ -1817,7 +1807,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc KafkaIOConfig kafkaIOConfig = new KafkaIOConfig( groupId, - sequenceName, + group.baseSequenceName, new KafkaPartitions(ioConfig.getTopic(), startPartitions), new KafkaPartitions(ioConfig.getTopic(), endPartitions), consumerProperties, @@ -1838,10 +1828,10 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc .putAll(spec.getContext()) .build(); for (int i = 0; i < replicas; i++) { - String taskId = Joiner.on("_").join(sequenceName, getRandomId()); + String taskId = Joiner.on("_").join(group.baseSequenceName, getRandomId()); KafkaIndexTask indexTask = new KafkaIndexTask( taskId, - new TaskResource(sequenceName, 1), + new TaskResource(group.baseSequenceName, 1), spec.getDataSchema(), taskTuningConfig, kafkaIOConfig, @@ -1971,7 +1961,10 @@ private boolean isTaskCurrent(int taskGroupId, String taskId) String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName(); if (taskGroups.get(taskGroupId) != null) { - return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName); + return Preconditions + .checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId) + .baseSequenceName + .equals(taskSequenceName); } else { return generateSequenceName( ((KafkaIndexTask) taskOptional.get()).getIOConfig() diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index fa836abdfa43..c3cf1153671b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2060,8 +2060,8 @@ private void makeToolboxFactory() throws IOException @Override public boolean checkPointDataSourceMetadata( String supervisorId, - String taskId, @Nullable Integer taskGroupId, + String baseSequenceName, @Nullable DataSourceMetadata previousDataSourceMetadata, @Nullable DataSourceMetadata currentDataSourceMetadata ) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 6783113a245e..83e253ff9f15 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2103,8 +2103,8 @@ public void testCheckpointForInactiveTaskGroup() final Map fakeCheckpoints = Collections.emptyMap(); supervisor.moveTaskGroupToPendingCompletion(0); supervisor.checkpoint( - id1.getId(), // should be fine with any taskId because taskGroupId is not null 0, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints)) ); @@ -2173,8 +2173,8 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException supervisor.start(); supervisor.checkpoint( - id1.getId(), // should be fine with any taskId because taskGroupId is not null 0, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())) ); @@ -2271,8 +2271,8 @@ public void testCheckpointWithNullTaskGroupId() final TreeMap> newCheckpoints = new TreeMap<>(); newCheckpoints.put(0, ImmutableMap.of(0, 10L)); supervisor.checkpoint( - id1.getId(), null, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0))) ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index e1133f9bf180..bcc78807e9da 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -30,23 +30,23 @@ public class CheckPointDataSourceMetadataAction implements TaskAction { private final String supervisorId; - private final String taskId; @Nullable private final Integer taskGroupId; + private final String baseSequenceName; private final DataSourceMetadata previousCheckPoint; private final DataSourceMetadata currentCheckPoint; public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, - @JsonProperty("taskId") String taskId, // used when taskGroupId is null - @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility + @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility, + @JsonProperty("sequenceName") String baseSequenceName, // old version would use this @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint ) { this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); - this.taskId = Preconditions.checkNotNull(taskId, "taskId"); this.taskGroupId = taskGroupId; + this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName"); this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint"); this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint"); } @@ -57,10 +57,10 @@ public String getSupervisorId() return supervisorId; } - @JsonProperty - public String getTaskId() + @JsonProperty("sequenceName") + public String getBaseSequenceName() { - return taskId; + return baseSequenceName; } @Nullable @@ -97,8 +97,8 @@ public Boolean perform( { return toolbox.getSupervisorManager().checkPointDataSourceMetadata( supervisorId, - taskId, taskGroupId, + baseSequenceName, previousCheckPoint, currentCheckPoint ); @@ -115,7 +115,7 @@ public String toString() { return "CheckPointDataSourceMetadataAction{" + "supervisorId='" + supervisorId + '\'' + - ", taskId='" + taskId + '\'' + + ", baseSequenceName='" + baseSequenceName + '\'' + ", taskGroupId='" + taskGroupId + '\'' + ", previousCheckPoint=" + previousCheckPoint + ", currentCheckPoint=" + currentCheckPoint + diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 34a4a619a441..03cc96f5b0b2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -165,8 +165,8 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc public boolean checkPointDataSourceMetadata( String supervisorId, - String taskId, @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousDataSourceMetadata, DataSourceMetadata currentDataSourceMetadata ) @@ -179,7 +179,7 @@ public boolean checkPointDataSourceMetadata( Preconditions.checkNotNull(supervisor, "supervisor could not be found"); - supervisor.lhs.checkpoint(taskId, taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata); + supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata); return true; } catch (Exception e) { diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 400a70f0a5c3..ccf695e41a52 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -83,8 +83,8 @@ public void reset(DataSourceMetadata dataSourceMetadata) {} @Override public void checkpoint( - String taskId, @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint ) diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index bbdcaea73e95..0b412d7189b2 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -52,14 +52,14 @@ default Map> getStats() * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data * represented by {@param currentCheckpoint} DataSourceMetadata * - * @param taskId task identifier * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing + * @param baseSequenceName baseSequenceName * @param previousCheckPoint DataSourceMetadata checkpointed in previous call * @param currentCheckPoint current DataSourceMetadata to be checkpointed */ void checkpoint( - String taskId, @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint ); From 45624cc2baefe2b8a7e7f63f5571e92dd35ebf65 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Mon, 13 Aug 2018 21:57:34 -0700 Subject: [PATCH 6/6] make deprecated old param --- .../io/druid/indexing/kafka/supervisor/KafkaSupervisor.java | 6 +++--- .../common/actions/CheckPointDataSourceMetadataAction.java | 4 +++- .../io/druid/indexing/overlord/supervisor/Supervisor.java | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index ae60dca05050..8c9bb599ada4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -514,7 +514,7 @@ public void reset(DataSourceMetadata dataSourceMetadata) @Override public void checkpoint( @Nullable Integer taskGroupId, - String baseSequenceName, + @Deprecated String baseSequenceName, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint ) @@ -638,14 +638,14 @@ public void handle() private class CheckpointNotice implements Notice { - private final String baseSequenceName; @Nullable private final Integer nullableTaskGroupId; + @Deprecated private final String baseSequenceName; private final KafkaDataSourceMetadata previousCheckpoint; private final KafkaDataSourceMetadata currentCheckpoint; CheckpointNotice( @Nullable Integer nullableTaskGroupId, - String baseSequenceName, + @Deprecated String baseSequenceName, KafkaDataSourceMetadata previousCheckpoint, KafkaDataSourceMetadata currentCheckpoint ) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index bcc78807e9da..433af987be3b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -32,6 +32,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction private final String supervisorId; @Nullable private final Integer taskGroupId; + @Deprecated private final String baseSequenceName; private final DataSourceMetadata previousCheckPoint; private final DataSourceMetadata currentCheckPoint; @@ -39,7 +40,7 @@ public class CheckPointDataSourceMetadataAction implements TaskAction public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility, - @JsonProperty("sequenceName") String baseSequenceName, // old version would use this + @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint ) @@ -57,6 +58,7 @@ public String getSupervisorId() return supervisorId; } + @Deprecated @JsonProperty("sequenceName") public String getBaseSequenceName() { diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index 0b412d7189b2..6f3c05003a80 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -59,7 +59,7 @@ default Map> getStats() */ void checkpoint( @Nullable Integer taskGroupId, - String baseSequenceName, + @Deprecated String baseSequenceName, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint );