From 4e44af1e05e41f69b0e95efdc46414224ed158e9 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 17 Aug 2018 10:15:45 -0700 Subject: [PATCH] [Backport] Fix NPE for taskGroupId when rolling update (#6168) --- .../druid/indexing/kafka/KafkaIndexTask.java | 1 + .../kafka/supervisor/KafkaSupervisor.java | 110 +++++++++++------- .../indexing/kafka/KafkaIndexTaskTest.java | 3 +- .../kafka/supervisor/KafkaSupervisorTest.java | 91 ++++++++++++++- .../CheckPointDataSourceMetadataAction.java | 24 +++- .../supervisor/SupervisorManager.java | 5 +- .../supervisor/NoopSupervisorSpec.java | 3 +- .../overlord/supervisor/Supervisor.java | 10 +- 8 files changed, 195 insertions(+), 52 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 5f039c0336b4..c14354c0d214 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -705,6 +705,7 @@ public void onFailure(Throwable t) final CheckPointDataSourceMetadataAction checkpointAction = new CheckPointDataSourceMetadataAction( getDataSource(), ioConfig.getTaskGroupId(), + getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, sequenceToCheckpoint.getStartOffsets())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, nextOffsets)) ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index cb3352d94026..63868230c3dd 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -92,6 +92,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Random; import java.util.Set; @@ -143,7 +144,7 @@ public class KafkaSupervisor implements Supervisor * time, there should only be up to a maximum of [taskCount] actively-reading task groups (tracked in the [taskGroups] * map) + zero or more pending-completion task groups (tracked in [pendingCompletionTaskGroups]). */ - private static class TaskGroup + private class TaskGroup { // This specifies the partitions and starting offsets for this task group. It is set on group creation from the data // in [partitionGroups] and never changes during the lifetime of this task group, which will live until a task in @@ -157,6 +158,7 @@ private static class TaskGroup final Optional maximumMessageTime; DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action final TreeMap> sequenceOffsets = new TreeMap<>(); + final String baseSequenceName; TaskGroup( ImmutableMap partitionOffsets, @@ -168,6 +170,7 @@ private static class TaskGroup this.minimumMessageTime = minimumMessageTime; this.maximumMessageTime = maximumMessageTime; this.sequenceOffsets.put(0, partitionOffsets); + this.baseSequenceName = generateSequenceName(partitionOffsets, minimumMessageTime, maximumMessageTime); } int addNewCheckpoint(Map checkpoint) @@ -490,25 +493,29 @@ public void reset(DataSourceMetadata dataSourceMetadata) } @Override - public void checkpoint(int taskGroupId, DataSourceMetadata previousCheckpoint, DataSourceMetadata currentCheckpoint) + public void checkpoint( + @Nullable Integer taskGroupId, + @Deprecated String baseSequenceName, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ) { - Preconditions.checkNotNull(previousCheckpoint, "previousCheckpoint"); - Preconditions.checkNotNull(currentCheckpoint, "current checkpoint cannot be null"); + Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint"); + Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null"); Preconditions.checkArgument( - ioConfig.getTopic() - .equals(((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions() - .getTopic()), + ioConfig.getTopic().equals(((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic()), "Supervisor topic [%s] and topic in checkpoint [%s] does not match", ioConfig.getTopic(), - ((KafkaDataSourceMetadata) currentCheckpoint).getKafkaPartitions().getTopic() + ((KafkaDataSourceMetadata) currentCheckPoint).getKafkaPartitions().getTopic() ); - log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckpoint, taskGroupId); + log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId); notices.add( new CheckpointNotice( taskGroupId, - (KafkaDataSourceMetadata) previousCheckpoint, - (KafkaDataSourceMetadata) currentCheckpoint + baseSequenceName, + (KafkaDataSourceMetadata) previousCheckPoint, + (KafkaDataSourceMetadata) currentCheckPoint ) ); } @@ -612,17 +619,20 @@ public void handle() private class CheckpointNotice implements Notice { - final int taskGroupId; - final KafkaDataSourceMetadata previousCheckpoint; - final KafkaDataSourceMetadata currentCheckpoint; + @Nullable private final Integer nullableTaskGroupId; + @Deprecated private final String baseSequenceName; + private final KafkaDataSourceMetadata previousCheckpoint; + private final KafkaDataSourceMetadata currentCheckpoint; CheckpointNotice( - int taskGroupId, + @Nullable Integer nullableTaskGroupId, + @Deprecated String baseSequenceName, KafkaDataSourceMetadata previousCheckpoint, KafkaDataSourceMetadata currentCheckpoint ) { - this.taskGroupId = taskGroupId; + this.baseSequenceName = baseSequenceName; + this.nullableTaskGroupId = nullableTaskGroupId; this.previousCheckpoint = previousCheckpoint; this.currentCheckpoint = currentCheckpoint; } @@ -630,12 +640,44 @@ private class CheckpointNotice implements Notice @Override public void handle() throws ExecutionException, InterruptedException, TimeoutException { + // Find taskGroupId using taskId if it's null. It can be null while rolling update. + final int taskGroupId; + if (nullableTaskGroupId == null) { + // We search taskId in taskGroups and pendingCompletionTaskGroups sequentially. This should be fine because + // 1) a taskGroup can be moved from taskGroups to pendingCompletionTaskGroups in RunNotice + // (see checkTaskDuration()). + // 2) Notices are proceesed by a single thread. So, CheckpointNotice and RunNotice cannot be processed at the + // same time. + final java.util.Optional maybeGroupId = taskGroups + .entrySet() + .stream() + .filter(entry -> { + final TaskGroup taskGroup = entry.getValue(); + return taskGroup.baseSequenceName.equals(baseSequenceName); + }) + .findAny() + .map(Entry::getKey); + taskGroupId = maybeGroupId.orElse( + pendingCompletionTaskGroups + .entrySet() + .stream() + .filter(entry -> { + final List taskGroups = entry.getValue(); + return taskGroups.stream().anyMatch(group -> group.baseSequenceName.equals(baseSequenceName)); + }) + .findAny() + .orElseThrow(() -> new ISE("Cannot find taskGroup for baseSequenceName[%s]", baseSequenceName)) + .getKey() + ); + } else { + taskGroupId = nullableTaskGroupId; + } + // check for consistency // if already received request for this sequenceName and dataSourceMetadata combination then return - final TaskGroup taskGroup = taskGroups.get(taskGroupId); - if (isValidTaskGroup(taskGroup)) { + if (isValidTaskGroup(taskGroupId, taskGroup)) { final TreeMap> checkpoints = taskGroup.sequenceOffsets; // check validity of previousCheckpoint @@ -655,20 +697,13 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc log.info("Already checkpointed with offsets [%s]", checkpoints.lastEntry().getValue()); return; } - final int taskGroupId = getTaskGroupIdForPartition( - currentCheckpoint.getKafkaPartitions() - .getPartitionOffsetMap() - .keySet() - .iterator() - .next() - ); final Map newCheckpoint = checkpointTaskGroup(taskGroupId, false).get(); taskGroups.get(taskGroupId).addNewCheckpoint(newCheckpoint); log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", newCheckpoint, taskGroupId); } } - private boolean isValidTaskGroup(@Nullable TaskGroup taskGroup) + private boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup) { if (taskGroup == null) { // taskGroup might be in pendingCompletionTaskGroups or partitionGroups @@ -867,17 +902,6 @@ String generateSequenceName( return Joiner.on("_").join("index_kafka", dataSource, hashCode); } - @VisibleForTesting - String generateSequenceName(TaskGroup taskGroup) - { - Preconditions.checkNotNull(taskGroup, "taskGroup cannot be null"); - return generateSequenceName( - taskGroup.partitionOffsets, - taskGroup.minimumMessageTime, - taskGroup.maximumMessageTime - ); - } - private static String getRandomId() { final StringBuilder suffix = new StringBuilder(8); @@ -1748,7 +1772,6 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc endPartitions.put(partition, Long.MAX_VALUE); } TaskGroup group = taskGroups.get(groupId); - String sequenceName = generateSequenceName(group); Map consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties()); DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull(); @@ -1756,7 +1779,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc KafkaIOConfig kafkaIOConfig = new KafkaIOConfig( groupId, - sequenceName, + group.baseSequenceName, new KafkaPartitions(ioConfig.getTopic(), startPartitions), new KafkaPartitions(ioConfig.getTopic(), endPartitions), consumerProperties, @@ -1777,10 +1800,10 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc .putAll(spec.getContext()) .build(); for (int i = 0; i < replicas; i++) { - String taskId = Joiner.on("_").join(sequenceName, getRandomId()); + String taskId = Joiner.on("_").join(group.baseSequenceName, getRandomId()); KafkaIndexTask indexTask = new KafkaIndexTask( taskId, - new TaskResource(sequenceName, 1), + new TaskResource(group.baseSequenceName, 1), spec.getDataSchema(), taskTuningConfig, kafkaIOConfig, @@ -1909,7 +1932,10 @@ private boolean isTaskCurrent(int taskGroupId, String taskId) String taskSequenceName = ((KafkaIndexTask) taskOptional.get()).getIOConfig().getBaseSequenceName(); if (taskGroups.get(taskGroupId) != null) { - return generateSequenceName(taskGroups.get(taskGroupId)).equals(taskSequenceName); + return Preconditions + .checkNotNull(taskGroups.get(taskGroupId), "null taskGroup for taskId[%s]", taskGroupId) + .baseSequenceName + .equals(taskSequenceName); } else { return generateSequenceName( ((KafkaIndexTask) taskOptional.get()).getIOConfig() diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 2a852d4a8ad9..d89f5e2f026e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1776,7 +1776,8 @@ private void makeToolboxFactory() throws IOException @Override public boolean checkPointDataSourceMetadata( String supervisorId, - int taskGroupId, + @Nullable Integer taskGroupId, + String baseSequenceName, @Nullable DataSourceMetadata previousDataSourceMetadata, @Nullable DataSourceMetadata currentDataSourceMetadata ) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 2add456df8e6..5193b5bbab8a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2103,6 +2103,7 @@ public void testCheckpointForInactiveTaskGroup() supervisor.moveTaskGroupToPendingCompletion(0); supervisor.checkpoint( 0, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), new KafkaDataSourceMetadata(new KafkaPartitions(topic, fakeCheckpoints)) ); @@ -2172,6 +2173,7 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException supervisor.checkpoint( 0, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())), new KafkaDataSourceMetadata(new KafkaPartitions(topic, Collections.emptyMap())) ); @@ -2190,13 +2192,100 @@ public void testCheckpointForUnknownTaskGroup() throws InterruptedException Assert.assertEquals(ISE.class, serviceEmitter.getExceptionClass()); } + @Test(timeout = 60_000L) + public void testCheckpointWithNullTaskGroupId() + throws InterruptedException, ExecutionException, TimeoutException, JsonProcessingException + { + supervisor = getSupervisor(1, 3, true, "PT1S", null, null, false); + //not adding any events + final Task id1 = createKafkaIndexTask( + "id1", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + final Task id2 = createKafkaIndexTask( + "id2", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + final Task id3 = createKafkaIndexTask( + "id3", + DATASOURCE, + 0, + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, Long.MAX_VALUE)), + null, + null + ); + + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of(id1, id2, id3)).anyTimes(); + expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes(); + expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes(); + expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes(); + expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes(); + expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes(); + expect(taskStorage.getTask("id3")).andReturn(Optional.of(id3)).anyTimes(); + expect( + indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(new KafkaDataSourceMetadata(null) + ).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + expect(taskClient.getStatusAsync(anyString())) + .andReturn(Futures.immediateFuture(KafkaIndexTask.Status.READING)) + .anyTimes(); + final TreeMap> checkpoints = new TreeMap<>(); + checkpoints.put(0, ImmutableMap.of(0, 0L)); + expect(taskClient.getCheckpointsAsync(anyString(), anyBoolean())) + .andReturn(Futures.immediateFuture(checkpoints)) + .times(3); + expect(taskClient.getStartTimeAsync(anyString())).andReturn(Futures.immediateFuture(DateTimes.nowUtc())).anyTimes(); + expect(taskClient.pauseAsync(anyString())) + .andReturn(Futures.immediateFuture(ImmutableMap.of(0, 10L))) + .anyTimes(); + expect(taskClient.setEndOffsetsAsync(anyString(), EasyMock.eq(ImmutableMap.of(0, 10L)), anyBoolean())) + .andReturn(Futures.immediateFuture(true)) + .anyTimes(); + + replayAll(); + + supervisor.start(); + + supervisor.runInternal(); + + final TreeMap> newCheckpoints = new TreeMap<>(); + newCheckpoints.put(0, ImmutableMap.of(0, 10L)); + supervisor.checkpoint( + null, + ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, checkpoints.get(0))), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, newCheckpoints.get(0))) + ); + + while (supervisor.getNoticesQueueSize() > 0) { + Thread.sleep(100); + } + + verifyAll(); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { for (int i = 0; i < NUM_PARTITIONS; i++) { for (int j = 0; j < numEventsPerPartition; j++) { kafkaProducer.send( - new ProducerRecord( + new ProducerRecord<>( topic, i, null, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index 083ef9e53551..bc1b896d4295 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -26,23 +26,29 @@ import io.druid.indexing.overlord.DataSourceMetadata; import java.io.IOException; +import javax.annotation.Nullable; public class CheckPointDataSourceMetadataAction implements TaskAction { private final String supervisorId; - private final int taskGroupId; + @Nullable + private final Integer taskGroupId; + @Deprecated + private final String baseSequenceName; private final DataSourceMetadata previousCheckPoint; private final DataSourceMetadata currentCheckPoint; public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, - @JsonProperty("taskGroupId") Integer taskGroupId, + @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility, + @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint ) { this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); - this.taskGroupId = Preconditions.checkNotNull(taskGroupId, "taskGroupId"); + this.taskGroupId = taskGroupId; + this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName"); this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint"); this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint"); } @@ -53,8 +59,16 @@ public String getSupervisorId() return supervisorId; } + @Deprecated + @JsonProperty("sequenceName") + public String getBaseSequenceName() + { + return baseSequenceName; + } + + @Nullable @JsonProperty - public int getTaskGroupId() + public Integer getTaskGroupId() { return taskGroupId; } @@ -87,6 +101,7 @@ public Boolean perform( return toolbox.getSupervisorManager().checkPointDataSourceMetadata( supervisorId, taskGroupId, + baseSequenceName, previousCheckPoint, currentCheckPoint ); @@ -103,6 +118,7 @@ public String toString() { return "CheckPointDataSourceMetadataAction{" + "supervisorId='" + supervisorId + '\'' + + ", baseSequenceName='" + baseSequenceName + '\'' + ", taskGroupId='" + taskGroupId + '\'' + ", previousCheckPoint=" + previousCheckPoint + ", currentCheckPoint=" + currentCheckPoint + diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 31e54cfd521a..cf3fe27e9c08 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -159,7 +159,8 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc public boolean checkPointDataSourceMetadata( String supervisorId, - int taskGroupId, + @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousDataSourceMetadata, DataSourceMetadata currentDataSourceMetadata ) @@ -172,7 +173,7 @@ public boolean checkPointDataSourceMetadata( Preconditions.checkNotNull(supervisor, "supervisor could not be found"); - supervisor.lhs.checkpoint(taskGroupId, previousDataSourceMetadata, currentDataSourceMetadata); + supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata); return true; } catch (Exception e) { diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 0ba0701e82d0..8f8105ea8abc 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -83,7 +83,8 @@ public void reset(DataSourceMetadata dataSourceMetadata) {} @Override public void checkpoint( - int taskGroupId, + @Nullable Integer taskGroupId, + String baseSequenceName, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint ) diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index 58e73ff3df71..3f90766e3cd5 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -21,6 +21,8 @@ import io.druid.indexing.overlord.DataSourceMetadata; +import javax.annotation.Nullable; + public interface Supervisor { void start(); @@ -44,8 +46,14 @@ public interface Supervisor * represented by {@param currentCheckpoint} DataSourceMetadata * * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing + * @param baseSequenceName baseSequenceName * @param previousCheckPoint DataSourceMetadata checkpointed in previous call * @param currentCheckPoint current DataSourceMetadata to be checkpointed */ - void checkpoint(int taskGroupId, DataSourceMetadata previousCheckPoint, DataSourceMetadata currentCheckPoint); + void checkpoint( + @Nullable Integer taskGroupId, + @Deprecated String baseSequenceName, + DataSourceMetadata previousCheckPoint, + DataSourceMetadata currentCheckPoint + ); }