Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,7 @@ public void reset(DataSourceMetadata dataSourceMetadata)
public void checkpoint(
@Nullable Integer taskGroupId,
String baseSequenceName,
DataSourceMetadata previousCheckPoint,
DataSourceMetadata currentCheckPoint
DataSourceMetadata checkpointMetadata
)
{
// do nothing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
Expand Down Expand Up @@ -83,6 +84,10 @@ public KafkaIndexTask(
this.configMapper = configMapper;
this.ioConfig = ioConfig;

Preconditions.checkArgument(
ioConfig.getStartSequenceNumbers().getExclusivePartitions().isEmpty(),
"All startSequenceNumbers must be inclusive"
);
}

long getPollRetryMs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,10 +567,7 @@ public void testIncrementalHandOff() throws Exception
Objects.hash(
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, currentOffsets)
)
new KafkaDataSourceMetadata(startPartitions)
)
)
);
Expand Down Expand Up @@ -700,10 +697,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
Objects.hash(
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, currentOffsets)
)
new KafkaDataSourceMetadata(startPartitions)
)
)
);
Expand All @@ -714,8 +708,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
0,
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, currentOffsets, ImmutableSet.of())
),
new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, nextOffsets))
)
)
)
);
Expand Down Expand Up @@ -820,10 +813,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception
Objects.hash(
DATA_SCHEMA.getDataSource(),
0,
new KafkaDataSourceMetadata(startPartitions),
new KafkaDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, checkpoint.getPartitionSequenceNumberMap())
)
new KafkaDataSourceMetadata(startPartitions)
)
)
);
Expand Down Expand Up @@ -2653,17 +2643,15 @@ public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable Integer taskGroupId,
String baseSequenceName,
@Nullable DataSourceMetadata previousDataSourceMetadata,
@Nullable DataSourceMetadata currentDataSourceMetadata
@Nullable DataSourceMetadata previousDataSourceMetadata
)
{
log.info("Adding checkpoint hash to the set");
checkpointRequestsHash.add(
Objects.hash(
supervisorId,
taskGroupId,
previousDataSourceMetadata,
currentDataSourceMetadata
previousDataSourceMetadata
)
);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,11 +628,7 @@ public void testDatasourceMetadata() throws Exception
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(
topic,
ImmutableMap.of(0, 10L, 1, 20L, 2, 30L),
ImmutableSet.of(0, 1, 2)
)
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of())
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
Expand Down Expand Up @@ -669,11 +665,7 @@ public void testBadMetadataOffsets() throws Exception
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(
topic,
ImmutableMap.of(0, 10L, 1, 20L, 2, 30L),
ImmutableSet.of(0, 1, 2)
)
new SeekableStreamStartSequenceNumbers<>(topic, ImmutableMap.of(0, 10L, 1, 20L, 2, 30L), ImmutableSet.of())
)
).anyTimes();
replayAll();
Expand Down Expand Up @@ -2348,21 +2340,13 @@ public void testCheckpointForInactiveTaskGroup()
supervisor.start();
supervisor.runInternal();

final Map<Integer, Long> fakeCheckpoints = Collections.emptyMap();
supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
topic,
checkpoints.get(0),
ImmutableSet.of()
)),
new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
topic,
fakeCheckpoints,
fakeCheckpoints.keySet()
))
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())
)
);

while (supervisor.getNoticesQueueSize() > 0) {
Expand Down Expand Up @@ -2441,16 +2425,9 @@ public void testCheckpointForUnknownTaskGroup()
supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
topic,
Collections.emptyMap(),
ImmutableSet.of()
)),
new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
topic,
Collections.emptyMap(),
ImmutableSet.of()
))
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, Collections.emptyMap(), ImmutableSet.of())
)
);

