From 2c3957e585c70db356e368f6e2da0bb947a20959 Mon Sep 17 00:00:00 2001 From: Amatya Date: Sat, 26 Oct 2024 10:49:45 +0530 Subject: [PATCH 1/3] Faster batch segment allocation by reducing metadata IO --- .../actions/SegmentAllocationQueue.java | 20 ++- .../druid/indexing/overlord/TaskLockbox.java | 36 ++-- .../overlord/config/TaskLockConfig.java | 9 + .../actions/SegmentAllocateActionTest.java | 19 +- .../actions/SegmentAllocationQueueTest.java | 149 ++++++++++++++++ .../common/actions/TaskActionTestKit.java | 13 ++ ...TestIndexerMetadataStorageCoordinator.java | 18 +- .../IndexerMetadataStorageCoordinator.java | 18 +- .../IndexerSQLMetadataStorageCoordinator.java | 105 ++++++++++- .../metadata/SqlSegmentsMetadataQuery.java | 66 +++++++ ...exerSQLMetadataStorageCoordinatorTest.java | 163 +++++++++++++++++- 11 files changed, 575 insertions(+), 41 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 98ab50cff788..c4e9b0638f3c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -25,7 +25,6 @@ import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.java.util.common.ISE; @@ -41,6 +40,7 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.Partitions; 
import org.joda.time.Interval; import java.util.ArrayList; @@ -87,6 +87,8 @@ public class SegmentAllocationQueue private final ConcurrentHashMap keyToBatch = new ConcurrentHashMap<>(); private final BlockingDeque processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); + private final boolean skipSegmentPayloadFetchForAllocation; + @Inject public SegmentAllocationQueue( TaskLockbox taskLockbox, @@ -100,6 +102,7 @@ public SegmentAllocationQueue( this.taskLockbox = taskLockbox; this.metadataStorage = metadataStorage; this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationWaitTime(); + this.skipSegmentPayloadFetchForAllocation = taskLockConfig.isSegmentAllocationReduceMetadataIO(); this.executor = taskLockConfig.isBatchSegmentAllocation() ? executorFactory.create(1, "SegmentAllocQueue-%s") : null; @@ -380,13 +383,11 @@ private boolean processBatch(AllocateRequestBatch requestBatch) private Set retrieveUsedSegments(AllocateRequestKey key) { - return new HashSet<>( - metadataStorage.retrieveUsedSegmentsForInterval( - key.dataSource, - key.preferredAllocationInterval, - Segments.ONLY_VISIBLE - ) - ); + return metadataStorage.getSegmentTimelineForAllocation( + key.dataSource, + key.preferredAllocationInterval, + (key.lockGranularity == LockGranularity.TIME_CHUNK) && skipSegmentPayloadFetchForAllocation + ).findNonOvershadowedObjectsInInterval(Intervals.ETERNITY, Partitions.ONLY_COMPLETE); } private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set usedSegments) @@ -493,7 +494,8 @@ private int allocateSegmentsForInterval( requestKey.dataSource, tryInterval, requestKey.skipSegmentLineageCheck, - requestKey.lockGranularity + requestKey.lockGranularity, + skipSegmentPayloadFetchForAllocation ); int successfulRequests = 0; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index bebb52157d6f..f38666859abd 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -460,12 +460,14 @@ public LockResult tryLock(final Task task, final LockRequest request) * successfully and others failed. In that case, only the failed ones should be * retried. * - * @param requests List of allocation requests - * @param dataSource Datasource for which segment is to be allocated. - * @param interval Interval for which segment is to be allocated. - * @param skipSegmentLineageCheck Whether lineage check is to be skipped - * (this is true for streaming ingestion) - * @param lockGranularity Granularity of task lock + * @param requests List of allocation requests + * @param dataSource Datasource for which segment is to be allocated. + * @param interval Interval for which segment is to be allocated. + * @param skipSegmentLineageCheck Whether lineage check is to be skipped + * (this is true for streaming ingestion) + * @param lockGranularity Granularity of task lock + * @param skipSegmentPayloadFetchForAllocation Whether to skip fetching payloads for all used + * segments and rely on their ids instead. * @return List of allocation results in the same order as the requests. 
*/ public List allocateSegments( @@ -473,7 +475,8 @@ public List allocateSegments( String dataSource, Interval interval, boolean skipSegmentLineageCheck, - LockGranularity lockGranularity + LockGranularity lockGranularity, + boolean skipSegmentPayloadFetchForAllocation ) { log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval); @@ -487,9 +490,15 @@ public List allocateSegments( if (isTimeChunkLock) { // For time-chunk locking, segment must be allocated only after acquiring the lock holderList.getPending().forEach(holder -> acquireTaskLock(holder, true)); - allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending()); + allocateSegmentIds( + dataSource, + interval, + skipSegmentLineageCheck, + holderList.getPending(), + skipSegmentPayloadFetchForAllocation + ); } else { - allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending()); + allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending(), false); holderList.getPending().forEach(holder -> acquireTaskLock(holder, false)); } holderList.getPending().forEach(SegmentAllocationHolder::markSucceeded); @@ -702,12 +711,12 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request) * for the given requests. Updates the holder with the allocated segment if * the allocation succeeds, otherwise marks it as failed. 
*/ - @VisibleForTesting - void allocateSegmentIds( + private void allocateSegmentIds( String dataSource, Interval interval, boolean skipSegmentLineageCheck, - Collection holders + Collection holders, + boolean skipSegmentPayloadFetchForAllocation ) { if (holders.isEmpty()) { @@ -724,7 +733,8 @@ void allocateSegmentIds( dataSource, interval, skipSegmentLineageCheck, - createRequests + createRequests, + skipSegmentPayloadFetchForAllocation ); for (SegmentAllocationHolder holder : holders) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java index 2634c4328fec..cf353276dc61 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java @@ -36,6 +36,9 @@ public class TaskLockConfig @JsonProperty private long batchAllocationWaitTime = 0L; + @JsonProperty + private boolean segmentAllocationReduceMetadataIO = false; + public boolean isForceTimeChunkLock() { return forceTimeChunkLock; @@ -50,4 +53,10 @@ public long getBatchAllocationWaitTime() { return batchAllocationWaitTime; } + + public boolean isSegmentAllocationReduceMetadataIO() + { + return segmentAllocationReduceMetadataIO; + } + } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index fdb7fcd5595b..3c0b08758f76 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -93,21 +93,28 @@ public class SegmentAllocateActionTest private SegmentAllocationQueue allocationQueue; - 
@Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}") + @Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}, skipSegmentPayloadFetchForAllocation = {2}") public static Iterable constructorFeeder() { return ImmutableList.of( - new Object[]{LockGranularity.SEGMENT, true}, - new Object[]{LockGranularity.SEGMENT, false}, - new Object[]{LockGranularity.TIME_CHUNK, true}, - new Object[]{LockGranularity.TIME_CHUNK, false} + new Object[]{LockGranularity.SEGMENT, true, true}, + new Object[]{LockGranularity.SEGMENT, true, false}, + new Object[]{LockGranularity.SEGMENT, false, false}, + new Object[]{LockGranularity.TIME_CHUNK, true, true}, + new Object[]{LockGranularity.TIME_CHUNK, true, false}, + new Object[]{LockGranularity.TIME_CHUNK, false, false} ); } - public SegmentAllocateActionTest(LockGranularity lockGranularity, boolean useBatch) + public SegmentAllocateActionTest( + LockGranularity lockGranularity, + boolean useBatch, + boolean skipSegmentPayloadFetchForAllocation + ) { this.lockGranularity = lockGranularity; this.useBatch = useBatch; + this.taskActionTestKit.setSkipSegmentPayloadFetchForAllocation(skipSegmentPayloadFetchForAllocation); } @Before diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java index 6a5c84082f2c..6d39023628c2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java @@ -25,20 +25,34 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; +import 
org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.segment.TestDataSource; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedPartialShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTimeZone; +import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -46,6 +60,8 @@ public class SegmentAllocationQueueTest { + private static final Logger log = new Logger(SegmentAllocationQueueTest.class); + @Rule public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); @@ -73,6 +89,12 @@ public long getBatchAllocationWaitTime() { return 0; } + + @Override + public boolean isSegmentAllocationReduceMetadataIO() + { + return true; + } }; allocationQueue = new SegmentAllocationQueue( @@ -99,6 +121,133 @@ public void tearDown() emitter.flush(); } + @Test + @Ignore + public void testLongQueue() throws Exception + { + allocationQueue.start(); + allocationQueue.becomeLeader(); + final Task task = NoopTask.create(); + taskActionTestKit.getTaskLockbox().add(task); + + final Interval startInterval = Intervals.of("2024-01-01/PT1H"); + 
final long hourToMillis = 1000L * 60L * 60L; + final long numHours = 48; + List intervals = new ArrayList<>(); + for (long hour = 0; hour < numHours; hour++) { + intervals.add( + new Interval( + startInterval.getStartMillis() + hourToMillis * hour, + startInterval.getStartMillis() + hourToMillis * hour + hourToMillis, + DateTimeZone.UTC + ) + ); + } + + final IndexerSQLMetadataStorageCoordinator coordinator = + (IndexerSQLMetadataStorageCoordinator) taskActionTestKit.getMetadataStorageCoordinator(); + + final List dimensions = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + dimensions.add("dimension_" + i); + } + final Set segments = new HashSet<>(); + final int numUsedSegmentsPerInterval = 2000; + int version = 0; + for (Interval interval : intervals) { + for (int i = 0; i < numUsedSegmentsPerInterval; i++) { + segments.add( + DataSegment.builder() + .dataSource(task.getDataSource()) + .interval(interval) + .version("version" + version) + .shardSpec(new NumberedShardSpec(i, numUsedSegmentsPerInterval)) + .dimensions(dimensions) + .size(100) + .build() + ); + } + coordinator.commitSegments(segments, null); + segments.clear(); + version = (version + 1) % 10; + } + + + final int numAllocations = 10; + final int replicas = 2; + Map>> sequenceNameAndPrevIdToFutures = new HashMap<>(); + for (int i = 0; i < numAllocations; i++) { + for (int j = 0; j < numHours; j++) { + final int id = numUsedSegmentsPerInterval + i / replicas; + final String sequenceId = j + "-sequence" + id; + final String prevSequenceId = j + "-sequence" + (id - 1); + sequenceNameAndPrevIdToFutures.computeIfAbsent( + sequenceId + "|" + prevSequenceId, + k -> new ArrayList<>() + ).add( + allocationQueue.add( + new SegmentAllocateRequest( + task, + new SegmentAllocateAction( + task.getDataSource(), + intervals.get(j).getStart(), + Granularities.NONE, + Granularities.HOUR, + sequenceId, + prevSequenceId, + false, + NumberedPartialShardSpec.instance(), + LockGranularity.TIME_CHUNK, + 
TaskLockType.APPEND + ), + 10 + ) + ) + ); + } + } + executor.finishAllPendingTasks(); + + final Set allocatedIds = new HashSet<>(); + for (List> sameIds : sequenceNameAndPrevIdToFutures.values()) { + if (sameIds.isEmpty()) { + return; + } + final SegmentIdWithShardSpec id = sameIds.get(0).get(); + Assert.assertNotNull(id); + for (int i = 1; i < sameIds.size(); i++) { + Assert.assertEquals(id, sameIds.get(i).get()); + } + Assert.assertFalse(allocatedIds.contains(id)); + allocatedIds.add(id); + } + Assert.assertEquals(numHours * numAllocations, allocatedIds.size() * replicas); + printStats("task/action/batch/runTime"); + printStats("task/action/batch/queueTime"); + } + + private void printStats(String metricName) + { + int count = 0; + double max = 0; + double min = Double.POSITIVE_INFINITY; + double sum = 0; + for (ServiceMetricEvent event : emitter.getMetricEvents().get(metricName)) { + count++; + max = Math.max(max, event.getValue().doubleValue()); + min = Math.min(min, event.getValue().doubleValue()); + sum += event.getValue().doubleValue(); + } + double avg = 0; + if (count > 0) { + avg = sum / count; + } + log.info( + "Metric[%s]: count[%d], min[%f], max[%f], avg[%f], sum[%f]", + metricName, count, min, max, avg, sum + ); + } + @Test public void testBatchWithMultipleTimestamps() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index fcbf37c956da..69c8b6079055 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -58,6 +58,8 @@ public class TaskActionTestKit extends ExternalResource private SegmentSchemaManager segmentSchemaManager; private SegmentSchemaCache segmentSchemaCache; + private boolean skipSegmentPayloadFetchForAllocation = new 
TaskLockConfig().isSegmentAllocationReduceMetadataIO(); + public TaskLockbox getTaskLockbox() { return taskLockbox; @@ -78,6 +80,11 @@ public TaskActionToolbox getTaskActionToolbox() return taskActionToolbox; } + public void setSkipSegmentPayloadFetchForAllocation(boolean skipSegmentPayloadFetchForAllocation) + { + this.skipSegmentPayloadFetchForAllocation = skipSegmentPayloadFetchForAllocation; + } + @Override public void before() { @@ -126,6 +133,12 @@ public long getBatchAllocationWaitTime() { return 10L; } + + @Override + public boolean isSegmentAllocationReduceMetadataIO() + { + return skipSegmentPayloadFetchForAllocation; + } }; taskActionToolbox = new TaskActionToolbox( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index d2055d6e0c99..54e323581c47 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -35,6 +35,7 @@ import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.partition.PartialShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -168,7 +169,8 @@ public Map allocatePendingSegments String dataSource, Interval interval, boolean skipSegmentLineageCheck, - List requests + List requests, + boolean isTimeChunk ) { return Collections.emptyMap(); @@ -332,6 +334,20 @@ public Map> retrieveUpgradedToSegmentIds( return Collections.emptyMap(); } + @Override + public SegmentTimeline getSegmentTimelineForAllocation( + String dataSource, + Interval interval, + boolean 
skipSegmentPayloadFetchForAllocation + ) + { + return SegmentTimeline.forSegments(retrieveUsedSegmentsForIntervals( + dataSource, + Collections.singletonList(interval), + Segments.INCLUDING_OVERSHADOWED + )); + } + public Set getPublished() { return ImmutableSet.copyOf(published); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 83b4ac7e474c..867b1f618c97 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -25,6 +25,7 @@ import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.partition.PartialShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -179,6 +180,8 @@ List retrieveUnusedSegmentsForInterval( * Should be set to false if replica tasks would index events in same order * @param requests Requests for which to allocate segments. All * the requests must share the same partition space. + * @param skipSegmentPayloadFetch If true, try to use the segment ids instead of fetching every segment + * payload from the metadata store * @return Map from request to allocated segment id. The map does not contain * entries for failed requests. 
*/ @@ -186,7 +189,20 @@ Map allocatePendingSegments( String dataSource, Interval interval, boolean skipSegmentLineageCheck, - List requests + List requests, + boolean skipSegmentPayloadFetch + ); + + /** + * Return a segment timeline of all used segments including overshadowed ones for a given datasource and interval + * if skipSegmentPayloadFetchForAllocation is set to true, do not fetch all the segment payloads for allocation + * Instead fetch all the ids and numCorePartitions using exactly one segment per version per interval + * return a dummy DataSegment for each id that holds only the SegmentId and a NumberedShardSpec with numCorePartitions + */ + SegmentTimeline getSegmentTimelineForAllocation( + String dataSource, + Interval interval, + boolean skipSegmentPayloadFetchForAllocation ); /** diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index a512f7935740..0652d2905490 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -719,7 +719,8 @@ public Map allocatePendingSegments String dataSource, Interval allocateInterval, boolean skipSegmentLineageCheck, - List requests + List requests, + boolean skipSegmentPayloadFetch ) { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -727,7 +728,14 @@ public Map allocatePendingSegments final Interval interval = allocateInterval.withChronology(ISOChronology.getInstanceUTC()); return connector.retryWithHandle( - handle -> allocatePendingSegments(handle, dataSource, interval, skipSegmentLineageCheck, requests) + handle -> allocatePendingSegments( + handle, + dataSource, + interval, + skipSegmentLineageCheck, + requests, + skipSegmentPayloadFetch + ) ); } @@ -1003,18 +1011,39 @@ private SegmentIdWithShardSpec 
allocatePendingSegmentWithSegmentLineageCheck( return newIdentifier; } + @Override + public SegmentTimeline getSegmentTimelineForAllocation( + String dataSource, + Interval interval, + boolean skipSegmentPayloadFetchForAllocation + ) + { + return connector.retryWithHandle( + handle -> { + if (skipSegmentPayloadFetchForAllocation) { + return SegmentTimeline.forSegments(retrieveUsedSegmentsForAllocation(handle, dataSource, interval)); + } else { + return getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)); + } + } + ); + } + private Map allocatePendingSegments( final Handle handle, final String dataSource, final Interval interval, final boolean skipSegmentLineageCheck, - final List requests + final List requests, + final boolean skipSegmentPayloadFetch ) throws IOException { // Get the time chunk and associated data segments for the given interval, if any - final List> existingChunks = - getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)) - .lookup(interval); + final List> existingChunks = getSegmentTimelineForAllocation( + dataSource, + interval, + skipSegmentPayloadFetch + ).lookup(interval); if (existingChunks.size() > 1) { log.warn( "Cannot allocate new segments for dataSource[%s], interval[%s] as interval already has [%,d] chunks.", @@ -2900,6 +2929,70 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set retrieveUsedSegmentsForAllocation( + final Handle handle, + final String dataSource, + final Interval interval + ) + { + final Set overlappingSegmentIds = SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables, jsonMapper) + .retrieveUsedSegmentIds( + dataSource, + interval + ); + // Map from version -> interval -> segmentId with the smallest partitionNum + Map> versionIntervalToSmallestSegmentId = new HashMap<>(); + for (SegmentId segmentId : overlappingSegmentIds) { + final Map map + = 
versionIntervalToSmallestSegmentId.computeIfAbsent(segmentId.getVersion(), v -> new HashMap<>()); + final SegmentId value = map.get(segmentId.getInterval()); + if (value == null || value.getPartitionNum() > segmentId.getPartitionNum()) { + map.put(segmentId.getInterval(), segmentId); + } + } + + // Retrieve the segments for the ids stored in the map to get the numCorePartitions + final Set segmentIdsToRetrieve = new HashSet<>(); + for (Map itvlMap : versionIntervalToSmallestSegmentId.values()) { + segmentIdsToRetrieve.addAll(itvlMap.values().stream().map(SegmentId::toString).collect(Collectors.toList())); + } + final Set dataSegments = retrieveSegmentsById(dataSource, segmentIdsToRetrieve); + final Set retrievedIds = new HashSet<>(); + final Map> versionIntervalToNumCorePartitions = new HashMap<>(); + for (DataSegment segment : dataSegments) { + versionIntervalToNumCorePartitions.computeIfAbsent(segment.getVersion(), v -> new HashMap<>()) + .put(segment.getInterval(), segment.getShardSpec().getNumCorePartitions()); + retrievedIds.add(segment.getId().toString()); + } + if (!retrievedIds.equals(segmentIdsToRetrieve)) { + throw DruidException.defensive( + "Cannot create DataSegments for segment allocations." 
+ + "The used segments may have changed for dataSource[%s] and interval[%s].", + dataSource, interval + ); + } + + // Populate the required segment info + Set segmentsWithAllocationInfo = new HashSet<>(); + for (SegmentId id : overlappingSegmentIds) { + final int corePartitions = versionIntervalToNumCorePartitions.get(id.getVersion()).get(id.getInterval()); + segmentsWithAllocationInfo.add( + new DataSegment( + id, + null, + null, + null, + new NumberedShardSpec(id.getPartitionNum(), corePartitions), + null, + null, + 1 + ) + ); + } + return segmentsWithAllocationInfo; + } + @Override public DataSegment retrieveSegmentForId(final String id, boolean includeUnused) { diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index fc1c84a70371..bd53026a5aee 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.UnmodifiableIterator; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -35,6 +36,7 @@ import org.apache.druid.server.http.DataSegmentPlus; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; import org.skife.jdbi.v2.Handle; @@ -48,6 +50,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -242,6 +245,69 @@ public CloseableIterator retrieveUnusedSegmentsPlus( ); } 
+ public Set retrieveUsedSegmentIds( + final String dataSource, + final Interval interval + ) + { + return retrieveSegmentIds(dataSource, Collections.singletonList(interval)); + } + + private Set retrieveSegmentIds( + final String dataSource, + final Collection intervals + ) + { + if (CollectionUtils.isNullOrEmpty(intervals)) { + return Collections.emptySet(); + } + + // Check if the intervals all support comparing as strings. If so, bake them into the SQL. + final boolean compareAsString = intervals.stream().allMatch(Intervals::canCompareEndpointsAsStrings); + + final StringBuilder sb = new StringBuilder(); + sb.append("SELECT id FROM %s WHERE used = :used AND dataSource = :dataSource"); + + if (compareAsString) { + sb.append( + getConditionForIntervalsAndMatchMode(intervals, IntervalMode.OVERLAPS, connector.getQuoteString()) + ); + } + + return connector.inReadOnlyTransaction( + (handle, status) -> { + final Query> sql = handle + .createQuery(StringUtils.format(sb.toString(), dbTables.getSegmentsTable())) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("used", true) + .bind("dataSource", dataSource); + + if (compareAsString) { + bindIntervalsToQuery(sql, intervals); + } + + final Set segmentIds = new HashSet<>(); + try (final ResultIterator iterator = sql.map((index, r, ctx) -> r.getString(1)).iterator()) { + while (iterator.hasNext()) { + final String id = iterator.next(); + final SegmentId segmentId = SegmentId.tryParse(dataSource, id); + if (segmentId == null) { + throw DruidException.defensive( + "Failed to parse SegmentId for id[%s] and dataSource[%s].", + id, dataSource + ); + } + for (Interval interval : intervals) { + if (IntervalMode.OVERLAPS.apply(interval, segmentId.getInterval())) { + segmentIds.add(segmentId); + } + } + } + } + return segmentIds; + }); + } + public List retrieveSegmentsById( String datasource, Set segmentIds diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 4b592e5f40da..0377faac5fbf 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -2669,6 +2669,76 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() } + + @Test + public void testAllocatePendingSegmentsSkipSegmentPayloadFetch() + { + final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); + final String dataSource = "ds"; + final Interval interval = Intervals.of("2017-01-01/2017-02-01"); + final String sequenceName = "seq"; + + final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null); + final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request), + true + ).get(request); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString()); + + final SegmentCreateRequest request1 = + new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null, null); + final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request1), + true + ).get(request1); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString()); + + final SegmentCreateRequest request2 = + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null); + final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request2), + true + ).get(request2); + + 
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString()); + + final SegmentCreateRequest request3 = + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null); + final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request3), + true + ).get(request3); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId3.toString()); + Assert.assertEquals(segmentId2, segmentId3); + + final SegmentCreateRequest request4 = + new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, null); + final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request4), + true + ).get(request4); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3", segmentId4.toString()); + } + @Test public void testAllocatePendingSegments() { @@ -2682,7 +2752,8 @@ public void testAllocatePendingSegments() dataSource, interval, false, - Collections.singletonList(request) + Collections.singletonList(request), + false ).get(request); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString()); @@ -2693,7 +2764,8 @@ public void testAllocatePendingSegments() dataSource, interval, false, - Collections.singletonList(request1) + Collections.singletonList(request1), + false ).get(request1); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString()); @@ -2704,7 +2776,8 @@ public void testAllocatePendingSegments() dataSource, interval, false, - Collections.singletonList(request2) + Collections.singletonList(request2), + false ).get(request2); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString()); @@ -2715,7 +2788,8 @@ public void 
testAllocatePendingSegments() dataSource, interval, false, - Collections.singletonList(request3) + Collections.singletonList(request3), + false ).get(request3); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId3.toString()); @@ -2727,7 +2801,8 @@ public void testAllocatePendingSegments() dataSource, interval, false, - Collections.singletonList(request4) + Collections.singletonList(request4), + false ).get(request4); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3", segmentId4.toString()); @@ -3639,6 +3714,84 @@ public void testRetrieveUpgradedToSegmentIdsInBatches() Assert.assertEquals(expected, actual); } + @Test + public void testRetrieveUsedSegmentsForSegmentAllocation() + { + final String datasource = "DS"; + DataSegment firstSegment; + Set nextSegments; + final Map loadspec = ImmutableMap.of("loadSpec", "loadSpec"); + final List dimensions = ImmutableList.of("dim1", "dim2"); + final List metrics = ImmutableList.of("metric1", "metric2"); + final int numSegmentsPerInterval = 100; + + final Interval month = Intervals.of("2024-10-01/2024-11-01"); + + final Interval year = Intervals.of("2024/2025"); + + final Interval overlappingDay = Intervals.of("2024-10-01/2024-10-02"); + final Interval nonOverlappingDay = Intervals.of("2024-01-01/2024-01-02"); + + final List intervals = ImmutableList.of(month, year, overlappingDay, nonOverlappingDay); + final List versions = ImmutableList.of("v0", "v1", "v2", "v2"); + for (int i = 0; i < 4; i++) { + nextSegments = new HashSet<>(); + firstSegment = new DataSegment( + datasource, + intervals.get(i), + versions.get(i), + loadspec, + dimensions, + metrics, + new DimensionRangeShardSpec(dimensions, null, null, 0, 1), + 0, + 100 + ); + insertUsedSegments(Collections.singleton(firstSegment), Collections.emptyMap()); + for (int j = 1; j < numSegmentsPerInterval; j++) { + nextSegments.add( + new DataSegment( + datasource, + intervals.get(i), + versions.get(i), + 
loadspec, + dimensions, + metrics, + // The numCorePartitions is intentionally 0 + new NumberedShardSpec(j, 0), + 0, + 100 + ) + ); + } + insertUsedSegments(nextSegments, Collections.emptyMap()); + } + + final Set expected = new HashSet<>(); + for (int i = 0; i < 3; i++) { + for (int j = 0; j < numSegmentsPerInterval; j++) { + expected.add( + new SegmentIdWithShardSpec( + datasource, + intervals.get(i), + versions.get(i), + new NumberedShardSpec(j, 1) + ) + ); + } + } + + Assert.assertEquals(expected, + derbyConnector.retryWithHandle( + handle -> coordinator.retrieveUsedSegmentsForAllocation(handle, datasource, month) + .stream() + .map(SegmentIdWithShardSpec::fromDataSegment) + .collect(Collectors.toSet()) + ) + ); + } + + private void insertUsedSegments(Set segments, Map upgradedFromSegmentIdMap) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); From 163480182c364114af98fd48ff00e6a134e60ff2 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 30 Oct 2024 05:06:16 +0530 Subject: [PATCH 2/3] Rough code for TaskLockPosse level cache --- .../common/actions/SegmentAllocateAction.java | 20 + .../actions/SegmentAllocationQueue.java | 2 +- .../SegmentTransactionalInsertAction.java | 22 +- .../druid/indexing/overlord/TaskLockbox.java | 374 +++++++++++++++++- .../actions/SegmentAllocateActionTest.java | 146 ++++++- .../actions/SegmentAllocationQueueTest.java | 142 ------- .../common/actions/TaskActionTestKit.java | 23 +- .../common/actions/TaskLocksTest.java | 4 +- .../common/task/IngestionTestBase.java | 3 +- .../OverlordCompactionSchedulerTest.java | 4 +- .../indexing/overlord/TaskLifecycleTest.java | 5 +- .../overlord/TaskLockBoxConcurrencyTest.java | 4 +- .../indexing/overlord/TaskLockConfigTest.java | 2 +- .../indexing/overlord/TaskLockboxTest.java | 33 +- .../indexing/overlord/TaskQueueScaleTest.java | 5 +- .../indexing/overlord/TaskQueueTest.java | 2 +- .../indexing/overlord/http/OverlordTest.java | 2 +- 
.../SeekableStreamIndexTaskTestBase.java | 3 +- .../IndexerSQLMetadataStorageCoordinator.java | 83 +++- 19 files changed, 675 insertions(+), 204 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java index 902dad5dd879..60909cf97997 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.LockRequestForNewSegment; import org.apache.druid.indexing.overlord.LockResult; import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -219,6 +220,25 @@ public SegmentIdWithShardSpec perform( ); } int attempt = 0; + final TaskLockbox lockbox = toolbox.getTaskLockbox(); + if (lockbox.canAllocateSegmentWithReducedMetadataIO(getLockGranularity(), getTaskLockType())) { + LockResult result = lockbox.allocateSegmentWithReducedMetadataIO( + task, + taskLockType, + timestamp, + queryGranularity, + preferredSegmentGranularity, + sequenceName, + previousSegmentId, + skipSegmentLineageCheck, + partialShardSpec + ); + if (result.isOk()) { + return result.getNewSegmentId(); + } else { + return null; + } + } while (true) { attempt++; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index c4e9b0638f3c..4198a5c3bcdc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -70,7 +70,7 @@ public class SegmentAllocationQueue private static final Logger log = new Logger(SegmentAllocationQueue.class); private static final int MAX_QUEUE_SIZE = 2000; - private static final int MAX_BATCH_SIZE = 500; + private static final int MAX_BATCH_SIZE = 5; private final long maxWaitTimeMillis; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index e8dd472cf31d..a5f57cac097d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.java.util.common.ISE; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.SegmentUtils; @@ -213,18 +214,25 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) } } + final TaskLockbox lockbox = toolbox.getTaskLockbox(); try { - retVal = toolbox.getTaskLockbox().doInCriticalSection( + retVal = lockbox.doInCriticalSection( task, allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() .onValidLocks( - () -> toolbox.getIndexerMetadataStorageCoordinator().commitSegmentsAndMetadata( - segments, - startMetadata, - endMetadata, - segmentSchemaMapping - ) + () -> { + SegmentPublishResult result = + toolbox.getIndexerMetadataStorageCoordinator() + .commitSegmentsAndMetadata( + segments, + 
startMetadata, + endMetadata, + segmentSchemaMapping + ); + lockbox.cacheSegmentPublishResults(task, result.getSegments()); + return result; + } ) .onInvalidLocks( () -> SegmentPublishResult.fail( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index f38666859abd..9c149974a5c2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -41,19 +41,29 @@ import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.UOE; +import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.SegmentTimeline; +import org.apache.druid.timeline.TimelineObjectHolder; +import org.apache.druid.timeline.partition.PartialShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; +import 
org.joda.time.chrono.ISOChronology; import javax.annotation.Nullable; import java.util.ArrayList; @@ -116,14 +126,18 @@ public class TaskLockbox @GuardedBy("giant") private final Map> activeAllocatorIdToTaskIds = new HashMap<>(); + private final boolean segmentAllocationReduceMetadataIO; + @Inject public TaskLockbox( TaskStorage taskStorage, - IndexerMetadataStorageCoordinator metadataStorageCoordinator + IndexerMetadataStorageCoordinator metadataStorageCoordinator, + TaskLockConfig taskLockConfig ) { this.taskStorage = taskStorage; this.metadataStorageCoordinator = metadataStorageCoordinator; + this.segmentAllocationReduceMetadataIO = taskLockConfig.isSegmentAllocationReduceMetadataIO(); } /** @@ -180,8 +194,8 @@ public int compare(Pair left, Pair right) // Create a new taskLock if it doesn't have a proper priority, // so that every taskLock in memory has the priority. final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null - ? savedTaskLock.withPriority(task.getPriority()) - : savedTaskLock; + ? savedTaskLock.withPriority(task.getPriority()) + : savedTaskLock; final TaskLockPosse taskLockPosse = verifyAndCreateOrFindLockPosse( task, @@ -334,7 +348,6 @@ protected TaskLockPosse verifyAndCreateOrFindLockPosse(Task task, TaskLock taskL * * @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a * {@link LockResult#revoked} flag. - * * @throws InterruptedException if the current thread is interrupted */ public LockResult lock(final Task task, final LockRequest request) throws InterruptedException @@ -360,7 +373,6 @@ public LockResult lock(final Task task, final LockRequest request) throws Interr * * @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a * {@link LockResult#revoked} flag. 
- * * @throws InterruptedException if the current thread is interrupted */ public LockResult lock(final Task task, final LockRequest request, long timeoutMs) throws InterruptedException @@ -388,7 +400,6 @@ public LockResult lock(final Task task, final LockRequest request, long timeoutM * * @return {@link LockResult} containing a new or an existing lock if succeeded. Otherwise, {@link LockResult} with a * {@link LockResult#revoked} flag. - * * @throws IllegalStateException if the task is not a valid active task */ public LockResult tryLock(final Task task, final LockRequest request) @@ -452,6 +463,232 @@ public LockResult tryLock(final Task task, final LockRequest request) } } + public void cacheSegmentPublishResults(Task task, Collection segments) + { + if (!segmentAllocationReduceMetadataIO) { + return; + } + Map intervalToMaxSegment = new HashMap<>(); + for (DataSegment segment : segments) { + final Interval interval = segment.getInterval(); + if (intervalToMaxSegment.get(interval) == null + || intervalToMaxSegment.get(interval).getId().getPartitionNum() < segment.getId().getPartitionNum()) { + intervalToMaxSegment.put(interval, segment); + } + } + for (Map.Entry entry : intervalToMaxSegment.entrySet()) { + final Interval interval = entry.getKey(); + for (TaskLockPosse posse : findLockPossesOverlapsInterval(task.getDataSource(), interval)) { + if (posse.containsTask(task)) { + if (posse.getTaskLock().getInterval().equals(interval) && posse.visibleSegmentState != null) { + posse.visibleSegmentState.updateMaxSegment(entry.getValue()); + } + } + } + } + } + + public boolean canAllocateSegmentWithReducedMetadataIO(final LockGranularity granularity, final TaskLockType lockType) + { + return segmentAllocationReduceMetadataIO + && granularity == LockGranularity.TIME_CHUNK + && (lockType == TaskLockType.EXCLUSIVE || lockType == TaskLockType.SHARED); + } + + public LockResult allocateSegmentWithReducedMetadataIO( + final Task task, + final TaskLockType lockType, + final 
DateTime timestamp, + final Granularity queryGranularity, + final Granularity preferredSegmentGranularity, + final String sequenceName, + final String previousSegmentId, + final boolean skipSegmentLineageCheck, + final PartialShardSpec partialShardSpec + ) + { + giant.lock(); + + try { + final Interval rowInterval = queryGranularity.bucket(timestamp).withChronology(ISOChronology.getInstanceUTC()); + + for (Granularity segmentGranularity : Granularity.granularitiesFinerThan(preferredSegmentGranularity)) { + final Interval segmentInterval = segmentGranularity.bucket(timestamp) + .withChronology(ISOChronology.getInstanceUTC()); + if (!segmentInterval.contains(rowInterval)) { + break; + } + final VisibleSegmentState visibleSegmentState = determineVisibleSegmentState(task, segmentInterval); + if (visibleSegmentState == null) { + continue; + } + + if (!visibleSegmentState.interval.contains(rowInterval)) { + continue; + } + + if (visibleSegmentState.committedVersion == null && !visibleSegmentState.interval.equals(segmentInterval)) { + continue; + } + + final TimeChunkLockRequest lockRequest + = new TimeChunkLockRequest(lockType, task, visibleSegmentState.interval, null); + + final TaskLockPosse lockPosse = createOrFindLockPosse(lockRequest, task, true); + + if (lockPosse.getTaskLock().isRevoked()) { + log.warn("Lock[%s] is revoked.", lockPosse.getTaskLock()); + return LockResult.revoked(lockPosse.getTaskLock()); + } + + if (!lockPosse.getTaskLock().getInterval().equals(visibleSegmentState.interval)) { + log.warn( + "Interval[%s] of existing segments for datasource[%s] does not match the lock interval[%s]", + visibleSegmentState.interval, task.getDataSource(), lockPosse.taskLock.getInterval() + ); + unlock(task, lockPosse.getTaskLock().getInterval()); + continue; + } + + if (lockPosse.visibleSegmentState == null) { + lockPosse.visibleSegmentState = visibleSegmentState; + } else if (!lockPosse.visibleSegmentState.equals(visibleSegmentState)) { + throw 
DruidException.defensive("VisibleSegmentState mismatch for lock[%s]", lockPosse.getTaskLock()); + } + + final String allocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId(); + SegmentIdWithShardSpec allocatedId = ((IndexerSQLMetadataStorageCoordinator) metadataStorageCoordinator) + .findOrInsertPendingSegmentRecord( + task.getDataSource(), + visibleSegmentState.interval, + visibleSegmentState.committedVersion, + visibleSegmentState.getVersion(lockPosse.taskLock.getVersion()), + visibleSegmentState.getMaxId(), + visibleSegmentState.numCorePartitions, + partialShardSpec, + sequenceName, + previousSegmentId, + skipSegmentLineageCheck, + allocatorId + ); + + if (allocatedId != null) { + visibleSegmentState.addPendingSegment( + new PendingSegmentRecord(allocatedId, sequenceName, previousSegmentId, null, allocatorId) + ); + } + log.info("Found or Allocated pendingSegment[%s].", allocatedId.asSegmentId()); + return LockResult.ok(lockPosse.getTaskLock(), allocatedId); + } + return LockResult.fail(); + } + finally { + giant.unlock(); + } + } + + private VisibleSegmentState determineVisibleSegmentState(Task task, Interval interval) + { + VisibleSegmentState state = null; + final String datasource = task.getDataSource(); + final List conflictPosses = findConflictPosses(datasource, interval); + for (TaskLockPosse lockPosse : conflictPosses) { + if (!lockPosse.getTaskLock().isRevoked()) { + if (state == null) { + state = lockPosse.visibleSegmentState; + } else if (lockPosse.visibleSegmentState != null && !state.equals(lockPosse.visibleSegmentState)) { + throw DruidException.defensive( + "VisibleSegmentState mismatch for datasource[%s] and interval[%s] with conflicting lock[%s].", + datasource, interval, lockPosse.getTaskLock() + ); + } + } + } + if (state != null) { + return state; + } + + final SegmentTimeline timeline = + metadataStorageCoordinator.getSegmentTimelineForAllocation(datasource, interval, true); + + final List> existingChunks = 
timeline.lookup(interval); + if (existingChunks.size() > 1) { + // Not possible to expand more than one chunk with a single segment. + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s] as it already has [%,d] versions.", + datasource, interval, existingChunks.size() + ); + return null; + } + DataSegment maxSegment = null; + if (!existingChunks.isEmpty()) { + for (DataSegment segment : existingChunks.get(0).getObject().payloads()) { + if (maxSegment == null + || maxSegment.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { + maxSegment = segment; + } + } + } + + final Interval visibleInterval = maxSegment == null ? interval : maxSegment.getInterval(); + + final List overlappingPendingSegments = + metadataStorageCoordinator.getPendingSegments(datasource, visibleInterval); + + String pendingSegmentVersionToUse; + if (maxSegment != null) { + pendingSegmentVersionToUse = maxSegment.getVersion(); + } else { + pendingSegmentVersionToUse = ""; + for (PendingSegmentRecord pendingSegment : overlappingPendingSegments) { + if (pendingSegmentVersionToUse.compareTo(pendingSegment.getId().getVersion()) < 0) { + pendingSegmentVersionToUse = pendingSegment.getId().getVersion(); + } + } + } + + List pendingSegments = new ArrayList<>(); + for (PendingSegmentRecord pendingSegment : pendingSegments) { + if (pendingSegment.getId().getVersion().equals(pendingSegmentVersionToUse)) { + if (pendingSegment.getId().getInterval().equals(visibleInterval)) { + pendingSegments.add(pendingSegment); + } else { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s] as it has a conflicting pendingSegment[%s].", + datasource, interval, pendingSegment.getId().asSegmentId() + ); + return null; + } + } + } + + return new VisibleSegmentState(visibleInterval, maxSegment, pendingSegments); + } + + private List findConflictPosses(String datasource, Interval interval) + { + giant.lock(); + + try { + final List foundPosses = 
findLockPossesOverlapsInterval( + datasource, + interval + ); + for (TaskLockPosse posse : foundPosses) { + if (posse.getTaskLock().getGranularity() == LockGranularity.SEGMENT) { + throw DruidException.defensive( + "TimeChunk lock request for datasource[%s], interval[%s] has a conflicting segment lock.", + datasource, interval + ); + } + } + return foundPosses; + } + finally { + giant.unlock(); + } + } + /** * Attempts to allocate segments for the given requests. Each request contains * a {@link Task} and a {@link SegmentAllocateAction}. This method tries to @@ -795,6 +1032,7 @@ private void revokeLock(TaskLockPosse lockPosse) try { lockPosse.taskIds.forEach(taskId -> revokeLock(taskId, lockPosse.getTaskLock())); + lockPosse.visibleSegmentState = null; } finally { giant.unlock(); @@ -1194,6 +1432,12 @@ public void unlockAll(Task task) ? ((SegmentLock) taskLockPosse.taskLock).getPartitionId() : null ); + if (task instanceof PendingSegmentAllocatingTask) { + final String allocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId(); + if (taskLockPosse.visibleSegmentState != null && !activeAllocatorIdToTaskIds.containsKey(allocatorId)) { + taskLockPosse.visibleSegmentState.removePendingSegmentsForAllocator(allocatorId); + } + } } } finally { @@ -1580,6 +1824,123 @@ private boolean revokeAllIncompatibleActiveLocksIfPossible( return true; } + static class VisibleSegmentState + { + // Fixed as the segment interval cannot change once committed or allocated + private final Interval interval; + // Fixed as the numCorePartitions cannot be altered by appending jobs + private final int numCorePartitions; + // Can be null or set once after used segments have been committed for the first time for the interval + private String committedVersion = null; + // Can be null or set once after used segments have been committed for the first time for the interval + private String allocatedVersion = null; + // Can be altered when new segments are committed + private int 
maxCommittedId = -1; + // Max pending segment id that cannot be cleaned up automatically by the TaskLockbox i.e with null taskAllocatorId + private int maxNonReusableId = -1; + private final Map> allocatorToActiveIds = new HashMap<>(); + + VisibleSegmentState(Interval interval, @Nullable DataSegment maxSegment, List pendingSegments) + { + this.interval = interval; + if (maxSegment == null) { + numCorePartitions = 0; + } else { + numCorePartitions = maxSegment.getShardSpec().getNumCorePartitions(); + updateMaxSegment(maxSegment); + } + for (PendingSegmentRecord pendingSegment : pendingSegments) { + addPendingSegment(pendingSegment); + } + } + + int getMaxId() + { + int maxId = Math.max(maxCommittedId, maxNonReusableId); + for (Set pendingSegmentIds : allocatorToActiveIds.values()) { + for (SegmentId pendingSegmentId : pendingSegmentIds) { + maxId = Math.max(maxId, pendingSegmentId.getPartitionNum()); + } + } + return maxId; + } + + String getVersion(String preferredVersion) + { + if (committedVersion != null) { + return committedVersion; + } else if (allocatedVersion != null) { + return allocatedVersion; + } else { + return preferredVersion; + } + } + + void updateMaxSegment(DataSegment segment) + { + if (!interval.equals(segment.getInterval())) { + throw DruidException.defensive( + "Interval[%s] mismatch for segment[%s] with interval[%s].", + interval, segment.getId(), segment.getInterval() + ); + } + if (numCorePartitions != segment.getShardSpec().getNumCorePartitions()) { + throw DruidException.defensive( + "numCorePartitions[%s] mismatch for segment[%s] with numCorePartitions[%d].", + numCorePartitions, segment.getId(), segment.getShardSpec().getNumCorePartitions() + ); + } + + if (committedVersion == null) { + committedVersion = segment.getVersion(); + } else if (!committedVersion.equals(segment.getVersion())) { + throw DruidException.defensive( + "segment[%s] does not have the expected version[%s].", + segment.getId(), committedVersion + ); + } + + 
maxCommittedId = Math.max(maxCommittedId, segment.getShardSpec().getPartitionNum()); + } + + void addPendingSegment(PendingSegmentRecord pendingSegment) + { + SegmentId id = pendingSegment.getId().asSegmentId(); + if (committedVersion != null && !committedVersion.equals(id.getVersion())) { + throw DruidException.defensive( + "pendingSegment[%s] does not have the expected version[%s].", + id, committedVersion + ); + } + + if (allocatedVersion == null) { + allocatedVersion = id.getVersion(); + } else if (!allocatedVersion.equals(id.getVersion())) { + throw DruidException.defensive( + "pendingSegment[%s] does not have the expected version[%s].", + id, allocatedVersion + ); + } + + String allocator = pendingSegment.getTaskAllocatorId(); + if (allocator == null) { + maxNonReusableId = Math.max(maxNonReusableId, id.getPartitionNum()); + } else { + allocatorToActiveIds.computeIfAbsent(allocator, k -> new HashSet<>()) + .add(id); + } + } + + void removePendingSegmentsForAllocator(String allocatorId) + { + allocatorToActiveIds.remove(allocatorId); + // If all pending segments have been cleaned up, we can have a different allocated version in the future + if (allocatorToActiveIds.isEmpty() && maxNonReusableId == 0) { + allocatedVersion = null; + } + } + } + /** * Task locks for tasks of the same groupId */ @@ -1587,6 +1948,7 @@ static class TaskLockPosse { private final TaskLock taskLock; private final Set taskIds; + private VisibleSegmentState visibleSegmentState; TaskLockPosse(TaskLock taskLock) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index 3c0b08758f76..126e4b06c218 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ 
-24,6 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Futures; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentLock; import org.apache.druid.indexing.common.TaskLock; @@ -39,8 +40,9 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec; @@ -51,8 +53,9 @@ import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartialShardSpec; import org.apache.druid.timeline.partition.ShardSpec; -import org.easymock.EasyMock; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.Interval; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -81,6 +84,8 @@ @RunWith(Parameterized.class) public class SegmentAllocateActionTest { + private static final Logger log = new Logger(SegmentAllocateActionTest.class); + @Rule public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); @@ -92,6 +97,7 @@ public class SegmentAllocateActionTest private final LockGranularity lockGranularity; private SegmentAllocationQueue allocationQueue; + private StubServiceEmitter emitter; @Parameterized.Parameters(name = "granularity = 
{0}, useBatch = {1}, skipSegmentPayloadFetchForAllocation = {2}") public static Iterable constructorFeeder() @@ -101,6 +107,7 @@ public static Iterable constructorFeeder() new Object[]{LockGranularity.SEGMENT, true, false}, new Object[]{LockGranularity.SEGMENT, false, false}, new Object[]{LockGranularity.TIME_CHUNK, true, true}, + new Object[]{LockGranularity.TIME_CHUNK, false, true}, new Object[]{LockGranularity.TIME_CHUNK, true, false}, new Object[]{LockGranularity.TIME_CHUNK, false, false} ); @@ -120,9 +127,6 @@ public SegmentAllocateActionTest( @Before public void setUp() { - ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); - EmittingLogger.registerEmitter(emitter); - EasyMock.replay(emitter); allocationQueue = taskActionTestKit.getTaskActionToolbox().getSegmentAllocationQueue(); if (allocationQueue != null) { allocationQueue.start(); @@ -195,6 +199,103 @@ public void testManySegmentsSameInterval_noLineageCheck() throws Exception Assert.assertEquals(expectedIds, allocatedIds); } + //@Ignore + @Test + public void testBenchmark() throws Exception + { + if (lockGranularity == LockGranularity.SEGMENT) { + return; + } + + final Task task = NoopTask.create(); + taskActionTestKit.getTaskLockbox().add(task); + + final Interval startInterval = Intervals.of("2024-01-01/PT1H"); + final long hourToMillis = 1000L * 60L * 60L; + final long numHours = 24; + List intervals = new ArrayList<>(); + for (long hour = 0; hour < numHours; hour++) { + intervals.add( + new Interval( + startInterval.getStartMillis() + hourToMillis * hour, + startInterval.getStartMillis() + hourToMillis * hour + hourToMillis, + DateTimeZone.UTC + ) + ); + } + + final IndexerSQLMetadataStorageCoordinator coordinator = + (IndexerSQLMetadataStorageCoordinator) taskActionTestKit.getMetadataStorageCoordinator(); + + final List dimensions = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + dimensions.add("dimension_" + i); + } + final Set segments = new HashSet<>(); + final int 
numUsedSegmentsPerInterval = 2000; + int version = 0; + for (Interval interval : intervals) { + for (int i = 0; i < numUsedSegmentsPerInterval; i++) { + segments.add( + DataSegment.builder() + .dataSource(task.getDataSource()) + .interval(interval) + .version("version" + version) + .shardSpec(new NumberedShardSpec(i, numUsedSegmentsPerInterval)) + .dimensions(dimensions) + .size(100) + .build() + ); + } + coordinator.commitSegments(segments, null); + segments.clear(); + version = (version + 1) % 10; + } + + final long start = System.nanoTime(); + + final int numAllocations = 128; + final long replicas = 2; + Map>> sequenceNameAndPrevIdToFutures = new HashMap<>(); + for (int i = 0; i < numAllocations; i++) { + for (int j = 0; j < numHours; j++) { + final long id = numUsedSegmentsPerInterval + i / replicas; + final String sequenceId = j + "-sequence" + id; + final String prevSequenceId = j + "-sequence" + (id - 1); + sequenceNameAndPrevIdToFutures.computeIfAbsent( + sequenceId + "|" + prevSequenceId, + k -> new ArrayList<>() + ).add( + allocate( + task, + intervals.get(j).getStart(), + Granularities.HOUR, + sequenceId, + prevSequenceId + ) + ); + } + } + + final Set allocatedIds = new HashSet<>(); + for (List> sameIds : sequenceNameAndPrevIdToFutures.values()) { + if (sameIds.isEmpty()) { + return; + } + final SegmentIdWithShardSpec id = sameIds.get(0).get(); + Assert.assertNotNull(id); + for (int i = 1; i < sameIds.size(); i++) { + Assert.assertEquals(id, sameIds.get(i).get()); + } + Assert.assertFalse(allocatedIds.contains(id)); + allocatedIds.add(id); + } + Assert.assertEquals(numHours * numAllocations, allocatedIds.size() * replicas); + + final long totalTime = (System.nanoTime() - start) / 1_000_000; + log.info("Total time taken for [%d] allocations is [%d] ms.", (numHours * numAllocations), totalTime); + } + @Test public void testManySegmentsSameInterval() { @@ -479,7 +580,8 @@ public void testResumeSequence() public void 
testSegmentIsAllocatedForLatestUsedSegmentVersion() { final Task task = NoopTask.create(); - taskActionTestKit.getTaskLockbox().add(task); + final TaskLockbox lockbox = taskActionTestKit.getTaskLockbox(); + lockbox.add(task); final String sequenceName = "sequence_1"; @@ -491,6 +593,7 @@ public void testSegmentIsAllocatedForLatestUsedSegmentVersion() assertSameIdentifier(pendingSegmentV01, pendingSegmentV02); + lockbox.unlockAll(task); // Commit a segment for version V1 final DataSegment segmentV1 = DataSegment.builder() @@ -515,6 +618,7 @@ public void testSegmentIsAllocatedForLatestUsedSegmentVersion() Assert.assertNotEquals(pendingSegmentV01, pendingSegmentV11); + lockbox.unlockAll(task); // Commit a segment for version V2 to overshadow V1 final DataSegment segmentV2 = DataSegment.builder() @@ -1266,6 +1370,34 @@ private SegmentIdWithShardSpec allocate( } } + private Future allocate( + final Task task, + final DateTime timestamp, + final Granularity preferredSegmentGranularity, + final String sequenceName, + final String sequencePreviousId + ) + { + final SegmentAllocateAction action = new SegmentAllocateAction( + DATA_SOURCE, + timestamp, + Granularities.NONE, + preferredSegmentGranularity, + sequenceName, + sequencePreviousId, + false, + NumberedPartialShardSpec.instance(), + lockGranularity, + null + ); + + if (useBatch) { + return action.performAsync(task, taskActionTestKit.getTaskActionToolbox()); + } else { + return Futures.immediateFuture(action.perform(task, taskActionTestKit.getTaskActionToolbox())); + } + } + private void assertSameIdentifier(final SegmentIdWithShardSpec expected, final SegmentIdWithShardSpec actual) { Assert.assertEquals(expected, actual); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java index 6d39023628c2..1ec2adc5808b 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java @@ -25,34 +25,20 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.metrics.StubServiceEmitter; -import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.segment.TestDataSource; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.NumberedPartialShardSpec; -import org.apache.druid.timeline.partition.NumberedShardSpec; -import org.joda.time.DateTimeZone; -import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -60,7 +46,6 @@ public class SegmentAllocationQueueTest { - private static final Logger log = new Logger(SegmentAllocationQueueTest.class); @Rule public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); @@ -121,133 +106,6 @@ public void tearDown() 
emitter.flush(); } - @Test - @Ignore - public void testLongQueue() throws Exception - { - allocationQueue.start(); - allocationQueue.becomeLeader(); - final Task task = NoopTask.create(); - taskActionTestKit.getTaskLockbox().add(task); - - final Interval startInterval = Intervals.of("2024-01-01/PT1H"); - final long hourToMillis = 1000L * 60L * 60L; - final long numHours = 48; - List intervals = new ArrayList<>(); - for (long hour = 0; hour < numHours; hour++) { - intervals.add( - new Interval( - startInterval.getStartMillis() + hourToMillis * hour, - startInterval.getStartMillis() + hourToMillis * hour + hourToMillis, - DateTimeZone.UTC - ) - ); - } - - final IndexerSQLMetadataStorageCoordinator coordinator = - (IndexerSQLMetadataStorageCoordinator) taskActionTestKit.getMetadataStorageCoordinator(); - - final List dimensions = new ArrayList<>(); - for (int i = 0; i < 1000; i++) { - dimensions.add("dimension_" + i); - } - final Set segments = new HashSet<>(); - final int numUsedSegmentsPerInterval = 2000; - int version = 0; - for (Interval interval : intervals) { - for (int i = 0; i < numUsedSegmentsPerInterval; i++) { - segments.add( - DataSegment.builder() - .dataSource(task.getDataSource()) - .interval(interval) - .version("version" + version) - .shardSpec(new NumberedShardSpec(i, numUsedSegmentsPerInterval)) - .dimensions(dimensions) - .size(100) - .build() - ); - } - coordinator.commitSegments(segments, null); - segments.clear(); - version = (version + 1) % 10; - } - - - final int numAllocations = 10; - final int replicas = 2; - Map>> sequenceNameAndPrevIdToFutures = new HashMap<>(); - for (int i = 0; i < numAllocations; i++) { - for (int j = 0; j < numHours; j++) { - final int id = numUsedSegmentsPerInterval + i / replicas; - final String sequenceId = j + "-sequence" + id; - final String prevSequenceId = j + "-sequence" + (id - 1); - sequenceNameAndPrevIdToFutures.computeIfAbsent( - sequenceId + "|" + prevSequenceId, - k -> new ArrayList<>() - ).add( - 
allocationQueue.add( - new SegmentAllocateRequest( - task, - new SegmentAllocateAction( - task.getDataSource(), - intervals.get(j).getStart(), - Granularities.NONE, - Granularities.HOUR, - sequenceId, - prevSequenceId, - false, - NumberedPartialShardSpec.instance(), - LockGranularity.TIME_CHUNK, - TaskLockType.APPEND - ), - 10 - ) - ) - ); - } - } - executor.finishAllPendingTasks(); - - final Set allocatedIds = new HashSet<>(); - for (List> sameIds : sequenceNameAndPrevIdToFutures.values()) { - if (sameIds.isEmpty()) { - return; - } - final SegmentIdWithShardSpec id = sameIds.get(0).get(); - Assert.assertNotNull(id); - for (int i = 1; i < sameIds.size(); i++) { - Assert.assertEquals(id, sameIds.get(i).get()); - } - Assert.assertFalse(allocatedIds.contains(id)); - allocatedIds.add(id); - } - Assert.assertEquals(numHours * numAllocations, allocatedIds.size() * replicas); - printStats("task/action/batch/runTime"); - printStats("task/action/batch/queueTime"); - } - - private void printStats(String metricName) - { - int count = 0; - double max = 0; - double min = Double.POSITIVE_INFINITY; - double sum = 0; - for (ServiceMetricEvent event : emitter.getMetricEvents().get(metricName)) { - count++; - max = Math.max(max, event.getValue().doubleValue()); - min = Math.min(min, event.getValue().doubleValue()); - sum += event.getValue().doubleValue(); - } - double avg = 0; - if (count > 0) { - avg = sum / count; - } - log.info( - "Metric[%s]: count[%d], min[%f], max[%f], avg[%f], sum[%f]", - metricName, count, min, max, avg, sum - ); - } - @Test public void testBatchWithMultipleTimestamps() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 69c8b6079055..6acd7e16a974 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -56,7 +56,6 @@ public class TaskActionTestKit extends ExternalResource private SegmentsMetadataManager segmentsMetadataManager; private TaskActionToolbox taskActionToolbox; private SegmentSchemaManager segmentSchemaManager; - private SegmentSchemaCache segmentSchemaCache; private boolean skipSegmentPayloadFetchForAllocation = new TaskLockConfig().isSegmentAllocationReduceMetadataIO(); @@ -109,17 +108,6 @@ public int getSqlMetadataMaxRetry() return 2; } }; - taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); - segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance()); - segmentsMetadataManager = new SqlSegmentsMetadataManager( - objectMapper, - Suppliers.ofInstance(new SegmentsMetadataManagerConfig()), - Suppliers.ofInstance(metadataStorageTablesConfig), - testDerbyConnector, - segmentSchemaCache, - CentralizedDatasourceSchemaConfig.create(), - NoopServiceEmitter.instance() - ); final TaskLockConfig taskLockConfig = new TaskLockConfig() { @Override @@ -141,6 +129,17 @@ public boolean isSegmentAllocationReduceMetadataIO() } }; + taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); + SegmentSchemaCache segmentSchemaCache = new SegmentSchemaCache(NoopServiceEmitter.instance()); + segmentsMetadataManager = new SqlSegmentsMetadataManager( + objectMapper, + Suppliers.ofInstance(new SegmentsMetadataManagerConfig()), + Suppliers.ofInstance(metadataStorageTablesConfig), + testDerbyConnector, + segmentSchemaCache, + CentralizedDatasourceSchemaConfig.create(), + NoopServiceEmitter.instance() + ); taskActionToolbox = new TaskActionToolbox( taskLockbox, taskStorage, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java index f2749d15cdb1..e8dab06030e1 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -67,7 +68,8 @@ public void setup() final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); lockbox = new TaskLockbox( taskStorage, - new TestIndexerMetadataStorageCoordinator() + new TestIndexerMetadataStorageCoordinator(), + new TaskLockConfig() ); task = NoopTask.create(); taskStorage.insert(task, TaskStatus.running(task.getId())); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 7f32c67c2384..d5b2c2eefcc5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -57,6 +57,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.test.TestDataSegmentKiller; import org.apache.druid.java.util.common.ISE; @@ -165,7 +166,7 @@ public void setUpIngestionTestBase() throws IOException 
CentralizedDatasourceSchemaConfig.create(), NoopServiceEmitter.instance() ); - lockbox = new TaskLockbox(taskStorage, storageCoordinator); + lockbox = new TaskLockbox(taskStorage, storageCoordinator, new TaskLockConfig()); segmentCacheManagerFactory = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, getObjectMapper()); reportsFile = temporaryFolder.newFile(); dataSegmentKiller = new TestDataSegmentKiller(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java index f48c1d87a2b2..52365317e214 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java @@ -34,6 +34,7 @@ import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskQueue; import org.apache.druid.indexing.overlord.TaskRunner; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; @@ -129,7 +130,8 @@ public void setUp() private void initScheduler() { - TaskLockbox taskLockbox = new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()); + TaskLockbox taskLockbox = + new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator(), new TaskLockConfig()); WorkerBehaviorConfig defaultWorkerConfig = new DefaultWorkerBehaviorConfig(WorkerBehaviorConfig.DEFAULT_STRATEGY, null); scheduler = new OverlordCompactionScheduler( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index abab38bf0c9b..0112320101ae 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -242,7 +242,6 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private MonitorScheduler monitorScheduler; private ServiceEmitter emitter; - private TaskLockConfig lockConfig; private TaskQueueConfig tqc; private TaskConfig taskConfig; private DataSegmentPusher dataSegmentPusher; @@ -547,7 +546,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( Preconditions.checkNotNull(taskStorage); Preconditions.checkNotNull(emitter); - taskLockbox = new TaskLockbox(taskStorage, mdc); + taskLockbox = new TaskLockbox(taskStorage, mdc, new TaskLockConfig()); tac = new LocalTaskActionClientFactory( new TaskActionToolbox( taskLockbox, @@ -636,7 +635,7 @@ private TaskQueue setUpTaskQueue(TaskStorage ts, TaskRunner tr) throws Exception Preconditions.checkNotNull(tac); Preconditions.checkNotNull(emitter); - lockConfig = new TaskLockConfig(); + TaskLockConfig lockConfig = new TaskLockConfig(); tqc = mapper.readValue( "{\"startDelay\":\"PT0S\", \"restartDelay\":\"PT1S\", \"storageSyncRate\":\"PT0.5S\"}", TaskQueueConfig.class diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java index 4dc0416cd1f2..4bd9ecf5ca0e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.common.config.TaskStorageConfig; import 
org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.concurrent.Execs; @@ -85,7 +86,8 @@ public void setup() derbyConnector, segmentSchemaManager, CentralizedDatasourceSchemaConfig.create() - ) + ), + new TaskLockConfig() ); service = Execs.multiThreaded(2, "TaskLockBoxConcurrencyTest-%d"); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java index 69c2e1fe8cab..3068c3be1f35 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java @@ -112,7 +112,7 @@ public boolean isForceTimeChunkLock() final TaskQueueConfig queueConfig = new TaskQueueConfig(null, null, null, null, null, null); final TaskRunner taskRunner = EasyMock.createNiceMock(RemoteTaskRunner.class); final TaskActionClientFactory actionClientFactory = EasyMock.createNiceMock(LocalTaskActionClientFactory.class); - final TaskLockbox lockbox = new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()); + final TaskLockbox lockbox = new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator(), lockConfig); final ServiceEmitter emitter = new NoopServiceEmitter(); return new TaskQueue( lockConfig, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index a8c4b5117b1b..298d8a69c3c3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -44,6 +44,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; @@ -90,6 +91,7 @@ public class TaskLockboxTest @Rule public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule(); + private final TaskLockConfig taskLockConfig = new TaskLockConfig(); private ObjectMapper objectMapper; private TaskStorage taskStorage; private IndexerMetadataStorageCoordinator metadataStorageCoordinator; @@ -136,7 +138,7 @@ public void setup() CentralizedDatasourceSchemaConfig.create() ); - lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); validator = new TaskLockboxValidator(lockbox, taskStorage); } @@ -332,7 +334,7 @@ public void testTimeoutForLock() throws InterruptedException @Test public void testSyncFromStorage() { - final TaskLockbox originalBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final TaskLockbox originalBox = new TaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); for (int i = 0; i < 5; i++) { final Task task = NoopTask.create(); taskStorage.insert(task, TaskStatus.running(task.getId())); @@ -354,7 +356,7 @@ public void testSyncFromStorage() .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) .collect(Collectors.toList()); - final TaskLockbox newBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final TaskLockbox newBox = new TaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); newBox.syncFromStorage(); 
Assert.assertEquals(originalBox.getAllLocks(), newBox.getAllLocks()); @@ -381,7 +383,7 @@ public void testSyncFromStorageWithMissingTaskLockPriority() .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) .collect(Collectors.toList()); - final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); lockbox.syncFromStorage(); final List afterLocksInStorage = taskStorage.getActiveTasks().stream() @@ -412,7 +414,7 @@ public void testSyncFromStorageWithMissingTaskPriority() .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) .collect(Collectors.toList()); - final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); lockbox.syncFromStorage(); final List afterLocksInStorage = taskStorage.getActiveTasks().stream() @@ -439,7 +441,7 @@ public void testSyncFromStorageWithInvalidPriority() ) ); - final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); TaskLockboxSyncResult result = lockbox.syncFromStorage(); Assert.assertEquals(1, result.getTasksToFail().size()); Assert.assertTrue(result.getTasksToFail().contains(task)); @@ -470,8 +472,8 @@ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() CentralizedDatasourceSchemaConfig.create() ); - TaskLockbox theBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); - TaskLockbox loadedBox = new TaskLockbox(loadedTaskStorage, loadedMetadataStorageCoordinator); + TaskLockbox theBox = new TaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); + TaskLockbox loadedBox = new TaskLockbox(loadedTaskStorage, loadedMetadataStorageCoordinator, taskLockConfig); Task aTask = NoopTask.create(); taskStorage.insert(aTask, 
TaskStatus.running(aTask.getId())); @@ -496,7 +498,7 @@ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() @Test public void testRevokedLockSyncFromStorage() { - final TaskLockbox originalBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final TaskLockbox originalBox = new TaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); final Task task1 = NoopTask.ofPriority(10); taskStorage.insert(task1, TaskStatus.running(task1.getId())); @@ -522,7 +524,7 @@ public void testRevokedLockSyncFromStorage() Assert.assertEquals(1, task2Locks.size()); Assert.assertTrue(task2Locks.get(0).isRevoked()); - final TaskLockbox newBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final TaskLockbox newBox = new TaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); newBox.syncFromStorage(); final Set afterLocksInStorage = taskStorage.getActiveTasks().stream() @@ -583,7 +585,7 @@ public void testDoInCriticalSectionWithSmallerInterval() throws Exception Assert.assertTrue( lockbox.doInCriticalSection( task, - Collections.singleton(interval), + Collections.singleton(smallInterval), CriticalAction.builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() ) ); @@ -1857,7 +1859,7 @@ public void testFailedToReacquireTaskLock() taskStorage.insert(taskWithFailingLockAcquisition1, TaskStatus.running(taskWithFailingLockAcquisition1.getId())); taskStorage.insert(taskWithSuccessfulLockAcquisition, TaskStatus.running(taskWithSuccessfulLockAcquisition.getId())); - TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator); + TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator, taskLockConfig); testLockbox.add(taskWithFailingLockAcquisition0); testLockbox.add(taskWithFailingLockAcquisition1); testLockbox.add(taskWithSuccessfulLockAcquisition); @@ -2012,7 +2014,7 @@ public void testCleanupOnUnlock() .andReturn(0).once(); 
EasyMock.replay(coordinator); - final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator); + final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator, taskLockConfig); taskLockbox.add(replaceTask); taskLockbox.tryLock( @@ -2263,10 +2265,11 @@ private static class NullLockPosseTaskLockbox extends TaskLockbox { public NullLockPosseTaskLockbox( TaskStorage taskStorage, - IndexerMetadataStorageCoordinator metadataStorageCoordinator + IndexerMetadataStorageCoordinator metadataStorageCoordinator, + TaskLockConfig taskLockConfig ) { - super(taskStorage, metadataStorageCoordinator); + super(taskStorage, metadataStorageCoordinator, taskLockConfig); } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index f67e9fc28614..e1f13e59726f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -120,14 +120,15 @@ public RetType submit(TaskAction taskAction) } }; + final TaskLockConfig taskLockConfig = new TaskLockConfig(); taskQueue = new TaskQueue( - new TaskLockConfig(), + taskLockConfig, new TaskQueueConfig(null, Period.millis(1), null, null, null, null), new DefaultTaskConfig(), taskStorage, taskRunner, unsupportedTaskActionFactory, // Not used for anything serious - new TaskLockbox(taskStorage, storageCoordinator), + new TaskLockbox(taskStorage, storageCoordinator, taskLockConfig), new NoopServiceEmitter(), jsonMapper, new NoopTaskContextEnricher() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 8f1393f2c675..1c46f35b689f 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -543,7 +543,7 @@ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException taskStorage, EasyMock.createMock(HttpRemoteTaskRunner.class), createActionClientFactory(), - new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()), + new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator(), new TaskLockConfig()), new StubServiceEmitter("druid/overlord", "testHost"), mapper, new NoopTaskContextEnricher() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index f1a8b65a509f..f686f393f996 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -179,7 +179,7 @@ public void setUp() throws Exception IndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); - taskLockbox = new TaskLockbox(taskStorage, mdc); + taskLockbox = new TaskLockbox(taskStorage, mdc, new TaskLockConfig()); task0 = NoopTask.create(); taskId0 = task0.getId(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 7a4fd7dadb3a..8fe1f5b22d45 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -74,6 +74,7 @@ import org.apache.druid.indexing.overlord.Segments; import 
org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.test.TestDataSegmentAnnouncer; import org.apache.druid.indexing.test.TestDataSegmentKiller; @@ -598,7 +599,7 @@ protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, b segmentSchemaManager, CentralizedDatasourceSchemaConfig.create() ); - taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator, new TaskLockConfig()); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, taskStorage, diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 0652d2905490..2f338e96ce88 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -925,6 +925,87 @@ private boolean shouldUpgradePendingSegment( } } + public SegmentIdWithShardSpec findOrInsertPendingSegmentRecord( + final String datasource, + final Interval interval, + final String committedVersion, + final String version, + final int maxId, + final int numCorePartitions, + final PartialShardSpec partialShardSpec, + final String sequenceName, + final String previousSegmentId, + final boolean skipLineageCheck, + final String taskAllocatorId + ) + { + return connector.retryWithHandle( + handle -> { + final String previousSegmentIdNotNull = previousSegmentId == null ? 
"" : previousSegmentId; + final Query> query; + if (skipLineageCheck) { + final String sql = StringUtils.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "start = :start AND " + + "%2$send%2$s = :end", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + ); + query = handle.createQuery(sql) + .bind("dataSource", datasource) + .bind("sequence_name", sequenceName) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()); + + } else { + final String sql = StringUtils.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "sequence_prev_id = :sequence_prev_id", + dbTables.getPendingSegmentsTable() + ); + + query = handle.createQuery(sql) + .bind("dataSource", datasource) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentIdNotNull); + + } + final CheckExistingSegmentIdResult result = findExistingPendingSegment( + query, + interval, + sequenceName, + previousSegmentIdNotNull, + committedVersion + ); + + if (result.found) { + // The found existing segment identifier can be null if its interval doesn't match with the given interval + return result.segmentIdentifier; + } + + PendingSegmentRecord newId = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + datasource, + interval, + version, + partialShardSpec.complete(jsonMapper, maxId + 1, numCorePartitions) + ), + sequenceName, + previousSegmentIdNotNull, + null, + taskAllocatorId + ); + + insertPendingSegmentsIntoMetastore(handle, Collections.singletonList(newId), datasource, skipLineageCheck); + return newId.getId(); + } + ); + } + @Nullable private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( final Handle handle, @@ -2967,7 +3048,7 @@ Set retrieveUsedSegmentsForAllocation( } if (!retrievedIds.equals(segmentIdsToRetrieve)) { throw DruidException.defensive( - "Cannot 
create DataSegments for segment allocations." + "Cannot create DataSegments for segment allocations. " + "The used segments may have changed for dataSource[%s] and interval[%s].", dataSource, interval ); From 47199d6757ff81d411ab29b6937d35b4a9de5d3a Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 30 Oct 2024 08:06:17 +0530 Subject: [PATCH 3/3] Turn flag on for tests --- .../indexing/common/actions/SegmentAllocationQueue.java | 2 +- .../java/org/apache/druid/indexing/overlord/TaskLockbox.java | 2 +- .../druid/indexing/overlord/config/TaskLockConfig.java | 2 +- .../indexing/common/actions/SegmentAllocateActionTest.java | 5 ++--- 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 4198a5c3bcdc..c4e9b0638f3c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -70,7 +70,7 @@ public class SegmentAllocationQueue private static final Logger log = new Logger(SegmentAllocationQueue.class); private static final int MAX_QUEUE_SIZE = 2000; - private static final int MAX_BATCH_SIZE = 5; + private static final int MAX_BATCH_SIZE = 500; private final long maxWaitTimeMillis; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 9c149974a5c2..1cf014a80024 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -576,8 +576,8 @@ public LockResult allocateSegmentWithReducedMetadataIO( visibleSegmentState.addPendingSegment( new 
PendingSegmentRecord(allocatedId, sequenceName, previousSegmentId, null, allocatorId) ); + log.info("Found or Allocated pendingSegment[%s].", allocatedId.asSegmentId()); } - log.info("Found or Allocated pendingSegment[%s].", allocatedId.asSegmentId()); return LockResult.ok(lockPosse.getTaskLock(), allocatedId); } return LockResult.fail(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java index cf353276dc61..911374711ead 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java @@ -37,7 +37,7 @@ public class TaskLockConfig private long batchAllocationWaitTime = 0L; @JsonProperty - private boolean segmentAllocationReduceMetadataIO = false; + private boolean segmentAllocationReduceMetadataIO = true; public boolean isForceTimeChunkLock() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index 126e4b06c218..a9404e75e77c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -41,7 +41,6 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.PeriodGranularity; import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.metrics.StubServiceEmitter; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; @@ 
-60,6 +59,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -97,7 +97,6 @@ public class SegmentAllocateActionTest private final LockGranularity lockGranularity; private SegmentAllocationQueue allocationQueue; - private StubServiceEmitter emitter; @Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}, skipSegmentPayloadFetchForAllocation = {2}") public static Iterable constructorFeeder() @@ -199,7 +198,7 @@ public void testManySegmentsSameInterval_noLineageCheck() throws Exception Assert.assertEquals(expectedIds, allocatedIds); } - //@Ignore + @Ignore @Test public void testBenchmark() throws Exception {