Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ offsets in Kafka (depending on the value of `useEarliestOffset`). After clearing
offsets, the supervisor kills and recreates any active tasks, so that tasks begin reading
from valid offsets.

The `POST /druid/indexer/v1/supervisor/<supervisorId>/reset?timestamp=<timestamp in milliseconds>` operation looks up
the offsets aligned to this timestamp, causing the supervisor to start reading from
those offsets in Kafka. After clearing stored offsets, the supervisor kills and recreates any active tasks,
so that tasks begin reading from valid offsets.

Use care when using this operation! Resetting the supervisor may cause Kafka messages
to be skipped or read twice, resulting in missing or duplicate data.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ public void reset(DataSourceMetadata dataSourceMetadata)
}
}

/**
 * Timestamp-based reset is not supported by this supervisor.
 *
 * @param timestamp target timestamp in milliseconds (ignored)
 * @throws UnsupportedOperationException always
 */
@Override
public void resetToTime(long timestamp)
{
  final String message = "resetToTime() is not supported in MaterializedViewSupervisor";
  throw new UnsupportedOperationException(message);
}

@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.metadata.PasswordProvider;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
Expand Down Expand Up @@ -178,6 +179,30 @@ public Long getPosition(StreamPartition<Integer> partition)
)));
}

/**
 * Looks up, for every partition currently assigned to the consumer, the offset of the
 * first record whose timestamp is at or after {@code offsetTime}.
 * Partitions for which Kafka reports no such offset (a null lookup result) are
 * omitted from the returned map.
 *
 * @param offsetTime target timestamp in milliseconds
 * @return map from partition id to the offset corresponding to the timestamp
 */
@Override
public Map<Integer, Long> getPositionFromTime(long offsetTime)
{
  return wrapExceptions(
      () -> {
        // Ask Kafka for the offset at the same timestamp in every assigned partition.
        final Map<TopicPartition, Long> searchTimestamps = new HashMap<>();
        for (TopicPartition topicPartition : consumer.assignment()) {
          searchTimestamps.put(topicPartition, offsetTime);
        }

        final Map<Integer, Long> positions = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> lookup : consumer.offsetsForTimes(searchTimestamps).entrySet()) {
          // offsetsForTimes() maps a partition to null when no record at/after the timestamp exists.
          if (lookup.getValue() != null) {
            positions.put(lookup.getKey().partition(), lookup.getValue().offset());
          }
        }
        return positions;
      }
  );
}

@Override
public Set<Integer> getPartitionIds(String stream)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,13 @@ public String getPosition(StreamPartition<String> partition)
throw new UnsupportedOperationException("getPosition() is not supported in Kinesis");
}

/**
 * Timestamp-based offset lookup is not implemented for Kinesis.
 *
 * @param offsetTime target timestamp in milliseconds (ignored)
 * @throws UnsupportedOperationException always
 */
@Nullable
@Override
public Map<String, String> getPositionFromTime(long offsetTime)
{
  final String message = "getPositionFromTime() is not supported in Kinesis";
  throw new UnsupportedOperationException(message);
}

@Nonnull
@Override
public List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,25 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc
return true;
}

/**
 * Resets the supervisor with the given id so that it begins reading from the offsets
 * corresponding to {@code timestamp}, and resets its autoscaler if one is registered.
 *
 * @param id supervisor id
 * @param timestamp target timestamp in milliseconds
 * @return true if a supervisor with the given id exists and was reset, false otherwise
 */
public boolean resetSupervisorToTime(String id, long timestamp)
{
  Preconditions.checkState(started, "SupervisorManager not started");
  Preconditions.checkNotNull(id, "id");

  final Pair<Supervisor, SupervisorSpec> supervisorEntry = supervisors.get(id);
  if (supervisorEntry == null) {
    // Unknown supervisor id: report failure so the caller can return 404.
    return false;
  }

  supervisorEntry.lhs.resetToTime(timestamp);

  final SupervisorTaskAutoScaler taskAutoScaler = autoscalers.get(id);
  if (taskAutoScaler != null) {
    taskAutoScaler.reset();
  }
  return true;
}