while (supervisor.getNoticesQueueSize() > 0) {
Expand Down Expand Up @@ -2552,18 +2529,11 @@ public void testCheckpointWithNullTaskGroupId()

supervisor.runInternal();

final TreeMap<Integer, Map<Integer, Long>> newCheckpoints = new TreeMap<>();
newCheckpoints.put(0, ImmutableMap.of(0, 10L));
supervisor.checkpoint(
null,
id1.getIOConfig().getBaseSequenceName(),
new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
topic,
checkpoints.get(0),
ImmutableSet.of()
)),
new KafkaDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, newCheckpoints.get(0), newCheckpoints.get(0).keySet())
new SeekableStreamStartSequenceNumbers<>(topic, checkpoints.get(0), ImmutableSet.of())
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,7 @@ public void testIncrementalHandOff() throws Exception
Objects.hash(
DATA_SCHEMA.getDataSource(),
0,
new KinesisDataSourceMetadata(startPartitions),
new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, currentOffsets))
new KinesisDataSourceMetadata(startPartitions)
)
)
);
Expand Down Expand Up @@ -758,8 +757,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
Objects.hash(
DATA_SCHEMA.getDataSource(),
0,
new KinesisDataSourceMetadata(startPartitions),
new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, currentOffsets))
new KinesisDataSourceMetadata(startPartitions)
)
)
);
Expand All @@ -769,9 +767,7 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
DATA_SCHEMA.getDataSource(),
0,
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(STREAM, currentOffsets, currentOffsets.keySet())
),
new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(STREAM, nextOffsets))
new SeekableStreamStartSequenceNumbers<>(STREAM, currentOffsets, currentOffsets.keySet()))
)
)
);
Expand Down Expand Up @@ -2832,17 +2828,15 @@ public boolean checkPointDataSourceMetadata(
String supervisorId,
@Nullable Integer taskGroupId,
String baseSequenceName,
@Nullable DataSourceMetadata previousDataSourceMetadata,
@Nullable DataSourceMetadata currentDataSourceMetadata
@Nullable DataSourceMetadata checkpointMetadata
)
{
LOG.info("Adding checkpoint hash to the set");
checkpointRequestsHash.add(
Objects.hash(
supervisorId,
taskGroupId,
previousDataSourceMetadata,
currentDataSourceMetadata
checkpointMetadata
)
);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2907,19 +2907,13 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException
supervisor.start();
supervisor.runInternal();

final Map<String, String> fakeCheckpoints = Collections.emptyMap();
supervisor.moveTaskGroupToPendingCompletion(0);
supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), checkpoints.get(0).keySet())
),
new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
stream,
fakeCheckpoints,
ImmutableSet.of()
))
)
);

while (supervisor.getNoticesQueueSize() > 0) {
Expand Down Expand Up @@ -3044,16 +3038,9 @@ public void testCheckpointForUnknownTaskGroup()
supervisor.checkpoint(
0,
id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
stream,
Collections.emptyMap(),
ImmutableSet.of()
)),
new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
stream,
Collections.emptyMap(),
ImmutableSet.of()
))
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(stream, Collections.emptyMap(), ImmutableSet.of())
)
);

while (supervisor.getNoticesQueueSize() > 0) {
Expand Down Expand Up @@ -3164,21 +3151,12 @@ public void testCheckpointWithNullTaskGroupId() throws InterruptedException

supervisor.runInternal();

final TreeMap<Integer, Map<String, String>> newCheckpoints = new TreeMap<>();
newCheckpoints.put(0, ImmutableMap.of(shardId1, "10"));
supervisor.checkpoint(
null,
id1.getIOConfig().getBaseSequenceName(),
new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
stream,
checkpoints.get(0),
ImmutableSet.of()
)),
new KinesisDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>(
stream,
newCheckpoints.get(0),
ImmutableSet.of()
))
new KinesisDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(stream, checkpoints.get(0), ImmutableSet.of())
)
);

