Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.indexer;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.druid.java.util.common.StringUtils;
Expand All @@ -31,6 +32,8 @@ public class TaskIdUtils
{
private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*");

private static final Joiner UNDERSCORE_JOINER = Joiner.on("_");

public static void validateId(String thingToValidate, String stringToValidate)
{
Preconditions.checkArgument(
Expand Down Expand Up @@ -60,4 +63,9 @@ public static String getRandomId()
}
return suffix.toString();
}

/**
 * Generates an id of the form {@code <prefix>_<randomSuffix>}, where the suffix is produced by
 * {@link #getRandomId()}.
 *
 * @param prefix string to prepend to the random suffix
 * @return the prefix and a freshly generated random id, joined by an underscore
 */
public static String getRandomIdWithPrefix(String prefix)
{
  final String randomSuffix = getRandomId();
  return UNDERSCORE_JOINER.join(prefix, randomSuffix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import org.apache.druid.indexer.TaskIdUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
Expand All @@ -46,6 +45,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
Expand Down Expand Up @@ -219,7 +219,7 @@ protected List<SeekableStreamIndexTask<Integer, Long>> createIndexTasks(

List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
for (int i = 0; i < replicas; i++) {
String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId());
String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName);
taskList.add(new KafkaIndexTask(
taskId,
new TaskResource(baseSequenceName, 1),
Expand Down Expand Up @@ -334,16 +334,38 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
}

@Override
protected void updateLatestSequenceFromStream(
RecordSupplier<Integer, Long> recordSupplier,
Set<StreamPartition<Integer>> partitions
)
protected void updatePartitionLagFromStream()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this method compute the lag instead of latestSequenceFromStream? This will modify the current behavior since there could be some difference between computed lag and actual lag as highestCurrentOffsets will be computed periodically. However, I think it would be fine since 1) the behavior of this method is consistent across Kafka and Kinesis and 2) the lag metric doesn't have to be very real time.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to keep latestSequenceFromStream around since it's also included in the supervisor report payload, but I guess we could precompute the record lag here as well instead of lazily from the current highest offsets and latestSequenceFromStream whenever it is used in a report or emitted as lag metric and it probably wouldn't change the behavior that much.

Though, since generating a report also would be using the highest current offsets and including them in the payload, it might be strange if the record lag didn't match what you could manually compute from the reports current and highest offsets. Because of this report dissonance, I haven't changed it to precompute it yet, but I did leave a comment about what is going on and why we are only fetching the latest offsets instead of doing anything about lag.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me.

{
latestSequenceFromStream = partitions.stream()
.collect(Collectors.toMap(
StreamPartition::getPartitionId,
recordSupplier::getPosition
));
getRecordSupplierLock().lock();
try {
Set<Integer> partitionIds;
try {
partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream());
}
catch (Exception e) {
log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
throw new StreamException(e);
}

Set<StreamPartition<Integer>> partitions = partitionIds
.stream()
.map(e -> new StreamPartition<>(getIoConfig().getStream(), e))
.collect(Collectors.toSet());

recordSupplier.seekToLatest(partitions);

// this method isn't actually computing the lag, just fetching the latests offsets from the stream. This is
// because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest
// task offsets from the latest offsets from the stream when it is needed
latestSequenceFromStream =
partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition));
}
catch (InterruptedException e) {
throw new StreamException(e);
}
finally {
getRecordSupplierLock().unlock();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
Expand Down Expand Up @@ -97,8 +98,12 @@ protected KinesisRecordSupplier newTaskRecordSupplier()
KinesisIndexTaskTuningConfig tuningConfig = ((KinesisIndexTaskTuningConfig) super.tuningConfig);
int fetchThreads = tuningConfig.getFetchThreads() != null
? tuningConfig.getFetchThreads()
: Math.max(1, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size());
: Runtime.getRuntime().availableProcessors() * 2;

Preconditions.checkArgument(
fetchThreads > 0,
"Must have at least one background fetch thread for the record supplier"
);
return new KinesisRecordSupplier(
KinesisRecordSupplier.getAmazonKinesisClient(
ioConfig.getEndpoint(),
Expand All @@ -114,7 +119,8 @@ protected KinesisRecordSupplier newTaskRecordSupplier()
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getFetchSequenceNumberTimeout(),
tuningConfig.getMaxRecordsPerPoll()
tuningConfig.getMaxRecordsPerPoll(),
false
);
}

Expand Down
Loading