diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 46598e245514..c6ff2784ccb2 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -52,7 +52,6 @@ import org.joda.time.Duration; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -278,11 +277,7 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint( - @Nullable Integer taskGroupId, - String baseSequenceName, - DataSourceMetadata checkpointMetadata - ) + public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) { // do nothing } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 51cda7d24b15..a121d6141447 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2705,8 +2705,7 @@ private void makeToolboxFactory() throws IOException @Override public boolean checkPointDataSourceMetadata( String supervisorId, - @Nullable Integer taskGroupId, - String baseSequenceName, + int taskGroupId, @Nullable DataSourceMetadata previousDataSourceMetadata ) { diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 2aec61040e3e..92b4e5e5ecd3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2378,7 +2378,6 @@ public void testCheckpointForInactiveTaskGroup() supervisor.moveTaskGroupToPendingCompletion(0); supervisor.checkpoint( 0, - id1.getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of()) ) @@ -2463,7 +2462,6 @@ public void testCheckpointForUnknownTaskGroup() supervisor.checkpoint( 0, - id1.getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of()) ) @@ -2489,104 +2487,6 @@ public void testCheckpointForUnknownTaskGroup() Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); } - @Test(timeout = 60_000L) - public void testCheckpointWithNullTaskGroupId() - throws InterruptedException - { - supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null); - final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig(); - supervisor.getStateManager().markRunFinished(); - - //not adding any events - final KafkaIndexTask id1 = createKafkaIndexTask( - "id1", - DATASOURCE, - 0, - new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)), - null, - null, - tuningConfig - ); - - final Task id2 = createKafkaIndexTask( - "id2", - DATASOURCE, - 0, - new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)), - null, - null, - tuningConfig - ); - - final Task id3 = createKafkaIndexTask( - "id3", - DATASOURCE, - 0, - new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 0L), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, Long.MAX_VALUE)), - null, - null, - tuningConfig - ); - - EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); - EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); - EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); - EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); - EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); - EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); - EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); - EasyMock.expect( - indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) - ).anyTimes(); - taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); - EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) - .andReturn(Futures.immediateFuture(Status.READING)) - .anyTimes(); - final TreeMap> checkpoints = new TreeMap<>(); - checkpoints.put(0, ImmutableMap.of(0, 0L)); - EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.anyString(), EasyMock.anyBoolean())) - .andReturn(Futures.immediateFuture(checkpoints)) - .times(3); - EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) - .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) - .anyTimes(); - EasyMock.expect(taskClient.pauseAsync(EasyMock.anyString())) - .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L))) - .anyTimes(); - EasyMock.expect(taskClient.setEndOffsetsAsync( - EasyMock.anyString(), - EasyMock.eq(ImmutableMap.of(0, 10L)), - EasyMock.anyBoolean() - )) - .andReturn(Futures.immediateFuture(true)) - .anyTimes(); - - replayAll(); - - supervisor.start(); - - supervisor.runInternal(); - - supervisor.checkpoint( - null, - id1.getIOConfig().getBaseSequenceName(), - new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of()) - ) - ); - - while (supervisor.getNoticesQueueSize() > 0) { - Thread.sleep(100); - } - - verifyAll(); - } - @Test public void testSuspendedNoRunningTasks() throws Exception { diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index acfa30ac5376..81de1e2e9e58 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2912,8 +2912,7 @@ private void makeToolboxFactory() throws IOException @Override public boolean checkPointDataSourceMetadata( String supervisorId, - @Nullable Integer taskGroupId, - String baseSequenceName, + int taskGroupId, @Nullable DataSourceMetadata checkpointMetadata ) { diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 6b87e4b1c528..3c8037b160f1 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -2971,7 +2971,6 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException supervisor.moveTaskGroupToPendingCompletion(0); supervisor.checkpoint( 0, - id1.getIOConfig().getBaseSequenceName(), new KinesisDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), checkpoints.get(0).keySet()) ) @@ -3098,7 +3097,6 @@ public void testCheckpointForUnknownTaskGroup() supervisor.checkpoint( 0, - id1.getIOConfig().getBaseSequenceName(), new KinesisDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.emptyMap(), ImmutableSet.of()) ) @@ -3123,111 +3121,6 @@ public void testCheckpointForUnknownTaskGroup() Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); } - @Test(timeout = 60_000L) - public void testCheckpointWithNullTaskGroupId() throws InterruptedException - { - supervisor = getTestableSupervisor(1, 3, true, "PT1S", null, null, false); - supervisor.getStateManager().markRunFinished(); - - //not adding any events - final KinesisIndexTask id1 = createKinesisIndexTask( - "id1", - DATASOURCE, - 0, - new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of()), - new SeekableStreamEndSequenceNumbers<>( - "stream", - ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER) - ), - null, - null - ); - - final Task id2 = createKinesisIndexTask( - "id2", - DATASOURCE, - 0, - new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of(SHARD_ID1)), - new SeekableStreamEndSequenceNumbers<>( - "stream", - ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER) - ), - null, - null - ); - - final Task id3 = createKinesisIndexTask( - "id3", - DATASOURCE, - 0, - new SeekableStreamStartSequenceNumbers<>("stream", ImmutableMap.of(SHARD_ID1, "0"), ImmutableSet.of(SHARD_ID1)), - new SeekableStreamEndSequenceNumbers<>( - "stream", - ImmutableMap.of(SHARD_ID1, KinesisSequenceNumber.NO_END_SEQUENCE_NUMBER) - ), - null, - null - ); - - EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)).andReturn(Collections.emptySet()).anyTimes(); - EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); - EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); - EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); - EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); - EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); - EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); - EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); - EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); - EasyMock.expect( - indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KinesisDataSourceMetadata( - null) - ).anyTimes(); - taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); - EasyMock.expect(taskClient.getStatusAsync(EasyMock.anyString())) - .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING)) - .anyTimes(); - final TreeMap> checkpoints = new TreeMap<>(); - checkpoints.put(0, ImmutableMap.of(SHARD_ID1, "0")); - EasyMock.expect(taskClient.getCheckpointsAsync(EasyMock.anyString(), EasyMock.anyBoolean())) - .andReturn(Futures.immediateFuture(checkpoints)) - .times(3); - EasyMock.expect(taskClient.getStartTimeAsync(EasyMock.anyString())) - .andReturn(Futures.immediateFuture(DateTimes.nowUtc())) - .anyTimes(); - EasyMock.expect(taskClient.pauseAsync(EasyMock.anyString())) - .andReturn(Futures.immediateFuture(ImmutableMap.of(SHARD_ID1, "10"))) - .anyTimes(); - EasyMock.expect(taskClient.setEndOffsetsAsync( - EasyMock.anyString(), - EasyMock.eq(ImmutableMap.of(SHARD_ID1, "10")), - EasyMock.anyBoolean() - )) - .andReturn(Futures.immediateFuture(true)) - .anyTimes(); - - replayAll(); - - supervisor.start(); - - supervisor.runInternal(); - - supervisor.checkpoint( - null, - id1.getIOConfig().getBaseSequenceName(), - new KinesisDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), ImmutableSet.of()) - ) - ); - - while (supervisor.getNoticesQueueSize() > 0) { - Thread.sleep(100); - } - - verifyAll(); - } - - @Test public void testSuspendedNoRunningTasks() throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index 46c90941636e..fe36308b5c06 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -31,23 +31,18 @@ public class CheckPointDataSourceMetadataAction implements TaskAction { private final String supervisorId; - @Nullable - private final Integer taskGroupId; - @Deprecated - private final String baseSequenceName; + private final int taskGroupId; private final SeekableStreamDataSourceMetadata checkpointMetadata; public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, - @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility, - @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this + @JsonProperty("taskGroupId") Integer taskGroupId, @JsonProperty("previousCheckPoint") @Nullable @Deprecated SeekableStreamDataSourceMetadata previousCheckPoint, @JsonProperty("checkpointMetadata") @Nullable SeekableStreamDataSourceMetadata checkpointMetadata ) { this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); - this.taskGroupId = taskGroupId; - this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName"); + this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId"); this.checkpointMetadata = checkpointMetadata == null ? previousCheckPoint : checkpointMetadata; Preconditions.checkNotNull(this.checkpointMetadata, "checkpointMetadata"); @@ -65,13 +60,6 @@ public String getSupervisorId() return supervisorId; } - @Deprecated - @JsonProperty("sequenceName") - public String getBaseSequenceName() - { - return baseSequenceName; - } - @Nullable @JsonProperty public Integer getTaskGroupId() @@ -107,7 +95,6 @@ public Boolean perform(Task task, TaskActionToolbox toolbox) return toolbox.getSupervisorManager().checkPointDataSourceMetadata( supervisorId, taskGroupId, - baseSequenceName, checkpointMetadata ); } @@ -123,7 +110,6 @@ public String toString() { return "CheckPointDataSourceMetadataAction{" + "supervisorId='" + supervisorId + '\'' + - ", baseSequenceName='" + baseSequenceName + '\'' + ", taskGroupId='" + taskGroupId + '\'' + ", checkpointMetadata=" + checkpointMetadata + '}'; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 756b707f7604..48153b086ff3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -192,8 +192,7 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc public boolean checkPointDataSourceMetadata( String supervisorId, - @Nullable Integer taskGroupId, - String baseSequenceName, + int taskGroupId, DataSourceMetadata previousDataSourceMetadata ) { @@ -205,7 +204,7 @@ public boolean checkPointDataSourceMetadata( Preconditions.checkNotNull(supervisor, "supervisor could not be found"); - supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata); + supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata); return true; } catch (Exception e) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index c35830a48de7..3ac7139cc4c3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -724,7 +724,6 @@ public void onFailure(Throwable t) final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( task.getDataSource(), ioConfig.getTaskGroupId(), - task.getIOConfig().getBaseSequenceName(), null, createDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 5b1fad9a3c49..062acb586a31 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -349,59 +349,21 @@ public void handle() protected class CheckpointNotice implements Notice { - @Nullable - private final Integer nullableTaskGroupId; - @Deprecated - private final String baseSequenceName; + private final int taskGroupId; private final SeekableStreamDataSourceMetadata checkpointMetadata; CheckpointNotice( - @Nullable Integer nullableTaskGroupId, - @Deprecated String baseSequenceName, + int taskGroupId, SeekableStreamDataSourceMetadata checkpointMetadata ) { - this.baseSequenceName = baseSequenceName; - this.nullableTaskGroupId = nullableTaskGroupId; + this.taskGroupId = taskGroupId; this.checkpointMetadata = checkpointMetadata; } @Override public void handle() throws ExecutionException, InterruptedException { - // Find taskGroupId using taskId if it's null. It can be null while rolling update. - final int taskGroupId; - if (nullableTaskGroupId == null) { - // We search taskId in activelyReadingTaskGroups and pendingCompletionTaskGroups sequentially. This should be fine because - // 1) a taskGroup can be moved from activelyReadingTaskGroups to pendingCompletionTaskGroups in RunNotice - // (see checkTaskDuration()). - // 2) Notices are proceesed by a single thread. So, CheckpointNotice and RunNotice cannot be processed at the - // same time. - final java.util.Optional maybeGroupId = activelyReadingTaskGroups - .entrySet() - .stream() - .filter(entry -> { - final TaskGroup taskGroup = entry.getValue(); - return taskGroup.baseSequenceName.equals(baseSequenceName); - }) - .findAny() - .map(Entry::getKey); - - taskGroupId = maybeGroupId.orElseGet(() -> pendingCompletionTaskGroups - .entrySet() - .stream() - .filter(entry -> { - final List taskGroups = entry.getValue(); - return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName)); - }) - .findAny() - .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName)) - .getKey()); - - } else { - taskGroupId = nullableTaskGroupId; - } - // check for consistency // if already received request for this sequenceName and dataSourceMetadata combination then return final TaskGroup taskGroup = activelyReadingTaskGroups.get(taskGroupId); @@ -3129,11 +3091,7 @@ public SeekableStreamSupervisorIOConfig getIoConfig() } @Override - public void checkpoint( - @Nullable Integer taskGroupId, - @Deprecated String baseSequenceName, - DataSourceMetadata checkpointMetadata - ) + public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) { Preconditions.checkNotNull(checkpointMetadata, "checkpointMetadata"); @@ -3153,7 +3111,7 @@ public void checkpoint( ); log.info("Checkpointing [%s] for taskGroup [%s]", checkpointMetadata, taskGroupId); - addNotice(new CheckpointNotice(taskGroupId, baseSequenceName, seekableMetadata)); + addNotice(new CheckpointNotice(taskGroupId, seekableMetadata)); } @VisibleForTesting diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index a3c10889f4a2..a2aa29f9c25c 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -150,11 +150,7 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint( - @Nullable Integer taskGroupId, - String baseSequenceName, - DataSourceMetadata checkpointMetadata - ) + public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) { } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 5009345892ff..83c661a12ed8 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -61,12 +61,7 @@ default Boolean isHealthy() * represented by {@param currentCheckpoint} DataSourceMetadata * * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing - * @param baseSequenceName baseSequenceName * @param checkpointMetadata metadata for the sequence to currently checkpoint */ - void checkpoint( - @Nullable Integer taskGroupId, - @Deprecated String baseSequenceName, - DataSourceMetadata checkpointMetadata - ); + void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata); }