while (supervisor.getNoticesQueueSize() > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;

import javax.annotation.Nullable;

Expand All @@ -34,22 +35,28 @@ public class CheckPointDataSourceMetadataAction implements TaskAction<Boolean>
// NOTE(review): this listing looks like a diff rendered without +/- markers, so the
// previousCheckPoint/currentCheckPoint fields and the checkpointMetadata field are shown
// together -- confirm which declarations exist in the actual file.

// May be null; the constructor accepts a @Nullable taskGroupId for backward compatibility.
private final Integer taskGroupId;
// Deprecated; bound to the "sequenceName" JSON property, which older versions used.
@Deprecated
private final String baseSequenceName;
private final DataSourceMetadata previousCheckPoint;
private final DataSourceMetadata currentCheckPoint;
// Metadata whose sequence numbers the constructor requires to be SeekableStreamStartSequenceNumbers.
private final SeekableStreamDataSourceMetadata checkpointMetadata;

/**
 * Creates the action from its JSON properties.
 *
 * NOTE(review): this listing looks like a diff rendered without +/- markers. The
 * DataSourceMetadata-typed previousCheckPoint/currentCheckPoint parameters and their
 * assignments appear alongside the SeekableStreamDataSourceMetadata-typed
 * previousCheckPoint/checkpointMetadata parameters -- confirm against the actual file.
 *
 * In the checkpointMetadata form, the value stored is checkpointMetadata when it is
 * non-null and previousCheckPoint otherwise, so a payload carrying only the
 * "previousCheckPoint" property is still accepted.
 *
 * @param supervisorId       must not be null
 * @param taskGroupId        may be null (kept nullable for backward compatibility)
 * @param baseSequenceName   read from the "sequenceName" property; must not be null
 * @param previousCheckPoint deprecated fallback used only when checkpointMetadata is null
 * @param checkpointMetadata metadata whose sequence numbers must be
 *                           SeekableStreamStartSequenceNumbers
 * @throws NullPointerException     if supervisorId or baseSequenceName is null, or if both
 *                                  checkpointMetadata and previousCheckPoint are null
 * @throws IllegalArgumentException if the resolved metadata does not hold
 *                                  SeekableStreamStartSequenceNumbers
 */
public CheckPointDataSourceMetadataAction(
@JsonProperty("supervisorId") String supervisorId,
@JsonProperty("taskGroupId") @Nullable Integer taskGroupId, // nullable for backward compatibility
@JsonProperty("sequenceName") @Deprecated String baseSequenceName, // old version would use this
@JsonProperty("previousCheckPoint") DataSourceMetadata previousCheckPoint,
@JsonProperty("currentCheckPoint") DataSourceMetadata currentCheckPoint
@JsonProperty("previousCheckPoint") @Nullable @Deprecated SeekableStreamDataSourceMetadata previousCheckPoint,
@JsonProperty("checkpointMetadata") @Nullable SeekableStreamDataSourceMetadata checkpointMetadata
)
{
this.supervisorId = Preconditions.checkNotNull(supervisorId, "supervisorId");
this.taskGroupId = taskGroupId;
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "sequenceName");
this.previousCheckPoint = Preconditions.checkNotNull(previousCheckPoint, "previousCheckPoint");
this.currentCheckPoint = Preconditions.checkNotNull(currentCheckPoint, "currentCheckPoint");
// Fall back to the deprecated property when the new one is absent.
this.checkpointMetadata = checkpointMetadata == null ? previousCheckPoint : checkpointMetadata;

// Checked after the fallback, so this fails only when both properties were null.
Preconditions.checkNotNull(this.checkpointMetadata, "checkpointMetadata");
// checkpointMetadata must wrap SeekableStreamStartSequenceNumbers because it carries the start sequence
// numbers of the sequence currently being checkpointed
Preconditions.checkArgument(
this.checkpointMetadata.getSeekableStreamSequenceNumbers() instanceof SeekableStreamStartSequenceNumbers,
"checkpointMetadata must be SeekableStreamStartSequenceNumbers"
);
}

@JsonProperty
Expand All @@ -72,16 +79,18 @@ public Integer getTaskGroupId()
return taskGroupId;
}

// For backwards compatibility: a deprecated JSON-exposed getter under the old name.
// In the SeekableStreamDataSourceMetadata form it returns the same checkpointMetadata
// field as getCheckpointMetadata() -- presumably so readers that still expect a
// "previousCheckPoint" property keep finding one; verify against consumers.
// NOTE(review): this listing looks like a diff rendered without +/- markers, so two
// signatures and two return statements are shown together -- confirm against the actual file.
@Deprecated
@JsonProperty
public DataSourceMetadata getPreviousCheckPoint()
public SeekableStreamDataSourceMetadata getPreviousCheckPoint()
{
return previousCheckPoint;
return checkpointMetadata;
}

// JSON-exposed getter for the checkpoint metadata held by this action.
// NOTE(review): this listing looks like a diff rendered without +/- markers, so
// getCurrentCheckPoint() (returning currentCheckPoint) and getCheckpointMetadata()
// (returning checkpointMetadata) are shown together -- confirm which one exists in the
// actual file.
@JsonProperty
public DataSourceMetadata getCurrentCheckPoint()
public SeekableStreamDataSourceMetadata getCheckpointMetadata()
{
return currentCheckPoint;
return checkpointMetadata;
}

@Override
Expand All @@ -99,8 +108,7 @@ public Boolean perform(Task task, TaskActionToolbox toolbox)
supervisorId,
taskGroupId,
baseSequenceName,
previousCheckPoint,
currentCheckPoint
checkpointMetadata
);
}

Expand All @@ -117,8 +125,7 @@ public String toString()
"supervisorId='" + supervisorId + '\'' +
", baseSequenceName='" + baseSequenceName + '\'' +
", taskGroupId='" + taskGroupId + '\'' +
", previousCheckPoint=" + previousCheckPoint +
", currentCheckPoint=" + currentCheckPoint +
", checkpointMetadata=" + checkpointMetadata +
'}';
}
}
Loading