From 189d6ac5e9123addc34dbd5a24509030faae9011 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 21 Mar 2019 15:45:49 -0700 Subject: [PATCH 1/3] Fix unrealistic test arguments in KafkaSupervisorTest --- .../indexing/kafka/supervisor/KafkaSupervisorTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index b30b75ee7886..67930ad05690 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -583,7 +583,7 @@ public void testDatasourceMetadata() throws Exception expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of(0, 1, 2)) + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()) ) ).anyTimes(); expect(taskQueue.add(capture(captured))).andReturn(true); @@ -620,7 +620,7 @@ public void testBadMetadataOffsets() throws Exception expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn( new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of(0, 1, 2)) + new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of()) ) ).anyTimes(); replayAll(); @@ -2515,7 +2515,7 @@ public void testCheckpointWithNullTaskGroupId() ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())), new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>(topic, newCheckpoints.get(0), newCheckpoints.get(0).keySet()) + new SeekableStreamEndSequenceNumbers<>(topic, newCheckpoints.get(0)) ) ); From ddd10ee3697fa50190146780c2742c97c5d3d038 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 21 Mar 2019 16:53:34 -0700 Subject: [PATCH 2/3] remove currentCheckpoint from checkpoint action --- .../MaterializedViewSupervisor.java | 3 +- .../druid/indexing/kafka/KafkaIndexTask.java | 5 +++ .../indexing/kafka/KafkaIndexTaskTest.java | 24 +++-------- .../kafka/supervisor/KafkaSupervisorTest.java | 11 +---- .../kinesis/KinesisIndexTaskTest.java | 15 +++---- .../supervisor/KinesisSupervisorTest.java | 12 +----- .../CheckPointDataSourceMetadataAction.java | 37 ++++++++++------- .../supervisor/SupervisorManager.java | 5 +-- .../SeekableStreamIndexTaskRunner.java | 10 ++--- .../supervisor/SeekableStreamSupervisor.java | 41 ++++++++----------- .../supervisor/NoopSupervisorSpec.java | 3 +- .../overlord/supervisor/Supervisor.java | 8 ++-- 12 files changed, 70 insertions(+), 104 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 105afdf8f23f..622c6dc0713e 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -255,8 +255,7 @@ public void reset(DataSourceMetadata dataSourceMetadata) public void checkpoint( @Nullable Integer taskGroupId, String baseSequenceName, - DataSourceMetadata previousCheckPoint, - DataSourceMetadata currentCheckPoint + DataSourceMetadata checkpointMetadata ) { // do nothing diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index df56cced6ecb..d7e742ddd41a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; @@ -82,6 +83,10 @@ public KafkaIndexTask( this.configMapper = configMapper; this.ioConfig = ioConfig; + Preconditions.checkArgument( + ioConfig.getStartSequenceNumbers().getExclusivePartitions().isEmpty(), + "All startSequenceNumbers must be inclusive" + ); } long getPollRetryMs() diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index ebef3fc4596a..b8061db95f64 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -558,10 +558,7 @@ public void testIncrementalHandOff() throws Exception Objects.hash( DATA_SCHEMA.getDataSource(), 0, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>(topic, currentOffsets) - ) + new KafkaDataSourceMetadata(startPartitions) ) ) ); @@ -693,10 +690,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception Objects.hash( DATA_SCHEMA.getDataSource(), 0, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>(topic, currentOffsets) - ) + new KafkaDataSourceMetadata(startPartitions) ) ) ); @@ -707,8 +701,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception 0, new KafkaDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(topic, currentOffsets, ImmutableSet.of()) - ), - new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, nextOffsets)) + ) ) ) ); @@ -814,10 +807,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception Objects.hash( DATA_SCHEMA.getDataSource(), 0, - new KafkaDataSourceMetadata(startPartitions), - new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>(topic, checkpoint.getPartitionSequenceNumberMap()) - ) + new KafkaDataSourceMetadata(startPartitions) ) ) ); @@ -2673,8 +2663,7 @@ public boolean checkPointDataSourceMetadata( String supervisorId, @Nullable Integer taskGroupId, String baseSequenceName, - @Nullable DataSourceMetadata previousDataSourceMetadata, - @Nullable DataSourceMetadata currentDataSourceMetadata + @Nullable DataSourceMetadata previousDataSourceMetadata ) { log.info("Adding checkpoint hash to the set"); @@ -2682,8 +2671,7 @@ public boolean checkPointDataSourceMetadata( Objects.hash( supervisorId, taskGroupId, - previousDataSourceMetadata, - currentDataSourceMetadata + previousDataSourceMetadata ) ); return true; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 67930ad05690..657147b7b658 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -2327,8 +2327,7 @@ public void testCheckpointForInactiveTaskGroup() supervisor.checkpoint( 0, ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, fakeCheckpoints, fakeCheckpoints.keySet())) + new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -2407,7 +2406,6 @@ public void testCheckpointForUnknownTaskGroup() supervisor.checkpoint( 0, ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of())), new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of())) ); @@ -2508,15 +2506,10 @@ public void testCheckpointWithNullTaskGroupId() supervisor.runInternal(); - final TreeMap> newCheckpoints = new TreeMap<>(); - newCheckpoints.put(0, ImmutableMap.of(0, 10L)); supervisor.checkpoint( null, ((KafkaIndexTask) id1).getIOConfig().getBaseSequenceName(), - new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())), - new KafkaDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>(topic, newCheckpoints.get(0)) - ) + new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())) ); while (supervisor.getNoticesQueueSize() > 0) { diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 435bb2abf8e4..66cf63c3c2c1 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -587,8 +587,7 @@ public void testIncrementalHandOff() throws Exception Objects.hash( DATA_SCHEMA.getDataSource(), 0, - new KinesisDataSourceMetadata(startPartitions), - new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(stream, currentOffsets)) + new KinesisDataSourceMetadata(startPartitions) ) ) ); @@ -725,8 +724,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception Objects.hash( DATA_SCHEMA.getDataSource(), 0, - new KinesisDataSourceMetadata(startPartitions), - new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(stream, currentOffsets)) + new KinesisDataSourceMetadata(startPartitions) ) ) ); @@ -737,8 +735,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception 0, new KinesisDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(stream, currentOffsets, ImmutableSet.of()) - ), - new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(stream, nextOffsets)) + ) ) ) ); @@ -2701,8 +2698,7 @@ public boolean checkPointDataSourceMetadata( String supervisorId, @Nullable Integer taskGroupId, String baseSequenceName, - @Nullable DataSourceMetadata previousDataSourceMetadata, - @Nullable DataSourceMetadata currentDataSourceMetadata + @Nullable DataSourceMetadata checkpointMetadata ) { log.info("Adding checkpoint hash to the set"); @@ -2710,8 +2706,7 @@ public boolean checkPointDataSourceMetadata( Objects.hash( supervisorId, taskGroupId, - previousDataSourceMetadata, - currentDataSourceMetadata + checkpointMetadata ) ); return true; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 73c1d4692faa..9bdd20251a76 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -2861,8 +2861,7 @@ public void testCheckpointForInactiveTaskGroup() ((KinesisIndexTask) id1).getIOConfig().getBaseSequenceName(), new KinesisDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), checkpoints.get(0).keySet()) - ), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, fakeCheckpoints, ImmutableSet.of())) + ) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -2984,7 +2983,6 @@ public void testCheckpointForUnknownTaskGroup() supervisor.checkpoint( 0, ((KinesisIndexTask) id1).getIOConfig().getBaseSequenceName(), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of())), new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of())) ); @@ -3095,13 +3093,10 @@ public void testCheckpointWithNullTaskGroupId() supervisor.runInternal(); - final TreeMap> newCheckpoints = new TreeMap<>(); - newCheckpoints.put(0, ImmutableMap.of(shardId1, "10")); supervisor.checkpoint( null, ((KinesisIndexTask) id1).getIOConfig().getBaseSequenceName(), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), ImmutableSet.of())), - new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, newCheckpoints.get(0), ImmutableSet.of())) + new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), ImmutableSet.of())) ); while (supervisor.getNoticesQueueSize() > 0) { @@ -3685,8 +3680,6 @@ public String getDataSource() private class TestableKinesisSupervisor extends KinesisSupervisor { - private final KinesisSupervisorSpec spec; - public TestableKinesisSupervisor( TaskStorage taskStorage, TaskMaster taskMaster, @@ -3707,7 +3700,6 @@ public TestableKinesisSupervisor( rowIngestionMetersFactory, null ); - this.spec = spec; } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index f0ebd49be90d..46c90941636e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -23,7 +23,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import javax.annotation.Nullable; @@ -34,22 +35,28 @@ public class CheckPointDataSourceMetadataAction implements TaskAction private final Integer taskGroupId; @Deprecated private final String baseSequenceName; - private final DataSourceMetadata previousCheckPoint; - private final DataSourceMetadata currentCheckPoint; + private final SeekableStreamDataSourceMetadata checkpointMetadata; public CheckPointDataSourceMetadataAction( @JsonProperty("supervisorId") String supervisorId, @JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility, @JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this - @JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint, - @JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint + @JsonProperty("previousCheckPoint") @Nullable @Deprecated SeekableStreamDataSourceMetadata previousCheckPoint, + @JsonProperty("checkpointMetadata") @Nullable SeekableStreamDataSourceMetadata checkpointMetadata ) { this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId"); this.taskGroupId = taskGroupId; this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName"); - this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint"); - this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint"); + this.checkpointMetadata = checkpointMetadata == null ? previousCheckPoint : checkpointMetadata; + + Preconditions.checkNotNull(this.checkpointMetadata, "checkpointMetadata"); + // checkpointMetadata must be SeekableStreamStartSequenceNumbers because it's the start sequence numbers of the + // sequence currently being checkpointed + Preconditions.checkArgument( + this.checkpointMetadata.getSeekableStreamSequenceNumbers() instanceof SeekableStreamStartSequenceNumbers, + "checkpointMetadata must be SeekableStreamStartSequenceNumbers" + ); } @JsonProperty @@ -72,16 +79,18 @@ public Integer getTaskGroupId() return taskGroupId; } + // For backwards compatibility + @Deprecated @JsonProperty - public DataSourceMetadata getPreviousCheckPoint() + public SeekableStreamDataSourceMetadata getPreviousCheckPoint() { - return previousCheckPoint; + return checkpointMetadata; } @JsonProperty - public DataSourceMetadata getCurrentCheckPoint() + public SeekableStreamDataSourceMetadata getCheckpointMetadata() { - return currentCheckPoint; + return checkpointMetadata; } @Override @@ -99,8 +108,7 @@ public Boolean perform(Task task, TaskActionToolbox toolbox) supervisorId, taskGroupId, baseSequenceName, - previousCheckPoint, - currentCheckPoint + checkpointMetadata ); } @@ -117,8 +125,7 @@ public String toString() "supervisorId='" + supervisorId + '\'' + ", baseSequenceName='" + baseSequenceName + '\'' + ", taskGroupId='" + taskGroupId + '\'' + - ", previousCheckPoint=" + previousCheckPoint + - ", currentCheckPoint=" + currentCheckPoint + + ", checkpointMetadata=" + checkpointMetadata + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 3168b898de61..4187c316d1a3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -202,8 +202,7 @@ public boolean checkPointDataSourceMetadata( String supervisorId, @Nullable Integer taskGroupId, String baseSequenceName, - DataSourceMetadata previousDataSourceMetadata, - DataSourceMetadata currentDataSourceMetadata + DataSourceMetadata previousDataSourceMetadata ) { try { @@ -214,7 +213,7 @@ public boolean checkPointDataSourceMetadata( Preconditions.checkNotNull(supervisor, "supervisor could not be found"); - supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata, currentDataSourceMetadata); + supervisor.lhs.checkpoint(taskGroupId, baseSequenceName, previousDataSourceMetadata); return true; } catch (Exception e) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 3907d5451177..0978331bbb7b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -30,6 +30,7 @@ import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; @@ -645,17 +646,12 @@ public void onFailure(Throwable t) task.getDataSource(), ioConfig.getTaskGroupId(), task.getIOConfig().getBaseSequenceName(), + null, createDataSourceMetadata( new SeekableStreamStartSequenceNumbers<>( stream, sequenceToCheckpoint.getStartOffsets(), - ioConfig.getStartSequenceNumbers().getExclusivePartitions() - ) - ), - createDataSourceMetadata( - new SeekableStreamEndSequenceNumbers<>( - stream, - currOffsets + ImmutableSet.of() ) ) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ab7f3bcbd7cd..eb968ddb7a4e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -61,6 +61,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamPartition; @@ -311,19 +312,16 @@ protected class CheckpointNotice implements Notice @Deprecated private final String baseSequenceName; private final SeekableStreamDataSourceMetadata previousCheckpoint; - private final SeekableStreamDataSourceMetadata currentCheckpoint; - public CheckpointNotice( + CheckpointNotice( @Nullable Integer nullableTaskGroupId, @Deprecated String baseSequenceName, - SeekableStreamDataSourceMetadata previousCheckpoint, - SeekableStreamDataSourceMetadata currentCheckpoint + SeekableStreamDataSourceMetadata previousCheckpoint ) { this.baseSequenceName = baseSequenceName; this.nullableTaskGroupId = nullableTaskGroupId; this.previousCheckpoint = previousCheckpoint; - this.currentCheckpoint = currentCheckpoint; } @Override @@ -2633,31 +2631,28 @@ public SeekableStreamSupervisorIOConfig getIoConfig() public void checkpoint( @Nullable Integer taskGroupId, @Deprecated String baseSequenceName, - DataSourceMetadata previousCheckPoint, - DataSourceMetadata currentCheckPoint + DataSourceMetadata checkpointMetadata ) { - Preconditions.checkNotNull(previousCheckPoint, "previousCheckpoint"); - Preconditions.checkNotNull(currentCheckPoint, "current checkpoint cannot be null"); + Preconditions.checkNotNull(checkpointMetadata, "checkpointMetadata"); + + //noinspection unchecked + final SeekableStreamDataSourceMetadata seekableMetadata = + (SeekableStreamDataSourceMetadata) checkpointMetadata; + Preconditions.checkArgument( - spec.getIoConfig() - .getStream() - .equals(((SeekableStreamDataSourceMetadata) currentCheckPoint).getSeekableStreamSequenceNumbers() - .getStream()), + spec.getIoConfig().getStream().equals(seekableMetadata.getSeekableStreamSequenceNumbers().getStream()), "Supervisor stream [%s] and stream in checkpoint [%s] does not match", spec.getIoConfig().getStream(), - ((SeekableStreamDataSourceMetadata) currentCheckPoint).getSeekableStreamSequenceNumbers().getStream() + seekableMetadata.getSeekableStreamSequenceNumbers().getStream() ); - - log.info("Checkpointing [%s] for taskGroup [%s]", currentCheckPoint, taskGroupId); - addNotice( - new CheckpointNotice( - taskGroupId, - baseSequenceName, - (SeekableStreamDataSourceMetadata) previousCheckPoint, - (SeekableStreamDataSourceMetadata) currentCheckPoint - ) + Preconditions.checkArgument( + seekableMetadata.getSeekableStreamSequenceNumbers() instanceof SeekableStreamStartSequenceNumbers, + "checkpointMetadata must be SeekableStreamStartSequenceNumbers" ); + + log.info("Checkpointing [%s] for taskGroup [%s]", checkpointMetadata, taskGroupId); + addNotice(new CheckpointNotice(taskGroupId, baseSequenceName, seekableMetadata)); } /** diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 6935a62f5457..f9ab889bec06 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -116,8 +116,7 @@ public void reset(DataSourceMetadata dataSourceMetadata) {} public void checkpoint( @Nullable Integer taskGroupId, String baseSequenceName, - DataSourceMetadata previousCheckPoint, - DataSourceMetadata currentCheckPoint + DataSourceMetadata checkpointMetadata ) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 03c2a43d9ebb..921dc9831dfe 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -49,18 +49,16 @@ default Map> getStats() /** * The definition of checkpoint is not very strict as currently it does not affect data or control path. * On this call Supervisor can potentially checkpoint data processed so far to some durable storage - * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data + * for example - Kafka/Kinesis Supervisor uses this to merge and handoff segments containing at least the data * represented by {@param currentCheckpoint} DataSourceMetadata * * @param taskGroupId unique Identifier to figure out for which sequence to do checkpointing * @param baseSequenceName baseSequenceName - * @param previousCheckPoint DataSourceMetadata checkpointed in previous call - * @param currentCheckPoint current DataSourceMetadata to be checkpointed + * @param checkpointMetadata metadata for the sequence to currently checkpoint */ void checkpoint( @Nullable Integer taskGroupId, @Deprecated String baseSequenceName, - DataSourceMetadata previousCheckPoint, - DataSourceMetadata currentCheckPoint + DataSourceMetadata checkpointMetadata ); } From 65c9fc639d774dcdd78563c0e9d40689f1358bec Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 15 Aug 2019 14:39:00 -0700 Subject: [PATCH 3/3] rename variable --- .../supervisor/SeekableStreamSupervisor.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index d91f48c57c58..654f0ed60436 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -340,17 +340,17 @@ protected class CheckpointNotice implements Notice private final Integer nullableTaskGroupId; @Deprecated private final String baseSequenceName; - private final SeekableStreamDataSourceMetadata previousCheckpoint; + private final SeekableStreamDataSourceMetadata checkpointMetadata; CheckpointNotice( @Nullable Integer nullableTaskGroupId, @Deprecated String baseSequenceName, - SeekableStreamDataSourceMetadata previousCheckpoint + SeekableStreamDataSourceMetadata checkpointMetadata ) { this.baseSequenceName = baseSequenceName; this.nullableTaskGroupId = nullableTaskGroupId; - this.previousCheckpoint = previousCheckpoint; + this.checkpointMetadata = checkpointMetadata; } @Override @@ -402,7 +402,7 @@ public void handle() throws ExecutionException, InterruptedException Map checkpoint = checkpoints.get(sequenceId); // We have already verified the stream of the current checkpoint is same with that in ioConfig. // See checkpoint(). - if (checkpoint.equals(previousCheckpoint.getSeekableStreamSequenceNumbers() + if (checkpoint.equals(checkpointMetadata.getSeekableStreamSequenceNumbers() .getPartitionSequenceNumberMap() )) { break; @@ -410,7 +410,7 @@ public void handle() throws ExecutionException, InterruptedException index--; } if (index == 0) { - throw new ISE("No such previous checkpoint [%s] found", previousCheckpoint); + throw new ISE("No such previous checkpoint [%s] found", checkpointMetadata); } else if (index < checkpoints.size()) { // if the found checkpoint is not the latest one then already checkpointed by a replica Preconditions.checkState(index == checkpoints.size() - 1, "checkpoint consistency failure");