Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.LockRequestForNewSegment;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.overlord.Segments;
Expand All @@ -37,6 +36,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
Expand Down Expand Up @@ -226,15 +226,16 @@ public SegmentIdWithShardSpec perform(
throw new IAE("Task dataSource must match action dataSource, [%s] != [%s].", task.getDataSource(), dataSource);
}

final IndexerMetadataStorageCoordinator msc = toolbox.getIndexerMetadataStorageCoordinator();
final IndexerSQLMetadataStorageCoordinator msc = (IndexerSQLMetadataStorageCoordinator) toolbox.getIndexerMetadataStorageCoordinator();

// 1) if something overlaps our timestamp, use that
// 2) otherwise try preferredSegmentGranularity & going progressively smaller

final Interval rowInterval = queryGranularity.bucket(timestamp).withChronology(ISOChronology.getInstanceUTC());

final Set<DataSegment> usedSegmentsForRow =
new HashSet<>(msc.retrieveUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE));
new HashSet<>(msc.retrieveUsedSegmentsForIntervalsFromCache(dataSource, rowInterval, Segments.ONLY_VISIBLE));


final SegmentIdWithShardSpec identifier;
if (usedSegmentsForRow.isEmpty()) {
Expand All @@ -251,7 +252,8 @@ public SegmentIdWithShardSpec perform(
// and if it's different, repeat.

Set<DataSegment> newUsedSegmentsForRow =
new HashSet<>(msc.retrieveUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE));
new HashSet<>(msc.retrieveUsedSegmentsForIntervalsFromCache(dataSource, rowInterval, Segments.ONLY_VISIBLE));

if (!newUsedSegmentsForRow.equals(usedSegmentsForRow)) {
if (attempt < MAX_ATTEMPTS) {
final long shortRandomSleep = 50 + (long) (ThreadLocalRandom.current().nextDouble() * 450);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -380,8 +381,9 @@ private boolean processBatch(AllocateRequestBatch requestBatch)

private Set<DataSegment> retrieveUsedSegments(AllocateRequestKey key)
{
IndexerSQLMetadataStorageCoordinator coordinator = (IndexerSQLMetadataStorageCoordinator) metadataStorage;
return new HashSet<>(
metadataStorage.retrieveUsedSegmentsForInterval(
coordinator.retrieveUsedSegmentsForIntervalsFromCache(
key.dataSource,
key.preferredAllocationInterval,
Segments.ONLY_VISIBLE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.query.QueryContexts;
Expand Down Expand Up @@ -168,6 +169,8 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
final Set<String> failedToReacquireLockTaskGroups = new HashSet<>();
// Bookkeeping for a log message at the end
int taskLockCount = 0;
// Intervals of the active task locks, grouped by datasource; used below to initialize the
// pending segment cache of the metadata storage coordinator
final Map<String, Set<Interval>> datasourceToActiveTaskLocks = new HashMap<>();
for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
final Task task = Preconditions.checkNotNull(taskAndLock.lhs, "task");
final TaskLock savedTaskLock = Preconditions.checkNotNull(taskAndLock.rhs, "savedTaskLock");
Expand All @@ -192,13 +195,17 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)

if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
taskLockCount++;
datasourceToActiveTaskLocks.computeIfAbsent(taskLock.getDataSource(), ds -> new HashSet<>())
.add(taskLock.getInterval());
log.info(
"Reacquired lock[%s] for task: %s",
taskLock,
task.getId()
);
} else {
taskLockCount++;
datasourceToActiveTaskLocks.computeIfAbsent(taskLock.getDataSource(), ds -> new HashSet<>())
.add(taskLock.getInterval());
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
savedTaskLockWithPriority.getInterval(),
Expand Down Expand Up @@ -245,6 +252,10 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
+ "as they failed to reacquire at least one lock.", failedToReacquireLockTaskGroups);
}

if (metadataStorageCoordinator instanceof IndexerSQLMetadataStorageCoordinator) {
((IndexerSQLMetadataStorageCoordinator) metadataStorageCoordinator).initializeCache(datasourceToActiveTaskLocks);
}

return new TaskLockboxSyncResult(tasksToFail);
}
finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
Expand All @@ -53,6 +55,8 @@
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -1168,6 +1172,85 @@ public void testSegmentIdMustNotBeReused()
lockbox.unlock(task1, Intervals.ETERNITY);
}

/**
 * Manual benchmark for concurrent segment allocation.
 * <p>
 * Builds 365 consecutive one-day intervals starting at 2024-01-01. For each interval it commits 2000
 * segments and inserts 1000 pending segments through the metadata storage coordinator, then submits
 * 20 allocation requests per interval to an executor created with {@code Execs.multiThreaded(4, ...)}
 * and prints the elapsed wall-clock time of those requests.
 * <p>
 * The {@code @Test} annotation below is commented out, so this method is presumably not picked up by
 * the test runner until it is re-enabled.
 *
 * @throws Exception if the setup or any of the allocation requests fails
 */
//@Test TODO uncomment
public void testBenchmark() throws Exception
{
  final Task task = NoopTask.create();
  taskActionTestKit.getTaskLockbox().add(task);

  final Interval startInterval = Intervals.of("2024-01-01/2024-01-02");
  final long dayToMillis = 1000L * 60L * 60L * 24L;
  final long numDays = 365;
  final List<Interval> intervals = new ArrayList<>();
  for (long day = 0; day < numDays; day++) {
    intervals.add(
        new Interval(
            startInterval.getStartMillis() + dayToMillis * day,
            startInterval.getEndMillis() + dayToMillis * day,
            DateTimeZone.UTC
        )
    );
  }

  final IndexerSQLMetadataStorageCoordinator coordinator =
      (IndexerSQLMetadataStorageCoordinator) taskActionTestKit.getMetadataStorageCoordinator();
  final Set<DataSegment> segments = new HashSet<>();
  final List<PendingSegmentRecord> pendingSegments = new ArrayList<>();
  final int numUsedSegmentsPerInterval = 2000;
  final int numPendingSegmentsPerInterval = 1000;
  // The version string cycles through "version0".."version9" across consecutive intervals.
  int version = 0;
  for (Interval interval : intervals) {
    for (int i = 0; i < numUsedSegmentsPerInterval; i++) {
      segments.add(getSegmentForIdentifier(
          new SegmentIdWithShardSpec(
              DATA_SOURCE,
              interval,
              "version" + version,
              new NumberedShardSpec(i, numUsedSegmentsPerInterval)
          )
      ));
    }
    coordinator.commitSegments(segments, null);
    segments.clear();

    // Pending segments continue the partition numbering right after the committed segments.
    for (int i = numUsedSegmentsPerInterval; i < numUsedSegmentsPerInterval + numPendingSegmentsPerInterval; i++) {
      pendingSegments.add(new PendingSegmentRecord(
          new SegmentIdWithShardSpec(
              DATA_SOURCE,
              interval,
              "version" + version,
              new NumberedShardSpec(i, numUsedSegmentsPerInterval)
          ),
          "sequence" + i,
          "random_" + ThreadLocalRandom.current().nextInt(),
          null,
          null
      ));
    }
    coordinator.insertPendingSegments(DATA_SOURCE, pendingSegments, true);
    pendingSegments.clear();
    version = (version + 1) % 10;
    // Progress output, since populating a whole year of intervals takes a while.
    System.out.println(interval);
  }

  final int numRequestsPerInterval = 20;
  final List<Callable<SegmentIdWithShardSpec>> allocateTasks = new ArrayList<>();
  for (Interval interval : intervals) {
    final DateTime rowTimestamp = interval.getStart();
    for (int i = 0; i < numRequestsPerInterval; i++) {
      // Sequence ids start past the ones used for the pre-inserted pending segments.
      final String sequenceId = "sequence" + (numPendingSegmentsPerInterval + numUsedSegmentsPerInterval + i);
      allocateTasks.add(
          () -> allocateWithoutLineageCheck(
              task,
              rowTimestamp,
              Granularities.NONE,
              Granularities.DAY,
              sequenceId,
              TaskLockType.APPEND
          )
      );
    }
  }

  final ExecutorService allocatorService = Execs.multiThreaded(4, "allocator-%d");
  try {
    // Start the clock only once all requests are built, so that only the allocations are timed.
    final long startTime = System.nanoTime();
    for (Future<SegmentIdWithShardSpec> future : allocatorService.invokeAll(allocateTasks)) {
      // Propagates any failure of an individual allocation request.
      future.get();
    }
    final long totalMillis = (System.nanoTime() - startTime) / 1_000_000;
    System.out.println(
        "Total allocation time for " + allocateTasks.size() + " requests: " + totalMillis + "ms."
    );
  }
  finally {
    // Always release the worker threads, even if an allocation failed.
    allocatorService.shutdownNow();
  }
}

private SegmentIdWithShardSpec allocate(
final Task task,
final DateTime timestamp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.Iterators;

import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;

Expand Down Expand Up @@ -60,6 +61,13 @@ public void addSegments(Iterator<DataSegment> segments)
);
}

/**
 * Removes each of the given segments from this timeline.
 * <p>
 * Every segment is removed under its own interval and version, using the chunk created by its
 * shard spec.
 *
 * @param segments segments to remove from the timeline
 */
public void removeSegments(Collection<DataSegment> segments)
{
  final Iterator<DataSegment> segmentIterator = segments.iterator();
  while (segmentIterator.hasNext()) {
    final DataSegment toRemove = segmentIterator.next();
    remove(
        toRemove.getInterval(),
        toRemove.getVersion(),
        toRemove.getShardSpec().createChunk(toRemove)
    );
  }
}

public boolean isOvershadowed(DataSegment segment)
{
return isOvershadowed(segment.getInterval(), segment.getVersion(), segment);
Expand Down
6 changes: 6 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,12 @@
<artifactId>commons-text</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.13.11</version>
<scope>compile</scope>
</dependency>
</dependencies>

<build>
Expand Down
Loading