public boolean checkPointDataSourceMetadata(
String supervisorId,
int taskGroupId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,11 +454,14 @@ public Response specGetHistory(
@Path("/{id}/reset")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response reset(@PathParam("id") final String id)
public Response reset(@PathParam("id") final String id, @QueryParam("timestamp") final Long timestamp)
{
return asLeaderWithSupervisorManager(
manager -> {
if (manager.resetSupervisor(id, null)) {
boolean success = timestamp != null
? manager.resetSupervisorToTime(id, timestamp)
: manager.resetSupervisor(id, null);
if (success) {
return Response.ok(ImmutableMap.of("id", id)).build();
} else {
return Response.status(Response.Status.NOT_FOUND)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.Closeable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -123,6 +124,15 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
*/
SequenceOffsetType getPosition(StreamPartition<PartitionIdType> partition);

/**
 * Returns the sequence number of each partition at the specified timestamp.
 *
 * @param offsetTime target timestamp in milliseconds
 *
 * @return map from partition id to the sequence number corresponding to this timestamp
 */
Map<PartitionIdType, SequenceOffsetType> getPositionFromTime(long offsetTime);

/**
* returns the set of partitions under the given stream
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,167 @@ public String getType()
}
}

/**
 * Notice that unconditionally replaces the stored datasource metadata (offsets) with the
 * supplied metadata, then kills and restarts the task groups for the affected partitions so
 * that new tasks start reading from the overridden offsets.
 */
private class OverrideNotice implements Notice
{
  // Metadata carrying the offsets to write over the stored entry.
  final DataSourceMetadata dataSourceMetadata;
  private static final String TYPE = "override_notice";

  OverrideNotice(DataSourceMetadata dataSourceMetadata)
  {
    this.dataSourceMetadata = dataSourceMetadata;
  }

  @Override
  public void handle()
  {
    // The supplied metadata must be of the seekable-stream type this supervisor handles.
    if (!checkSourceMetadataMatch(dataSourceMetadata)) {
      throw new IAE(
          "Datasource metadata instance does not match required, found instance of [%s]",
          dataSourceMetadata.getClass()
      );
    }
    @SuppressWarnings("unchecked")
    final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> resetMetadata =
        (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) dataSourceMetadata;

    if (resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(ioConfig.getStream())) {
      // Replace the stored metadata wholesale; unlike resetInternal(), no merge/minus with the
      // current metadata is performed.
      final boolean metadataUpdateSuccess = indexerMetadataStorageCoordinator.overrideDataSourceMetadata(dataSource, resetMetadata);

      if (metadataUpdateSuccess) {
        // Restart ingestion for every overridden partition so new tasks pick up the new offsets.
        resetMetadata.getSeekableStreamSequenceNumbers()
                     .getPartitionSequenceNumberMap()
                     .keySet()
                     .forEach(partition -> {
                       final int groupId = getTaskGroupIdForPartition(partition);
                       killTaskGroupForPartitions(
                           ImmutableSet.of(partition),
                           "DataSourceMetadata is updated while override"
                       );
                       activelyReadingTaskGroups.remove(groupId);
                       // killTaskGroupForPartitions() cleans up partitionGroups.
                       // Add the removed groups back.
                       partitionGroups.computeIfAbsent(groupId, k -> new HashSet<>());
                       partitionOffsets.put(partition, getNotSetMarker());
                     });
      } else {
        throw new ISE("Unable to override metadata");
      }
    } else {
      // Metadata for a different stream: ignore rather than disturb this supervisor's state.
      log.warn(
          "Override metadata stream [%s] and supervisor's stream name [%s] do not match",
          resetMetadata.getSeekableStreamSequenceNumbers().getStream(),
          ioConfig.getStream()
      );
    }
  }

  /**
   * Clears the stored offsets for the partitions named in {@code dataSourceMetadata} (by
   * subtracting them from the current stored metadata) and restarts the affected task groups.
   * Duplicate reset requests (e.g. from replicas) are detected and ignored.
   *
   * @param dataSourceMetadata metadata naming the partitions (and offsets) to reset
   * @throws IAE if the metadata is not of the type this supervisor handles
   * @throws RuntimeException wrapping an IOException from the metadata store update
   */
  @VisibleForTesting
  public void resetInternal(DataSourceMetadata dataSourceMetadata)
  {
    if (!checkSourceMetadataMatch(dataSourceMetadata)) {
      throw new IAE(
          "Datasource metadata instance does not match required, found instance of [%s]",
          dataSourceMetadata.getClass()
      );
    }
    log.info("Reset dataSource[%s] with metadata[%s]", dataSource, dataSourceMetadata);
    // Reset only the partitions in dataSourceMetadata if it has not been reset yet
    @SuppressWarnings("unchecked")
    final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> resetMetadata =
        (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) dataSourceMetadata;

    if (resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(ioConfig.getStream())) {
      // metadata can be null
      final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource);
      if (metadata != null && !checkSourceMetadataMatch(metadata)) {
        throw new IAE(
            "Datasource metadata instance does not match required, found instance of [%s]",
            metadata.getClass()
        );
      }

      @SuppressWarnings("unchecked")
      final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata =
          (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) metadata;

      // defend against consecutive reset requests from replicas
      // as well as the case where the metadata store do not have an entry for the reset partitions
      boolean doReset = false;
      for (Entry<PartitionIdType, SequenceOffsetType> resetPartitionOffset : resetMetadata
          .getSeekableStreamSequenceNumbers()
          .getPartitionSequenceNumberMap()
          .entrySet()) {
        final SequenceOffsetType partitionOffsetInMetadataStore = currentMetadata == null
            ? null
            : currentMetadata
                .getSeekableStreamSequenceNumbers()
                .getPartitionSequenceNumberMap()
                .get(resetPartitionOffset.getKey());
        final TaskGroup partitionTaskGroup = activelyReadingTaskGroups.get(
            getTaskGroupIdForPartition(resetPartitionOffset.getKey())
        );
        // A partition needs resetting if the store still holds an offset for it, or if an active
        // task group is still reading from the offset being reset.
        final boolean isSameOffset = partitionTaskGroup != null
            && partitionTaskGroup.startingSequences.get(resetPartitionOffset.getKey())
                                                   .equals(resetPartitionOffset.getValue());
        if (partitionOffsetInMetadataStore != null || isSameOffset) {
          doReset = true;
          break;
        }
      }

      if (!doReset) {
        // Nothing left to clear: this request was already handled (e.g. by a replica).
        log.info("Ignoring duplicate reset request [%s]", dataSourceMetadata);
        return;
      }

      boolean metadataUpdateSuccess;
      if (currentMetadata == null) {
        // No stored entry to subtract from; only task restarts are needed.
        metadataUpdateSuccess = true;
      } else {
        // Remove the reset partitions' offsets from the stored metadata.
        final DataSourceMetadata newMetadata = currentMetadata.minus(resetMetadata);
        try {
          metadataUpdateSuccess = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, newMetadata);
        }
        catch (IOException e) {
          log.error("Resetting DataSourceMetadata failed [%s]", e.getMessage());
          throw new RuntimeException(e);
        }
      }
      if (metadataUpdateSuccess) {
        // Restart ingestion for every reset partition so new tasks re-read from valid offsets.
        resetMetadata.getSeekableStreamSequenceNumbers()
                     .getPartitionSequenceNumberMap()
                     .keySet()
                     .forEach(partition -> {
                       final int groupId = getTaskGroupIdForPartition(partition);
                       killTaskGroupForPartitions(
                           ImmutableSet.of(partition),
                           "DataSourceMetadata is updated while reset"
                       );
                       activelyReadingTaskGroups.remove(groupId);
                       // killTaskGroupForPartitions() cleans up partitionGroups.
                       // Add the removed groups back.
                       partitionGroups.computeIfAbsent(groupId, k -> new HashSet<>());
                       partitionOffsets.put(partition, getNotSetMarker());
                     });
      } else {
        throw new ISE("Unable to reset metadata");
      }
    } else {
      log.warn(
          "Reset metadata stream [%s] and supervisor's stream name [%s] do not match",
          resetMetadata.getSeekableStreamSequenceNumbers().getStream(),
          ioConfig.getStream()
      );
    }
  }

  @Override
  public String getType()
  {
    return TYPE;
  }
}

protected class CheckpointNotice implements Notice
{
private final int taskGroupId;
Expand Down Expand Up @@ -1006,6 +1167,15 @@ public void reset(DataSourceMetadata dataSourceMetadata)
addNotice(new ResetNotice(dataSourceMetadata));
}

/**
 * Resets this supervisor's offsets to those aligned with {@code offsetTime} by looking up
 * per-partition positions from the record supplier and posting an OverrideNotice that
 * replaces the stored datasource metadata.
 *
 * @param offsetTime target timestamp in milliseconds
 */
@Override
public void resetToTime(long offsetTime)
{
  log.info("Override %s's offset to time %s, posting OverrideNotice", dataSource, offsetTime);
  // NOTE(review): recordSupplier is accessed here without holding recordSupplierLock —
  // confirm this is safe for concurrent callers.
  Map<PartitionIdType, SequenceOffsetType> offsets = recordSupplier.getPositionFromTime(offsetTime);
  log.info("The %s offset to reset is %s", dataSource, offsets);
  // Use addNotice() — as reset() does — instead of notices.add(), so that any bookkeeping
  // performed by addNotice (e.g. metrics/deduplication) is not bypassed.
  addNotice(new OverrideNotice(createDataSourceMetaDataForReset(ioConfig.getStream(), offsets)));
}

public ReentrantLock getRecordSupplierLock()
{
return recordSupplierLock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,12 @@ public Long getPosition(StreamPartition<Integer> partition)
return null;
}

@Override
public Map<Integer, Long> getPositionFromTime(long offsetTime)
{
  // Test stub: timestamp-based lookup is not exercised by these tests, so no positions
  // are returned. NOTE(review): returns null rather than an empty map, mirroring
  // getPosition() above — callers in these tests must not dereference the result.
  return null;
}

@Override
public Set<Integer> getPartitionIds(String stream)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1085,12 +1085,12 @@ public void testReset()
)).andReturn(false);
replayAll();

Response response = supervisorResource.reset("my-id");
Response response = supervisorResource.reset("my-id", null);

Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());

response = supervisorResource.reset("my-id-2");
response = supervisorResource.reset("my-id-2", null);

Assert.assertEquals(404, response.getStatus());
Assert.assertEquals("my-id", id1.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ public Integer getPosition(StreamPartition<Integer> partition)
throw new UnsupportedOperationException();
}

@Override
public Map<Integer, Integer> getPositionFromTime(long offsetTime)
{
  // Test stub: timestamp-based position lookup is not needed by this record supplier.
  throw new UnsupportedOperationException();
}

private long getMinRowSize()
{
return TIMESTAMP_STRING.length() + (NUM_COLS - 1) * STR_LEN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ public boolean deleteDataSourceMetadata(String dataSource)
throw new UnsupportedOperationException();
}

@Override
public boolean overrideDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata)
{
  // Test stub: reports the override as unsuccessful without storing anything.
  // NOTE(review): the sibling stubs here throw UnsupportedOperationException; confirm that
  // returning false (rather than throwing) is the intended behavior for tests.
  return false;
}

@Override
public boolean resetDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,16 @@ SegmentPublishResult announceHistoricalSegments(
*/
boolean deleteDataSourceMetadata(String dataSource);

/**
 * Overrides the dataSourceMetadata entry for 'dataSource' with the one supplied.
 *
 * @param dataSource identifier
 * @param dataSourceMetadata value to override with
 *
 * @return true if the entry was overridden, false otherwise
 */
boolean overrideDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata);

/**
* Resets dataSourceMetadata entry for 'dataSource' to the one supplied.
*
Expand Down
Loading