diff --git a/core/src/main/java/org/apache/druid/java/util/common/Intervals.java b/core/src/main/java/org/apache/druid/java/util/common/Intervals.java index b7a1f37cf1c3..96f858fd4be2 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/Intervals.java +++ b/core/src/main/java/org/apache/druid/java/util/common/Intervals.java @@ -20,10 +20,13 @@ package org.apache.druid.java.util.common; import com.google.common.collect.ImmutableList; +import org.apache.druid.java.util.common.guava.Comparators; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.chrono.ISOChronology; +import javax.annotation.Nullable; + public final class Intervals { public static final Interval ETERNITY = utc(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT); @@ -68,6 +71,32 @@ public static boolean isEternity(final Interval interval) return ETERNITY.equals(interval); } + /** + * Finds an interval from the given set of sortedIntervals which overlaps with + * the searchInterval. If multiple candidate intervals overlap with the + * searchInterval, the "smallest" interval based on the + * {@link Comparators#intervalsByStartThenEnd()} is returned. + * + * @param searchInterval Interval which should overlap with the result + * @param sortedIntervals Candidate overlapping intervals, sorted in ascending + * order, using {@link Comparators#intervalsByStartThenEnd()}. + * @return The first overlapping interval, if one exists, otherwise null. + */ + @Nullable + public static Interval findOverlappingInterval(Interval searchInterval, Interval[] sortedIntervals) + { + for (Interval interval : sortedIntervals) { + if (interval.overlaps(searchInterval)) { + return interval; + } else if (interval.getStart().isAfter(searchInterval.getEnd())) { + // Intervals after this cannot have an overlap + return null; + } + } + + return null; + } + private Intervals() { } diff --git a/core/src/test/java/org/apache/druid/java/util/common/IntervalsTest.java b/core/src/test/java/org/apache/druid/java/util/common/IntervalsTest.java new file mode 100644 index 000000000000..59eac8d5a991 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/common/IntervalsTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import org.apache.druid.java.util.common.guava.Comparators; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class IntervalsTest +{ + + @Test + public void testFindOverlappingInterval() + { + final Interval[] sortedIntervals = new Interval[]{ + Intervals.of("2019/2020"), + Intervals.of("2021/2022"), + Intervals.of("2021-04-01/2021-05-01"), + Intervals.of("2022/2023") + }; + Arrays.sort(sortedIntervals, Comparators.intervalsByStartThenEnd()); + + // Search interval outside the bounds of the sorted intervals + Assert.assertNull( + Intervals.findOverlappingInterval(Intervals.of("2018/2019"), sortedIntervals) + ); + Assert.assertNull( + Intervals.findOverlappingInterval(Intervals.of("2023/2024"), sortedIntervals) + ); + + // Search interval within bounds, overlap exists + // Fully overlapping interval + Assert.assertEquals( + Intervals.of("2021/2022"), + Intervals.findOverlappingInterval(Intervals.of("2021/2022"), sortedIntervals) + ); + + // Partially overlapping interval + Assert.assertEquals( + Intervals.of("2022/2023"), + Intervals.findOverlappingInterval(Intervals.of("2022-01-01/2022-01-02"), sortedIntervals) + ); + + Assert.assertEquals( + Intervals.of("2021/2022"), + Intervals.findOverlappingInterval(Intervals.of("2021-06-01/2021-07-01"), sortedIntervals) + ); + + // Overlap with multiple intervals, "smallest" one is returned + Assert.assertEquals( + Intervals.of("2021/2022"), + Intervals.findOverlappingInterval(Intervals.of("2021-03-01/2021-04-01"), sortedIntervals) + ); + + // Search interval within bounds, no overlap + Assert.assertNull( + Intervals.findOverlappingInterval(Intervals.of("2020-01-02/2020-03-03"), sortedIntervals) + ); + } + +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 6b2fad9610b3..897db9750354 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.common; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; @@ -329,14 +328,7 @@ public void publishSegments(Iterable segments) throws IOException // Request segment pushes for each set final Multimap segmentMultimap = Multimaps.index( segments, - new Function() - { - @Override - public Interval apply(DataSegment segment) - { - return segment.getInterval(); - } - } + DataSegment::getInterval ); for (final Collection segmentCollection : segmentMultimap.asMap().values()) { getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection))); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java index 27e0bcbaa29d..a07977c6a564 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java @@ -30,6 +30,7 @@ import javax.annotation.Nullable; import java.util.Map; +import java.util.concurrent.TimeUnit; public class LocalTaskActionClient implements TaskActionClient { @@ -76,11 +77,28 @@ public RetType submit(TaskAction taskAction) } final long performStartTime = System.currentTimeMillis(); - final RetType result = taskAction.perform(task, toolbox); + final RetType result = performAction(taskAction); emitTimerMetric("task/action/run/time", taskAction, System.currentTimeMillis() - performStartTime); return result; } + private R performAction(TaskAction taskAction) + { + try { + final R result; + if (taskAction.canPerformAsync(task, toolbox)) { + result = taskAction.performAsync(task, toolbox).get(5, TimeUnit.MINUTES); + } else { + result = taskAction.perform(task, toolbox); + } + + return result; + } + catch (Throwable t) { + throw new RuntimeException(t); + } + } + private void emitTimerMetric(final String metric, final TaskAction action, final long time) { final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java index f61dad98168d..f0fae4a8617d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java @@ -47,6 +47,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; @@ -180,6 +181,23 @@ public TypeReference getReturnTypeReference() }; } + @Override + public boolean canPerformAsync(Task task, TaskActionToolbox toolbox) + { + return toolbox.canBatchSegmentAllocation(); + } + + @Override + public Future performAsync(Task task, TaskActionToolbox toolbox) + { + if (!toolbox.canBatchSegmentAllocation()) { + throw new ISE("Batched segment allocation is disabled"); + } + return toolbox.getSegmentAllocationQueue().add( + new SegmentAllocateRequest(task, this, MAX_ATTEMPTS) + ); + } + @Override public SegmentIdWithShardSpec perform( final Task task, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java new file mode 100644 index 000000000000..adac7523f442 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateRequest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import org.apache.druid.indexing.common.task.Task; +import org.joda.time.Interval; +import org.joda.time.chrono.ISOChronology; + +/** + * Request received by the overlord for segment allocation. + */ +public class SegmentAllocateRequest +{ + private final Task task; + private final SegmentAllocateAction action; + private final int maxAttempts; + private final Interval rowInterval; + + private int attempts; + + public SegmentAllocateRequest(Task task, SegmentAllocateAction action, int maxAttempts) + { + this.task = task; + this.action = action; + this.maxAttempts = maxAttempts; + this.rowInterval = action.getQueryGranularity() + .bucket(action.getTimestamp()) + .withChronology(ISOChronology.getInstanceUTC()); + } + + public Task getTask() + { + return task; + } + + public SegmentAllocateAction getAction() + { + return action; + } + + public void incrementAttempts() + { + ++attempts; + } + + public boolean canRetry() + { + return attempts < maxAttempts; + } + + public int getAttempts() + { + return attempts; + } + + public Interval getRowInterval() + { + return rowInterval; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateResult.java new file mode 100644 index 000000000000..995bfb9d31d0 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateResult.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; + +public class SegmentAllocateResult +{ + private final SegmentIdWithShardSpec segmentId; + private final String errorMessage; + + public SegmentAllocateResult(SegmentIdWithShardSpec segmentId, String errorMessage) + { + this.segmentId = segmentId; + this.errorMessage = errorMessage; + } + + public SegmentIdWithShardSpec getSegmentId() + { + return segmentId; + } + + public String getErrorMessage() + { + return errorMessage; + } + + public boolean isSuccess() + { + return segmentId != null; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java new file mode 100644 index 000000000000..9ed53d99faf5 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -0,0 +1,716 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import com.google.inject.Inject; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.java.util.common.guava.Comparators; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; +import org.apache.druid.query.DruidMetrics; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Queue for {@link SegmentAllocateRequest}s. + */ +@ManageLifecycle +public class SegmentAllocationQueue +{ + private static final Logger log = new Logger(SegmentAllocationQueue.class); + + private static final int MAX_QUEUE_SIZE = 2000; + + private final long maxWaitTimeMillis; + + private final TaskLockbox taskLockbox; + private final ScheduledExecutorService executor; + private final IndexerMetadataStorageCoordinator metadataStorage; + private final AtomicBoolean isLeader = new AtomicBoolean(false); + private final ServiceEmitter emitter; + + private final ConcurrentHashMap keyToBatch = new ConcurrentHashMap<>(); + private final BlockingDeque processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE); + + @Inject + public SegmentAllocationQueue( + TaskLockbox taskLockbox, + TaskLockConfig taskLockConfig, + IndexerMetadataStorageCoordinator metadataStorage, + ServiceEmitter emitter, + ScheduledExecutorFactory executorFactory + ) + { + this.emitter = emitter; + this.taskLockbox = taskLockbox; + this.metadataStorage = metadataStorage; + this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime(); + + this.executor = taskLockConfig.isBatchSegmentAllocation() + ? executorFactory.create(1, "SegmentAllocQueue-%s") : null; + } + + @LifecycleStart + public void start() + { + if (isEnabled()) { + log.info("Initializing segment allocation queue."); + scheduleQueuePoll(maxWaitTimeMillis); + } + } + + @LifecycleStop + public void stop() + { + if (isEnabled()) { + log.info("Tearing down segment allocation queue."); + executor.shutdownNow(); + } + } + + public void becomeLeader() + { + if (!isLeader.compareAndSet(false, true)) { + log.info("Already the leader. Queue processing has started."); + } else if (isEnabled()) { + log.info("Elected leader. Starting queue processing."); + } else { + log.info( + "Elected leader but batched segment allocation is disabled. " + + "Segment allocation queue will not be used." + ); + } + } + + public void stopBeingLeader() + { + if (!isLeader.compareAndSet(true, false)) { + log.info("Already surrendered leadership. Queue processing is stopped."); + } else if (isEnabled()) { + log.info("Not leader anymore. Stopping queue processing."); + } else { + log.info("Not leader anymore. Segment allocation queue is already disabled."); + } + } + + public boolean isEnabled() + { + return executor != null && !executor.isShutdown(); + } + + private void scheduleQueuePoll(long delay) + { + executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS); + } + + /** + * Gets the number of batches currently in the queue. + */ + public int size() + { + return processingQueue.size(); + } + + /** + * Queues a SegmentAllocateRequest. The returned future may complete successfully + * with a non-null value or with a non-null value. + */ + public Future add(SegmentAllocateRequest request) + { + if (!isLeader.get()) { + throw new ISE("Cannot allocate segment if not leader."); + } else if (!isEnabled()) { + throw new ISE("Batched segment allocation is disabled."); + } + + final AllocateRequestKey requestKey = new AllocateRequestKey(request, maxWaitTimeMillis); + final AtomicReference> futureReference = new AtomicReference<>(); + + // Possible race condition: + // t1 -> new batch is added to queue or batch already exists in queue + // t2 -> executor pops batch, processes all requests in it + // t1 -> new request is added to dangling batch and is never picked up + // Solution: For existing batch, call keyToBatch.remove() on the key to + // wait on keyToBatch.compute() to finish before proceeding with processBatch(). + // For new batch, keyToBatch.remove() would not wait as key is not in map yet + // but a new batch is unlikely to be due immediately, so it won't get popped right away. + keyToBatch.compute(requestKey, (key, existingBatch) -> { + if (existingBatch == null) { + AllocateRequestBatch newBatch = new AllocateRequestBatch(key); + futureReference.set(newBatch.add(request)); + return addBatchToQueue(newBatch) ? newBatch : null; + } else { + futureReference.set(existingBatch.add(request)); + return existingBatch; + } + }); + + return futureReference.get(); + } + + /** + * Tries to add the given batch to the processing queue. Fails all the pending + * requests in the batch if we are not leader or if the queue is full. + */ + private boolean addBatchToQueue(AllocateRequestBatch batch) + { + batch.key.resetQueueTime(); + if (!isLeader.get()) { + batch.failPendingRequests("Cannot allocate segment if not leader"); + return false; + } else if (processingQueue.offer(batch.key)) { + log.debug("Added a new batch [%s] to queue.", batch.key); + return true; + } else { + batch.failPendingRequests( + "Segment allocation queue is full. Check the metric `task/action/batch/runTime` " + + "to determine if metadata operations are slow." + ); + return false; + } + } + + /** + * Tries to add the given batch to the processing queue. If a batch already + * exists for this key, transfers all the requests from this batch to the + * existing one. + */ + private void requeueBatch(AllocateRequestBatch batch) + { + log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), batch.key); + keyToBatch.compute(batch.key, (key, existingBatch) -> { + if (existingBatch == null) { + return addBatchToQueue(batch) ? batch : null; + } else { + // Merge requests from this batch to existing one + existingBatch.transferRequestsFrom(batch); + return existingBatch; + } + }); + } + + private void processBatchesDue() + { + clearQueueIfNotLeader(); + + // Process all batches which are due + log.debug("Processing batches which are due. Queue size [%d].", processingQueue.size()); + int numProcessedBatches = 0; + + AllocateRequestKey nextKey = processingQueue.peekFirst(); + while (nextKey != null && nextKey.isDue()) { + processingQueue.pollFirst(); + AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey); + + boolean processed; + try { + processed = processBatch(nextBatch); + } + catch (Throwable t) { + nextBatch.failPendingRequests(t); + processed = true; + log.error(t, "Error while processing batch [%s]", nextKey); + } + + // Requeue if not fully processed yet + if (processed) { + ++numProcessedBatches; + } else { + requeueBatch(nextBatch); + } + + nextKey = processingQueue.peek(); + } + + // Schedule the next round of processing + final long nextScheduleDelay; + if (processingQueue.isEmpty()) { + nextScheduleDelay = maxWaitTimeMillis; + } else { + nextKey = processingQueue.peek(); + long timeElapsed = System.currentTimeMillis() - nextKey.getQueueTime(); + nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed); + } + scheduleQueuePoll(nextScheduleDelay); + log.info("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay); + } + + /** + * Removes items from the queue as long as we are not leader. + */ + private void clearQueueIfNotLeader() + { + int failedBatches = 0; + AllocateRequestKey nextKey = processingQueue.peekFirst(); + while (nextKey != null && !isLeader.get()) { + processingQueue.pollFirst(); + AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey); + nextBatch.failPendingRequests("Cannot allocate segment if not leader"); + ++failedBatches; + + nextKey = processingQueue.peekFirst(); + } + if (failedBatches > 0) { + log.info("Not leader. Failed [%d] batches, remaining in queue [%d].", failedBatches, processingQueue.size()); + } + } + + /** + * Processes the given batch. Returns true if the batch was completely processed + * and should not be requeued. + */ + private boolean processBatch(AllocateRequestBatch requestBatch) + { + final AllocateRequestKey requestKey = requestBatch.key; + if (requestBatch.isEmpty()) { + return true; + } else if (!isLeader.get()) { + requestBatch.failPendingRequests("Cannot allocate segment if not leader"); + return true; + } + + log.debug( + "Processing [%d] requests for batch [%s], queue time [%s].", + requestBatch.size(), + requestKey, + requestKey.getQueueTime() + ); + + final long startTimeMillis = System.currentTimeMillis(); + final int batchSize = requestBatch.size(); + emitBatchMetric("task/action/batch/size", batchSize, requestKey); + emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - requestKey.getQueueTime()), requestKey); + + final Set usedSegments = retrieveUsedSegments(requestKey); + final int successCount = allocateSegmentsForBatch(requestBatch, usedSegments); + + emitBatchMetric("task/action/batch/attempts", 1L, requestKey); + emitBatchMetric("task/action/batch/runTime", (System.currentTimeMillis() - startTimeMillis), requestKey); + log.info("Successfully processed [%d / %d] requests in batch [%s].", successCount, batchSize, requestKey); + + if (requestBatch.isEmpty()) { + log.debug("All requests in batch [%s] have been processed.", requestKey); + return true; + } + + // Requeue the batch only if used segments have changed + log.debug("There are [%d] failed requests in batch [%s].", requestBatch.size(), requestKey); + final Set updatedUsedSegments = retrieveUsedSegments(requestKey); + + if (updatedUsedSegments.equals(usedSegments)) { + requestBatch.failPendingRequests("Allocation failed probably due to conflicting segments."); + return true; + } else { + log.debug("Used segments have changed. Requeuing failed requests."); + return false; + } + } + + private Set retrieveUsedSegments(AllocateRequestKey key) + { + return new HashSet<>( + metadataStorage.retrieveUsedSegmentsForInterval( + key.dataSource, + key.preferredAllocationInterval, + Segments.ONLY_VISIBLE + ) + ); + } + + private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set usedSegments) + { + int successCount = 0; + + // Find requests whose row interval overlaps with an existing used segment + final Set allRequests = requestBatch.getRequests(); + final Set requestsWithNoOverlappingSegment = new HashSet<>(); + + if (usedSegments.isEmpty()) { + requestsWithNoOverlappingSegment.addAll(allRequests); + } else { + final Interval[] sortedUsedSegmentIntervals = getSortedIntervals(usedSegments); + final Map> overlapIntervalToRequests = new HashMap<>(); + + for (SegmentAllocateRequest request : allRequests) { + // If there is an overlapping used segment, the interval of the used segment + // is the only candidate for allocation for this request + final Interval overlappingInterval = Intervals.findOverlappingInterval( + request.getRowInterval(), + sortedUsedSegmentIntervals + ); + + if (overlappingInterval == null) { + requestsWithNoOverlappingSegment.add(request); + } else if (overlappingInterval.contains(request.getRowInterval())) { + // Found an enclosing interval, use this for allocation + overlapIntervalToRequests.computeIfAbsent(overlappingInterval, i -> new ArrayList<>()) + .add(request); + } else { + // There is no valid allocation interval for this request due to a + // partially overlapping used segment. Need not do anything right now. + // The request will be retried upon requeueing the batch. + } + } + + // Try to allocate segments for the identified used segment intervals. + // Do not retry the failed requests with other intervals unless the batch is requeued. + for (Map.Entry> entry : overlapIntervalToRequests.entrySet()) { + successCount += allocateSegmentsForInterval( + entry.getKey(), + entry.getValue(), + requestBatch + ); + } + } + + // For requests that do not overlap with a used segment, first try to allocate + // using the preferred granularity, then smaller granularities + final Set pendingRequests = new HashSet<>(requestsWithNoOverlappingSegment); + for (Granularity granularity : + Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity)) { + Map> requestsByInterval = + getRequestsByInterval(pendingRequests, granularity); + + for (Map.Entry> entry : requestsByInterval.entrySet()) { + successCount += allocateSegmentsForInterval( + entry.getKey(), + entry.getValue(), + requestBatch + ); + pendingRequests.retainAll(requestBatch.getRequests()); + } + } + + return successCount; + } + + private Interval[] getSortedIntervals(Set usedSegments) + { + TreeSet sortedSet = new TreeSet<>(Comparators.intervalsByStartThenEnd()); + usedSegments.forEach(segment -> sortedSet.add(segment.getInterval())); + return sortedSet.toArray(new Interval[0]); + } + + /** + * Tries to allocate segments for the given requests over the specified interval. + * Returns the number of requests for which segments were successfully allocated. + */ + private int allocateSegmentsForInterval( + Interval tryInterval, + List requests, + AllocateRequestBatch requestBatch + ) + { + if (requests.isEmpty()) { + return 0; + } + + final AllocateRequestKey requestKey = requestBatch.key; + log.debug( + "Trying allocation for [%d] requests, interval [%s] in batch [%s]", + requests.size(), + tryInterval, + requestKey + ); + + final List results = taskLockbox.allocateSegments( + requests, + requestKey.dataSource, + tryInterval, + requestKey.skipSegmentLineageCheck, + requestKey.lockGranularity + ); + + int successfulRequests = 0; + for (int i = 0; i < requests.size(); ++i) { + SegmentAllocateRequest request = requests.get(i); + SegmentAllocateResult result = results.get(i); + if (result.isSuccess()) { + ++successfulRequests; + } + + requestBatch.handleResult(result, request); + } + + return successfulRequests; + } + + private Map> getRequestsByInterval( + Set requests, + Granularity tryGranularity + ) + { + final Map> tryIntervalToRequests = new HashMap<>(); + for (SegmentAllocateRequest request : requests) { + Interval tryInterval = tryGranularity.bucket(request.getAction().getTimestamp()); + if (tryInterval.contains(request.getRowInterval())) { + tryIntervalToRequests.computeIfAbsent(tryInterval, i -> new ArrayList<>()).add(request); + } + } + return tryIntervalToRequests; + } + + private void emitTaskMetric(String metric, long value, SegmentAllocateRequest request) + { + final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder(); + IndexTaskUtils.setTaskDimensions(metricBuilder, request.getTask()); + metricBuilder.setDimension("taskActionType", SegmentAllocateAction.TYPE); + emitter.emit(metricBuilder.build(metric, value)); + } + + private void emitBatchMetric(String metric, long value, AllocateRequestKey key) + { + final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder(); + metricBuilder.setDimension("taskActionType", SegmentAllocateAction.TYPE); + metricBuilder.setDimension(DruidMetrics.DATASOURCE, key.dataSource); + metricBuilder.setDimension(DruidMetrics.INTERVAL, key.preferredAllocationInterval.toString()); + emitter.emit(metricBuilder.build(metric, value)); + } + + /** + * A batch of segment allocation requests. + */ + private class AllocateRequestBatch + { + private final AllocateRequestKey key; + + /** + * Map from allocate requests (represents a single SegmentAllocateAction) + * to the future of allocated segment id. + *

+ * This must be accessed through methods synchronized on this batch. + * It is to avoid races between a new request being added just when the batch + * is being processed. + */ + private final Map> + requestToFuture = new HashMap<>(); + + AllocateRequestBatch(AllocateRequestKey key) + { + this.key = key; + } + + synchronized Future add(SegmentAllocateRequest request) + { + log.debug("Adding request to batch [%s]: %s", key, request.getAction()); + return requestToFuture.computeIfAbsent(request, req -> new CompletableFuture<>()); + } + + synchronized void transferRequestsFrom(AllocateRequestBatch batch) + { + requestToFuture.putAll(batch.requestToFuture); + batch.requestToFuture.clear(); + } + + synchronized Set getRequests() + { + return new HashSet<>(requestToFuture.keySet()); + } + + synchronized void failPendingRequests(String reason) + { + failPendingRequests(new ISE(reason)); + } + + synchronized void failPendingRequests(Throwable cause) + { + if (!requestToFuture.isEmpty()) { + log.warn("Failing [%d] requests in batch due to [%s]. Batch key: %s", size(), cause.getMessage(), key); + requestToFuture.values().forEach(future -> future.completeExceptionally(cause)); + requestToFuture.keySet().forEach( + request -> emitTaskMetric("task/action/failed/count", 1L, request) + ); + requestToFuture.clear(); + } + } + + synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request) + { + request.incrementAttempts(); + + if (result.isSuccess()) { + emitTaskMetric("task/action/success/count", 1L, request); + requestToFuture.remove(request).complete(result.getSegmentId()); + } else if (request.canRetry()) { + log.info( + "Allocation failed in attempt [%d] due to error [%s]. Can still retry. Action: %s", + request.getAttempts(), + result.getErrorMessage(), + request.getAction() + ); + } else { + emitTaskMetric("task/action/failed/count", 1L, request); + log.error( + "Failing allocate action after [%d] attempts. Latest error [%s]. Action: %s", + request.getAttempts(), + result.getErrorMessage(), + request.getAction() + ); + requestToFuture.remove(request).completeExceptionally(new ISE(result.getErrorMessage())); + } + } + + synchronized boolean isEmpty() + { + return requestToFuture.isEmpty(); + } + + synchronized int size() + { + return requestToFuture.size(); + } + } + + /** + * Key to identify a batch of allocation requests. + */ + private static class AllocateRequestKey + { + private long queueTimeMillis; + private final long maxWaitTimeMillis; + + private final String dataSource; + private final String groupId; + private final Interval preferredAllocationInterval; + private final Granularity preferredSegmentGranularity; + + private final boolean skipSegmentLineageCheck; + private final LockGranularity lockGranularity; + + private final boolean useNonRootGenPartitionSpace; + + private final int hash; + + /** + * Creates a new key for the given request. The batch for a unique key will + * always contain a single request. + */ + AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis) + { + final SegmentAllocateAction action = request.getAction(); + final Task task = request.getTask(); + + this.dataSource = action.getDataSource(); + this.groupId = task.getGroupId(); + this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck(); + this.lockGranularity = action.getLockGranularity(); + this.useNonRootGenPartitionSpace = action.getPartialShardSpec() + .useNonRootGenerationPartitionSpace(); + this.preferredSegmentGranularity = action.getPreferredSegmentGranularity(); + this.preferredAllocationInterval = action.getPreferredSegmentGranularity() + .bucket(action.getTimestamp()); + + this.hash = Objects.hash( + skipSegmentLineageCheck, + useNonRootGenPartitionSpace, + dataSource, + groupId, + preferredAllocationInterval, + lockGranularity + ); + + this.maxWaitTimeMillis = maxWaitTimeMillis; + } + + void resetQueueTime() + { + queueTimeMillis = System.currentTimeMillis(); + } + + long getQueueTime() + { + return queueTimeMillis; + } + + boolean isDue() + { + return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AllocateRequestKey that = (AllocateRequestKey) o; + return skipSegmentLineageCheck == that.skipSegmentLineageCheck + && useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace + && dataSource.equals(that.dataSource) + && groupId.equals(that.groupId) + && preferredAllocationInterval.equals(that.preferredAllocationInterval) + && lockGranularity == that.lockGranularity; + } + + @Override + public int hashCode() + { + return hash; + } + + @Override + public String toString() + { + return "{" + + "ds='" + dataSource + '\'' + + ", gr='" + groupId + '\'' + + ", lock=" + lockGranularity + + ", invl=" + preferredAllocationInterval + + ", slc=" + skipSegmentLineageCheck + + '}'; + } + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java index 559039d96ef6..18e373727790 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java @@ -24,6 +24,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import org.apache.druid.indexing.common.task.Task; +import java.util.concurrent.Future; + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = TaskAction.TYPE_FIELD) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "lockAcquire", value = TimeChunkLockAcquireAction.class), @@ -58,6 +60,16 @@ public interface TaskAction boolean isAudited(); + default boolean canPerformAsync(Task task, TaskActionToolbox toolbox) + { + return false; + } + + default Future performAsync(Task task, TaskActionToolbox toolbox) + { + throw new UnsupportedOperationException(); + } + @Override String toString(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java index 134a9bf6c72e..7d001ecbcb9a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java @@ -35,6 +35,7 @@ public class TaskActionToolbox { private final TaskLockbox taskLockbox; private final TaskStorage taskStorage; + private final SegmentAllocationQueue segmentAllocationQueue; private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private final ServiceEmitter emitter; private final SupervisorManager supervisorManager; @@ -46,6 +47,7 @@ public TaskActionToolbox( TaskLockbox taskLockbox, TaskStorage taskStorage, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, + SegmentAllocationQueue segmentAllocationQueue, ServiceEmitter emitter, SupervisorManager supervisorManager, @Json ObjectMapper jsonMapper @@ -57,6 +59,27 @@ public TaskActionToolbox( this.emitter = emitter; this.supervisorManager = supervisorManager; this.jsonMapper = jsonMapper; + this.segmentAllocationQueue = segmentAllocationQueue; + } + + public TaskActionToolbox( + TaskLockbox taskLockbox, + TaskStorage taskStorage, + IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, + ServiceEmitter emitter, + SupervisorManager supervisorManager, + @Json ObjectMapper jsonMapper + ) + { + this( + taskLockbox, + taskStorage, + indexerMetadataStorageCoordinator, + null, + emitter, + supervisorManager, + jsonMapper + ); } public TaskLockbox getTaskLockbox() @@ -103,4 +126,13 @@ public Optional getTaskRunner() return Optional.absent(); } + public SegmentAllocationQueue getSegmentAllocationQueue() + { + return segmentAllocationQueue; + } + + public boolean canBatchSegmentAllocation() + { + return segmentAllocationQueue != null && segmentAllocationQueue.isEnabled(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 24eebec94471..dc62d84e4ae8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.overlord; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ComparisonChain; @@ -34,9 +33,13 @@ import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TimeChunkLock; +import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentAllocateRequest; +import org.apache.druid.indexing.common.actions.SegmentAllocateResult; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -446,6 +449,140 @@ public LockResult tryLock(final Task task, final LockRequest request) } } + /** + * Attempts to allocate segments for the given requests. Each request contains + * a {@link Task} and a {@link SegmentAllocateAction}. This method tries to + * acquire the task locks on the required intervals/segments and then performs + * a batch allocation of segments. It is possible that some requests succeed + * successfully and others failed. In that case, only the failed ones should be + * retried. + * + * @param requests List of allocation requests + * @param dataSource Datasource for which segment is to be allocated. + * @param interval Interval for which segment is to be allocated. + * @param skipSegmentLineageCheck Whether lineage check is to be skipped + * (this is true for streaming ingestion) + * @param lockGranularity Granularity of task lock + * @return List of allocation results in the same order as the requests. + */ + public List allocateSegments( + List requests, + String dataSource, + Interval interval, + boolean skipSegmentLineageCheck, + LockGranularity lockGranularity + ) + { + log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval); + final boolean isTimeChunkLock = lockGranularity == LockGranularity.TIME_CHUNK; + + final AllocationHolderList holderList = new AllocationHolderList(requests, interval); + holderList.getPending().forEach(this::verifyTaskIsActive); + + giant.lock(); + try { + if (isTimeChunkLock) { + holderList.getPending().forEach(holder -> acquireTaskLock(holder, true)); + allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending()); + } else { + allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending()); + holderList.getPending().forEach(holder -> acquireTaskLock(holder, false)); + } + + // TODO: for failed allocations, cleanup newly created locks from the posse map + holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder, isTimeChunkLock)); + } + finally { + giant.unlock(); + } + + return holderList.getResults(); + } + + /** + * Marks the segment allocation as failed if the underlying task is not active. + */ + private void verifyTaskIsActive(SegmentAllocationHolder holder) + { + final String taskId = holder.task.getId(); + if (!activeTasks.contains(taskId)) { + holder.markFailed("Unable to grant lock to inactive Task [%s]", taskId); + } + } + + /** + * Creates a task lock request and creates or finds the lock for that request. + * Marks the segment allocation as failed if the lock could not be acquired or + * was revoked. + */ + private void acquireTaskLock(SegmentAllocationHolder holder, boolean isTimeChunkLock) + { + final LockRequest lockRequest; + if (isTimeChunkLock) { + lockRequest = new TimeChunkLockRequest(holder.lockRequest); + } else { + lockRequest = new SpecificSegmentLockRequest(holder.lockRequest, holder.allocatedSegment); + } + + // Create or find the task lock for the created lock request + final TaskLockPosse posseToUse = createOrFindLockPosse(lockRequest); + final TaskLock acquiredLock = posseToUse == null ? null : posseToUse.getTaskLock(); + if (posseToUse == null) { + holder.markFailed("Could not find or create lock posse."); + } else if (acquiredLock.isRevoked()) { + holder.markFailed("Lock was revoked."); + } else { + holder.setAcquiredLock(posseToUse, lockRequest.getInterval()); + } + } + + /** + * Adds the task to the found lock posse if not already added and updates + * in the metadata store. Marks the segment allocation as failed if the update + * did not succeed. + */ + private void addTaskAndPersistLocks(SegmentAllocationHolder holder, boolean isTimeChunkLock) + { + final Task task = holder.task; + final TaskLock acquiredLock = holder.acquiredLock; + + if (holder.taskLockPosse.addTask(task)) { + log.info("Added task [%s] to TaskLock [%s]", task.getId(), acquiredLock); + + // This can also be batched later + boolean success = updateLockInStorage(task, acquiredLock); + if (success) { + holder.markSucceeded(); + } else { + final Integer partitionId = isTimeChunkLock + ? null : ((SegmentLock) acquiredLock).getPartitionId(); + unlock(task, holder.lockRequestInterval, partitionId); + holder.markFailed("Could not update task lock in metadata store."); + } + } else { + log.info("Task [%s] already present in TaskLock [%s]", task.getId(), acquiredLock.getGroupId()); + holder.markSucceeded(); + } + } + + private boolean updateLockInStorage(Task task, TaskLock taskLock) + { + try { + taskStorage.addLock(task.getId(), taskLock); + return true; + } + catch (Exception e) { + log.makeAlert("Failed to persist lock in storage") + .addData("task", task.getId()) + .addData("dataSource", taskLock.getDataSource()) + .addData("interval", taskLock.getInterval()) + .addData("version", taskLock.getVersion()) + .emit(); + + return false; + } + } + private TaskLockPosse createOrFindLockPosse(LockRequest request) { Preconditions.checkState(!(request instanceof LockRequestForNewSegment), "Can't handle LockRequestForNewSegment"); @@ -541,7 +678,6 @@ && areAllEqualOrHigherPriorityLocksSharedOrRevoked(conflictPosses, request.getPr * monotonicity and that callers specifying {@code preferredVersion} are doing the right thing. * * @param request request to lock - * * @return a new {@link TaskLockPosse} */ private TaskLockPosse createNewTaskLockPosse(LockRequest request) @@ -550,7 +686,10 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request) try { final TaskLockPosse posseToUse = new TaskLockPosse(request.toLock()); running.computeIfAbsent(request.getDataSource(), k -> new TreeMap<>()) - .computeIfAbsent(request.getInterval().getStart(), k -> new TreeMap<>(Comparators.intervalsByStartThenEnd())) + .computeIfAbsent( + request.getInterval().getStart(), + k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()) + ) .computeIfAbsent(request.getInterval(), k -> new ArrayList<>()) .add(posseToUse); @@ -561,6 +700,45 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request) } } + /** + * Makes a call to the {@link #metadataStorageCoordinator} to allocate segments + * for the given requests. Updates the holder with the allocated segment if + * the allocation succeeds, otherwise marks it as failed. + */ + private void allocateSegmentIds( + String dataSource, + Interval interval, + boolean skipSegmentLineageCheck, + Collection holders + ) + { + if (holders.isEmpty()) { + return; + } + + final List createRequests = + holders.stream() + .map(SegmentAllocationHolder::getSegmentRequest) + .collect(Collectors.toList()); + + Map allocatedSegments = + metadataStorageCoordinator.allocatePendingSegments( + dataSource, + interval, + skipSegmentLineageCheck, + createRequests + ); + + for (SegmentAllocationHolder holder : holders) { + SegmentIdWithShardSpec segmentId = allocatedSegments.get(holder.getSegmentRequest()); + if (segmentId == null) { + holder.markFailed("Storage coordinator could not allocate segment."); + } else { + holder.setAllocatedSegment(segmentId); + } + } + } + private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version) { return metadataStorageCoordinator.allocatePendingSegment( @@ -577,7 +755,7 @@ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment reques /** * Perform the given action with a guarantee that the locks of the task are not revoked in the middle of action. This * method first checks that all locks for the given task and intervals are valid and perform the right action. - * + *

* The given action should be finished as soon as possible because all other methods in this class are blocked until * this method is finished. * @@ -611,7 +789,7 @@ private boolean isTaskLocksValid(Task task, List intervals) .allMatch(interval -> { final List lockPosses = getOnlyTaskLockPosseContainingInterval(task, interval); return lockPosses.stream().map(TaskLockPosse::getTaskLock).noneMatch( - lock -> lock.isRevoked() + TaskLock::isRevoked ); }); } @@ -664,7 +842,9 @@ protected void revokeLock(String taskId, TaskLock lock) final TaskLock revokedLock = lock.revokedCopy(); taskStorage.replaceLock(taskId, lock, revokedLock); - final List possesHolder = running.get(task.getDataSource()).get(lock.getInterval().getStart()).get(lock.getInterval()); + final List possesHolder = running.get(task.getDataSource()) + .get(lock.getInterval().getStart()) + .get(lock.getInterval()); final TaskLockPosse foundPosse = possesHolder.stream() .filter(posse -> posse.getTaskLock().equals(lock)) .findFirst() @@ -692,16 +872,7 @@ public List findLocksForTask(final Task task) giant.lock(); try { - return Lists.transform( - findLockPossesForTask(task), new Function() - { - @Override - public TaskLock apply(TaskLockPosse taskLockPosse) - { - return taskLockPosse.getTaskLock(); - } - } - ); + return Lists.transform(findLockPossesForTask(task), TaskLockPosse::getTaskLock); } finally { giant.unlock(); @@ -778,7 +949,7 @@ public void unlock(final Task task, final Interval interval) * Release lock held for a task on a particular interval. Does nothing if the task does not currently * hold the mentioned lock. * - * @param task task to unlock + * @param task task to unlock * @param interval interval to unlock */ public void unlock(final Task task, final Interval interval, @Nullable Integer partitionId) @@ -1233,7 +1404,7 @@ public boolean equals(Object o) TaskLockPosse that = (TaskLockPosse) o; return java.util.Objects.equals(taskLock, that.taskLock) && - java.util.Objects.equals(taskIds, that.taskIds); + java.util.Objects.equals(taskIds, that.taskIds); } @Override @@ -1251,4 +1422,121 @@ public String toString() .toString(); } } + + /** + * Maintains a list of pending allocation holders. + */ + private static class AllocationHolderList + { + final List all = new ArrayList<>(); + final Set pending = new HashSet<>(); + final Set recentlyCompleted = new HashSet<>(); + + AllocationHolderList(List requests, Interval interval) + { + for (SegmentAllocateRequest request : requests) { + SegmentAllocationHolder holder = new SegmentAllocationHolder(request, interval, this); + all.add(holder); + pending.add(holder); + } + } + + void markCompleted(SegmentAllocationHolder holder) + { + recentlyCompleted.add(holder); + } + + Set getPending() + { + pending.removeAll(recentlyCompleted); + recentlyCompleted.clear(); + return pending; + } + + + List getResults() + { + return all.stream().map(holder -> holder.result).collect(Collectors.toList()); + } + } + + /** + * Contains the task, request, lock and final result for a segment allocation. + */ + private static class SegmentAllocationHolder + { + final AllocationHolderList list; + + final Task task; + final Interval allocateInterval; + final SegmentAllocateAction action; + final LockRequestForNewSegment lockRequest; + SegmentCreateRequest segmentRequest; + + TaskLock acquiredLock; + TaskLockPosse taskLockPosse; + Interval lockRequestInterval; + SegmentIdWithShardSpec allocatedSegment; + SegmentAllocateResult result; + + SegmentAllocationHolder(SegmentAllocateRequest request, Interval allocateInterval, AllocationHolderList list) + { + this.list = list; + this.allocateInterval = allocateInterval; + this.task = request.getTask(); + this.action = request.getAction(); + + this.lockRequest = new LockRequestForNewSegment( + action.getLockGranularity(), + action.getTaskLockType(), + task.getGroupId(), + action.getDataSource(), + allocateInterval, + action.getPartialShardSpec(), + task.getPriority(), + action.getSequenceName(), + action.getPreviousSegmentId(), + action.isSkipSegmentLineageCheck() + ); + } + + SegmentCreateRequest getSegmentRequest() + { + // Initialize the first time this is requested + if (segmentRequest == null) { + segmentRequest = new SegmentCreateRequest( + action.getSequenceName(), + action.getPreviousSegmentId(), + acquiredLock == null ? lockRequest.getVersion() : acquiredLock.getVersion(), + action.getPartialShardSpec() + ); + } + + return segmentRequest; + } + + void markFailed(String msgFormat, Object... args) + { + list.markCompleted(this); + result = new SegmentAllocateResult(null, StringUtils.format(msgFormat, args)); + } + + void markSucceeded() + { + list.markCompleted(this); + result = new SegmentAllocateResult(allocatedSegment, null); + } + + void setAllocatedSegment(SegmentIdWithShardSpec segmentId) + { + this.allocatedSegment = segmentId; + } + + void setAcquiredLock(TaskLockPosse lockPosse, Interval lockRequestInterval) + { + this.taskLockPosse = lockPosse; + this.acquiredLock = lockPosse == null ? null : lockPosse.getTaskLock(); + this.lockRequestInterval = lockRequestInterval; + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java index 7b9101cf1f22..28c623fdadb4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java @@ -26,6 +26,7 @@ import org.apache.druid.discovery.DruidLeaderSelector; import org.apache.druid.discovery.DruidLeaderSelector.Listener; import org.apache.druid.guice.annotations.Self; +import org.apache.druid.indexing.common.actions.SegmentAllocationQueue; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.task.Task; @@ -91,7 +92,8 @@ public TaskMaster( final ServiceEmitter emitter, final SupervisorManager supervisorManager, final OverlordHelperManager overlordHelperManager, - @IndexingService final DruidLeaderSelector overlordLeaderSelector + @IndexingService final DruidLeaderSelector overlordLeaderSelector, + final SegmentAllocationQueue segmentAllocationQueue ) { this.supervisorManager = supervisorManager; @@ -136,6 +138,22 @@ public void becomeLeader() leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addManagedInstance(supervisorManager); leaderLifecycle.addManagedInstance(overlordHelperManager); + leaderLifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() + { + segmentAllocationQueue.becomeLeader(); + } + + @Override + public void stop() + { + segmentAllocationQueue.stopBeingLeader(); + } + } + ); leaderLifecycle.addHandler( new Lifecycle.Handler() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java index 96f14759f78a..3f117b2bac1e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskLockConfig.java @@ -30,8 +30,24 @@ public class TaskLockConfig @JsonProperty private boolean forceTimeChunkLock = true; + @JsonProperty + private boolean batchSegmentAllocation = true; + + @JsonProperty + private long batchAllocationMaxWaitTime = 50L; + public boolean isForceTimeChunkLock() { return forceTimeChunkLock; } + + public boolean isBatchSegmentAllocation() + { + return batchSegmentAllocation; + } + + public long getBatchAllocationMaxWaitTime() + { + return batchAllocationMaxWaitTime; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionBuilder.java new file mode 100644 index 000000000000..2e9cd70bbdbc --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionBuilder.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.timeline.partition.PartialShardSpec; +import org.joda.time.DateTime; + +public class SegmentAllocateActionBuilder +{ + private String dataSource; + private DateTime timestamp; + private Granularity queryGranularity; + private Granularity preferredSegmentGranularity; + private String sequenceName; + private String previousSegmentId; + private boolean skipSegmentLineageCheck; + private PartialShardSpec partialShardSpec; + private LockGranularity lockGranularity; + private TaskLockType taskLockType; + private Task task; + + public SegmentAllocateActionBuilder forDatasource(String dataSource) + { + this.dataSource = dataSource; + return this; + } + + public SegmentAllocateActionBuilder forTimestamp(DateTime timestamp) + { + this.timestamp = timestamp; + return this; + } + + public SegmentAllocateActionBuilder forTimestamp(String instant) + { + this.timestamp = DateTimes.of(instant); + return this; + } + + public SegmentAllocateActionBuilder withQueryGranularity(Granularity queryGranularity) + { + this.queryGranularity = queryGranularity; + return this; + } + + public SegmentAllocateActionBuilder withSegmentGranularity(Granularity segmentGranularity) + { + this.preferredSegmentGranularity = segmentGranularity; + return this; + } + + public SegmentAllocateActionBuilder withSequenceName(String sequenceName) + { + this.sequenceName = sequenceName; + return this; + } + + public SegmentAllocateActionBuilder withPreviousSegmentId(String previousSegmentId) + { + this.previousSegmentId = previousSegmentId; + return this; + } + + public SegmentAllocateActionBuilder withSkipLineageCheck(boolean skipLineageCheck) + { + this.skipSegmentLineageCheck = skipLineageCheck; + return this; + } + + public SegmentAllocateActionBuilder withPartialShardSpec(PartialShardSpec partialShardSpec) + { + this.partialShardSpec = partialShardSpec; + return this; + } + + public SegmentAllocateActionBuilder withLockGranularity(LockGranularity lockGranularity) + { + this.lockGranularity = lockGranularity; + return this; + } + + public SegmentAllocateActionBuilder withTaskLockType(TaskLockType taskLockType) + { + this.taskLockType = taskLockType; + return this; + } + + public SegmentAllocateActionBuilder forTask(Task task) + { + this.dataSource = task.getDataSource(); + this.sequenceName = task.getId(); + this.task = task; + return this; + } + + public SegmentAllocateRequest build() + { + return new SegmentAllocateRequest(task, buildAction(), 1); + } + + public SegmentAllocateAction buildAction() + { + return new SegmentAllocateAction( + dataSource, + timestamp, + queryGranularity, + preferredSegmentGranularity, + sequenceName, + previousSegmentId, + skipSegmentLineageCheck, + partialShardSpec, + lockGranularity, + taskLockType + ); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index a7e85a027e61..c8861a92cd3f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.common.actions; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -51,6 +50,7 @@ import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Period; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -64,6 +64,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @RunWith(Parameterized.class) @@ -79,20 +81,26 @@ public class SegmentAllocateActionTest private static final DateTime PARTY_TIME = DateTimes.of("1999"); private static final DateTime THE_DISTANT_FUTURE = DateTimes.of("3000"); + private final boolean useBatch; private final LockGranularity lockGranularity; - @Parameterized.Parameters(name = "{0}") + private SegmentAllocationQueue allocationQueue; + + @Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}") public static Iterable constructorFeeder() { return ImmutableList.of( - new Object[]{LockGranularity.SEGMENT}, - new Object[]{LockGranularity.TIME_CHUNK} + new Object[]{LockGranularity.SEGMENT, true}, + new Object[]{LockGranularity.SEGMENT, false}, + new Object[]{LockGranularity.TIME_CHUNK, true}, + new Object[]{LockGranularity.TIME_CHUNK, false} ); } - public SegmentAllocateActionTest(LockGranularity lockGranularity) + public SegmentAllocateActionTest(LockGranularity lockGranularity, boolean useBatch) { this.lockGranularity = lockGranularity; + this.useBatch = useBatch; } @Before @@ -101,6 +109,19 @@ public void setUp() ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); EmittingLogger.registerEmitter(emitter); EasyMock.replay(emitter); + allocationQueue = taskActionTestKit.getTaskActionToolbox().getSegmentAllocationQueue(); + if (allocationQueue != null) { + allocationQueue.start(); + allocationQueue.becomeLeader(); + } + } + + @After + public void tearDown() + { + if (allocationQueue != null) { + allocationQueue.stop(); + } } @Test @@ -288,29 +309,11 @@ public void testResumeSequence() if (lockGranularity == LockGranularity.TIME_CHUNK) { final TaskLock partyLock = Iterables.getOnlyElement( FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task)) - .filter( - new Predicate() - { - @Override - public boolean apply(TaskLock input) - { - return input.getInterval().contains(PARTY_TIME); - } - } - ) + .filter(input -> input.getInterval().contains(PARTY_TIME)) ); final TaskLock futureLock = Iterables.getOnlyElement( FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task)) - .filter( - new Predicate() - { - @Override - public boolean apply(TaskLock input) - { - return input.getInterval().contains(THE_DISTANT_FUTURE); - } - } - ) + .filter(input -> input.getInterval().contains(THE_DISTANT_FUTURE)) ); assertSameIdentifier( @@ -446,29 +449,11 @@ public void testMultipleSequences() if (lockGranularity == LockGranularity.TIME_CHUNK) { final TaskLock partyLock = Iterables.getOnlyElement( FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task)) - .filter( - new Predicate() - { - @Override - public boolean apply(TaskLock input) - { - return input.getInterval().contains(PARTY_TIME); - } - } - ) + .filter(input -> input.getInterval().contains(PARTY_TIME)) ); final TaskLock futureLock = Iterables.getOnlyElement( FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task)) - .filter( - new Predicate() - { - @Override - public boolean apply(TaskLock input) - { - return input.getInterval().contains(THE_DISTANT_FUTURE); - } - } - ) + .filter(input -> input.getInterval().contains(THE_DISTANT_FUTURE)) ); assertSameIdentifier( @@ -990,21 +975,26 @@ private SegmentIdWithShardSpec allocate( lockGranularity, null ); - return action.perform(task, taskActionTestKit.getTaskActionToolbox()); + + try { + if (useBatch) { + return action.performAsync(task, taskActionTestKit.getTaskActionToolbox()) + .get(5, TimeUnit.SECONDS); + } else { + return action.perform(task, taskActionTestKit.getTaskActionToolbox()); + } + } + catch (ExecutionException e) { + return null; + } + catch (Exception e) { + throw new RuntimeException(e); + } } private void assertSameIdentifier(final SegmentIdWithShardSpec expected, final SegmentIdWithShardSpec actual) { Assert.assertEquals(expected, actual); - Assert.assertEquals(expected.getShardSpec().getPartitionNum(), actual.getShardSpec().getPartitionNum()); - Assert.assertEquals(expected.getShardSpec().getClass(), actual.getShardSpec().getClass()); - - if (expected.getShardSpec().getClass() == NumberedShardSpec.class - && actual.getShardSpec().getClass() == NumberedShardSpec.class) { - Assert.assertEquals(expected.getShardSpec().getNumCorePartitions(), actual.getShardSpec().getNumCorePartitions()); - } else if (expected.getShardSpec().getClass() == LinearShardSpec.class - && actual.getShardSpec().getClass() == LinearShardSpec.class) { - // do nothing - } + Assert.assertEquals(expected.getShardSpec(), actual.getShardSpec()); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java new file mode 100644 index 000000000000..536e9ffac24c --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueueTest.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.server.coordinator.simulate.BlockingExecutorService; +import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class SegmentAllocationQueueTest +{ + @Rule + public TaskActionTestKit taskActionTestKit = new TaskActionTestKit(); + + private static final String DS_WIKI = "wiki"; + private static final String DS_KOALA = "koala"; + + private SegmentAllocationQueue allocationQueue; + + private StubServiceEmitter emitter; + private BlockingExecutorService executor; + + @Before + public void setUp() + { + executor = new BlockingExecutorService("alloc-test-exec"); + emitter = new StubServiceEmitter("overlord", "alloc-test"); + + final TaskLockConfig lockConfig = new TaskLockConfig() + { + @Override + public boolean isBatchSegmentAllocation() + { + return true; + } + + @Override + public long getBatchAllocationMaxWaitTime() + { + return 0; + } + }; + + allocationQueue = new SegmentAllocationQueue( + taskActionTestKit.getTaskLockbox(), + lockConfig, + taskActionTestKit.getMetadataStorageCoordinator(), + emitter, + (corePoolSize, nameFormat) + -> new WrappingScheduledExecutorService(nameFormat, executor, false) + ); + allocationQueue.start(); + allocationQueue.becomeLeader(); + } + + @After + public void tearDown() + { + if (allocationQueue != null) { + allocationQueue.stop(); + } + if (executor != null) { + executor.shutdownNow(); + } + emitter.flush(); + } + + @Test + public void testBatchWithMultipleTimestamps() + { + verifyAllocationWithBatching( + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .forTimestamp("2022-01-01T01:00:00") + .withSegmentGranularity(Granularities.DAY) + .withQueryGranularity(Granularities.SECOND) + .withLockGranularity(LockGranularity.TIME_CHUNK) + .withSequenceName("seq_1") + .build(), + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .forTimestamp("2022-01-01T02:00:00") + .withSegmentGranularity(Granularities.DAY) + .withQueryGranularity(Granularities.SECOND) + .withLockGranularity(LockGranularity.TIME_CHUNK) + .withSequenceName("seq_2") + .build(), + true + ); + } + + @Test + public void testBatchWithExclusiveLocks() + { + verifyAllocationWithBatching( + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .withTaskLockType(TaskLockType.EXCLUSIVE).build(), + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .withTaskLockType(TaskLockType.EXCLUSIVE).build(), + true + ); + } + + @Test + public void testBatchWithSharedLocks() + { + verifyAllocationWithBatching( + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .withTaskLockType(TaskLockType.SHARED).build(), + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .withTaskLockType(TaskLockType.SHARED).build(), + true + ); + } + + @Test + public void testBatchWithMultipleQueryGranularities() + { + verifyAllocationWithBatching( + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .withQueryGranularity(Granularities.SECOND).build(), + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .withQueryGranularity(Granularities.MINUTE).build(), + true + ); + } + + @Test + public void testMultipleDatasourcesCannotBatch() + { + verifyAllocationWithBatching( + allocateRequest().forTask(createTask(DS_WIKI, "group_1")).build(), + allocateRequest().forTask(createTask(DS_KOALA, "group_1")).build(), + false + ); + } + + @Test + public void testMultipleGroupIdsCannotBatch() + { + verifyAllocationWithBatching( + allocateRequest().forTask(createTask(DS_WIKI, "group_2")).build(), + allocateRequest().forTask(createTask(DS_WIKI, "group_3")).build(), + false + ); + } + + @Test + public void testMultipleLockGranularitiesCannotBatch() + { + verifyAllocationWithBatching( + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .withLockGranularity(LockGranularity.TIME_CHUNK).build(), + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .withLockGranularity(LockGranularity.SEGMENT).build(), + false + ); + } + + @Test + public void testMultipleAllocateIntervalsCannotBatch() + { + verifyAllocationWithBatching( + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .forTimestamp("2022-01-01") + .withSegmentGranularity(Granularities.DAY).build(), + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .forTimestamp("2022-01-02") + .withSegmentGranularity(Granularities.DAY).build(), + false + ); + } + + @Test + public void testConflictingPendingSegment() + { + SegmentAllocateRequest hourSegmentRequest = + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .withSegmentGranularity(Granularities.HOUR) + .build(); + Future hourSegmentFuture = allocationQueue.add(hourSegmentRequest); + + SegmentAllocateRequest halfHourSegmentRequest = + allocateRequest().forTask(createTask(DS_WIKI, "group_1")) + .withSegmentGranularity(Granularities.THIRTY_MINUTE) + .build(); + Future halfHourSegmentFuture = allocationQueue.add(halfHourSegmentRequest); + + executor.finishNextPendingTask(); + + Assert.assertNotNull(getSegmentId(hourSegmentFuture)); + Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(halfHourSegmentFuture)); + Assert.assertEquals("Storage coordinator could not allocate segment.", t.getMessage()); + } + + @Test + public void testFullAllocationQueue() + { + for (int i = 0; i < 2000; ++i) { + SegmentAllocateRequest request = + allocateRequest().forTask(createTask(DS_WIKI, "group_" + i)).build(); + allocationQueue.add(request); + } + + SegmentAllocateRequest request = + allocateRequest().forTask(createTask(DS_WIKI, "next_group")).build(); + Future future = allocationQueue.add(request); + + // Verify that the future is already complete and segment allocation has failed + Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(future)); + Assert.assertEquals( + "Segment allocation queue is full. Check the metric `task/action/batch/runTime` " + + "to determine if metadata operations are slow.", + t.getMessage() + ); + } + + @Test + public void testMultipleRequestsForSameSegment() + { + final List> segmentFutures = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + SegmentAllocateRequest request = + allocateRequest().forTask(createTask(DS_WIKI, "group_" + i)) + .withSequenceName("sequence_1") + .withPreviousSegmentId("segment_1") + .build(); + segmentFutures.add(allocationQueue.add(request)); + } + + executor.finishNextPendingTask(); + + SegmentIdWithShardSpec segmentId1 = getSegmentId(segmentFutures.get(0)); + + for (Future future : segmentFutures) { + Assert.assertEquals(getSegmentId(future), segmentId1); + } + } + + @Test + public void testMaxWaitTime() + { + // Verify that the batch is due yet + } + + @Test + public void testRequestsFailOnLeaderChange() + { + final List> segmentFutures = new ArrayList<>(); + for (int i = 0; i < 10; ++i) { + SegmentAllocateRequest request = + allocateRequest().forTask(createTask(DS_WIKI, "group_" + i)).build(); + segmentFutures.add(allocationQueue.add(request)); + } + + allocationQueue.stopBeingLeader(); + executor.finishNextPendingTask(); + + for (Future future : segmentFutures) { + Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(future)); + Assert.assertEquals("Cannot allocate segment if not leader", t.getMessage()); + } + } + + private void verifyAllocationWithBatching( + SegmentAllocateRequest a, + SegmentAllocateRequest b, + boolean canBatch + ) + { + Assert.assertEquals(0, allocationQueue.size()); + final Future futureA = allocationQueue.add(a); + final Future futureB = allocationQueue.add(b); + + final int expectedCount = canBatch ? 1 : 2; + Assert.assertEquals(expectedCount, allocationQueue.size()); + + executor.finishNextPendingTask(); + emitter.verifyEmitted("task/action/batch/size", expectedCount); + + Assert.assertNotNull(getSegmentId(futureA)); + Assert.assertNotNull(getSegmentId(futureB)); + } + + private SegmentIdWithShardSpec getSegmentId(Future future) + { + try { + return future.get(5, TimeUnit.SECONDS); + } + catch (ExecutionException e) { + throw new ISE(e.getCause().getMessage()); + } + catch (InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + private SegmentAllocateActionBuilder allocateRequest() + { + return new SegmentAllocateActionBuilder() + .forDatasource(DS_WIKI) + .forTimestamp("2022-01-01") + .withLockGranularity(LockGranularity.TIME_CHUNK) + .withTaskLockType(TaskLockType.SHARED) + .withQueryGranularity(Granularities.SECOND) + .withSegmentGranularity(Granularities.HOUR); + } + + private Task createTask(String datasource, String groupId) + { + Task task = new NoopTask(null, groupId, datasource, 0, 0, null, null, null); + taskActionTestKit.getTaskLockbox().add(task); + return task; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 2d6b22732a69..189d3fe8779c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -27,7 +27,10 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; +import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageTablesConfig; @@ -99,11 +102,34 @@ public int getSqlMetadataMaxRetry() Suppliers.ofInstance(metadataStorageTablesConfig), testDerbyConnector ); + final ServiceEmitter noopEmitter = new NoopServiceEmitter(); + final TaskLockConfig taskLockConfig = new TaskLockConfig() + { + @Override + public boolean isBatchSegmentAllocation() + { + return true; + } + + @Override + public long getBatchAllocationMaxWaitTime() + { + return 10L; + } + }; + taskActionToolbox = new TaskActionToolbox( taskLockbox, taskStorage, metadataStorageCoordinator, - new NoopServiceEmitter(), + new SegmentAllocationQueue( + taskLockbox, + taskLockConfig, + metadataStorageCoordinator, + noopEmitter, + ScheduledExecutors::fixed + ), + noopEmitter, EasyMock.createMock(SupervisorManager.class), objectMapper ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 6f9a01c25b0b..3ada645ff88a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -40,6 +40,7 @@ import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TimeChunkLock; +import org.apache.druid.indexing.common.actions.SegmentAllocationQueue; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.NoopTask; @@ -226,7 +227,8 @@ public MockTaskRunner get() serviceEmitter, supervisorManager, EasyMock.createNiceMock(OverlordHelperManager.class), - new TestDruidLeaderSelector() + new TestDruidLeaderSelector(), + EasyMock.createNiceMock(SegmentAllocationQueue.class) ); EmittingLogger.registerEmitter(serviceEmitter); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 2dcff7694678..d64bd1d22263 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -25,6 +25,7 @@ import com.google.common.collect.Sets; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.SegmentCreateRequest; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.jackson.DefaultObjectMapper; @@ -36,8 +37,10 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator @@ -124,6 +127,17 @@ public Set announceHistoricalSegments(Set segments) return ImmutableSet.copyOf(added); } + @Override + public Map allocatePendingSegments( + String dataSource, + Interval interval, + boolean skipSegmentLineageCheck, + List requests + ) + { + return Collections.emptyMap(); + } + @Override public SegmentPublishResult announceHistoricalSegments( Set segments, diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index acb617f78a4c..b3c70f0cdbe9 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -31,6 +31,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; /** @@ -154,6 +155,27 @@ Collection retrieveUsedSegmentsForIntervals( */ Set announceHistoricalSegments(Set segments) throws IOException; + /** + * Allocates pending segments for the given requests in the pending segments table. + * The segment id allocated for a request will not be given out again unless a + * request is made with the same {@link SegmentCreateRequest}. + * + * @param dataSource dataSource for which to allocate a segment + * @param interval interval for which to allocate a segment + * @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence. + * Should be set to false if replica tasks would index events in same order + * @param requests Requests for which to allocate segments. All + * the requests must share the same partition space. + * @return Map from request to allocated segment id. The map does not contain + * entries for failed requests. + */ + Map allocatePendingSegments( + String dataSource, + Interval interval, + boolean skipSegmentLineageCheck, + List requests + ); + /** * Allocate a new pending segment in the pending segments table. This segment identifier will never be given out * again, unless another call is made with the same dataSource, sequenceName, and previousSegmentId. diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java new file mode 100644 index 000000000000..b43e46d8e7a5 --- /dev/null +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import org.apache.druid.timeline.partition.PartialShardSpec; + +/** + * Contains information used by {@link IndexerMetadataStorageCoordinator} for + * creating a new segment. + *

+ * The {@code sequenceName} and {@code previousSegmentId} fields are meant to + * make it easy for two independent ingestion tasks to produce the same series + * of segments. + */ +public class SegmentCreateRequest +{ + // DO NOT IMPLEMENT equals or hashCode for this class as each request must be + // treated as unique even if it is for the same parameters + + private final String version; + private final String sequenceName; + private final String previousSegmentId; + private final PartialShardSpec partialShardSpec; + + public SegmentCreateRequest( + String sequenceName, + String previousSegmentId, + String version, + PartialShardSpec partialShardSpec + ) + { + this.sequenceName = sequenceName; + this.previousSegmentId = previousSegmentId == null ? "" : previousSegmentId; + this.version = version; + this.partialShardSpec = partialShardSpec; + } + + public String getSequenceName() + { + return sequenceName; + } + + /** + * Non-null previous segment id. This can be used for persisting to the + * pending segments table in the metadata store. + */ + public String getPreviousSegmentId() + { + return previousSegmentId; + } + + public String getVersion() + { + return version; + } + + public PartialShardSpec getPartialShardSpec() + { + return partialShardSpec; + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index cc42d77e1b70..bb7759a8b5d6 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -28,12 +28,14 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; import com.google.inject.Inject; import org.apache.commons.lang.StringEscapeUtils; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.SegmentCreateRequest; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; @@ -75,10 +77,13 @@ import javax.validation.constraints.NotNull; import java.io.IOException; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -221,6 +226,10 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv return numSegmentsMarkedUnused; } + /** + * Fetches all the pending segments, whose interval overlaps with the given + * search interval from the metadata store. + */ private Set getPendingSegmentsForIntervalWithHandle( final Handle handle, final String dataSource, @@ -481,6 +490,23 @@ public int getSqlMetadataMaxRetry() return SQLMetadataConnector.DEFAULT_MAX_TRIES; } + @Override + public Map allocatePendingSegments( + String dataSource, + Interval allocateInterval, + boolean skipSegmentLineageCheck, + List requests + ) + { + Preconditions.checkNotNull(dataSource, "dataSource"); + Preconditions.checkNotNull(allocateInterval, "interval"); + + final Interval interval = allocateInterval.withChronology(ISOChronology.getInstanceUTC()); + return connector.retryWithHandle( + handle -> allocatePendingSegments(handle, dataSource, interval, skipSegmentLineageCheck, requests) + ); + } + @Override public SegmentIdWithShardSpec allocatePendingSegment( final String dataSource, @@ -599,6 +625,81 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( return newIdentifier; } + private Map allocatePendingSegments( + final Handle handle, + final String dataSource, + final Interval interval, + final boolean skipSegmentLineageCheck, + final List requests + ) throws IOException + { + final Map existingSegmentIds; + if (skipSegmentLineageCheck) { + existingSegmentIds = getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, requests); + } else { + existingSegmentIds = getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, requests); + } + + // For every request see if a segment id already exists + final Map allocatedSegmentIds = new HashMap<>(); + final List requestsForNewSegments = new ArrayList<>(); + for (SegmentCreateRequest request : requests) { + CheckExistingSegmentIdResult existingSegmentId = existingSegmentIds.get(request); + if (existingSegmentId == null || !existingSegmentId.found) { + requestsForNewSegments.add(request); + } else if (existingSegmentId.segmentIdentifier != null) { + log.info("Found valid existing segment [%s] for request.", existingSegmentId.segmentIdentifier); + allocatedSegmentIds.put(request, existingSegmentId.segmentIdentifier); + } else { + log.info("Found clashing existing segment [%s] for request.", existingSegmentId); + } + } + + // For each of the remaining requests, create a new segment + final Map createdSegments = + createNewSegments(handle, dataSource, interval, skipSegmentLineageCheck, requestsForNewSegments); + + // SELECT -> INSERT can fail due to races; callers must be prepared to retry. + // Avoiding ON DUPLICATE KEY since it's not portable. + // Avoiding try/catch since it may cause inadvertent transaction-splitting. + + // UNIQUE key for the row, ensuring we don't have more than one segment per sequence per interval. + // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines + // have difficulty with large unique keys (see https://github.com/apache/druid/issues/2319) + insertPendingSegmentsIntoMetastore( + handle, + createdSegments, + dataSource, + interval, + skipSegmentLineageCheck + ); + + allocatedSegmentIds.putAll(createdSegments); + return allocatedSegmentIds; + } + + @SuppressWarnings("UnstableApiUsage") + private String getSequenceNameAndPrevIdSha( + SegmentCreateRequest request, + Interval interval, + boolean skipSegmentLineageCheck + ) + { + final Hasher hasher = Hashing.sha1().newHasher() + .putBytes(StringUtils.toUtf8(request.getSequenceName())) + .putByte((byte) 0xff); + if (skipSegmentLineageCheck) { + hasher + .putLong(interval.getStartMillis()) + .putLong(interval.getEndMillis()); + } else { + hasher + .putBytes(StringUtils.toUtf8(request.getPreviousSegmentId())); + } + + return BaseEncoding.base16().encode(hasher.hash().asBytes()); + } + @Nullable private SegmentIdWithShardSpec allocatePendingSegment( final Handle handle, @@ -631,7 +732,6 @@ private SegmentIdWithShardSpec allocatePendingSegment( ); if (result.found) { - // The found existing segment identifier can be null if its interval doesn't match with the given interval return result.segmentIdentifier; } @@ -672,6 +772,95 @@ private SegmentIdWithShardSpec allocatePendingSegment( return newIdentifier; } + /** + * Returns a map from sequenceName to segment id. + */ + private Map getExistingSegmentIdsSkipLineageCheck( + Handle handle, + String dataSource, + Interval interval, + List requests + ) throws IOException + { + final Query> query = handle + .createQuery( + StringUtils.format( + "SELECT sequence_name, payload " + + "FROM %s WHERE " + + "dataSource = :dataSource AND " + + "start = :start AND " + + "%2$send%2$s = :end", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + ) + ) + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()); + + final ResultIterator dbSegments = query + .map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r)) + .iterator(); + + // Map from sequenceName to segment id + final Map sequenceToSegmentId = new HashMap<>(); + while (dbSegments.hasNext()) { + final PendingSegmentsRecord record = dbSegments.next(); + final SegmentIdWithShardSpec segmentId = + jsonMapper.readValue(record.getPayload(), SegmentIdWithShardSpec.class); + sequenceToSegmentId.put(record.getSequenceName(), segmentId); + } + + final Map requestToResult = new HashMap<>(); + for (SegmentCreateRequest request : requests) { + SegmentIdWithShardSpec segmentId = sequenceToSegmentId.get(request.getSequenceName()); + requestToResult.put(request, new CheckExistingSegmentIdResult(segmentId != null, segmentId)); + } + + return requestToResult; + } + + /** + * Returns a map from sequenceName to segment id. + */ + private Map getExistingSegmentIdsWithLineageCheck( + Handle handle, + String dataSource, + Interval interval, + List requests + ) throws IOException + { + // This cannot be batched because there doesn't seem to be a clean option: + // 1. WHERE must have sequence_name and sequence_prev_id but not start or end. + // (sequence columns are used to find the matching segment whereas start and + // end are used to determine if the found segment is valid or not) + // 2. IN filters on sequence_name and sequence_prev_id might perform worse than individual SELECTs? + // 3. IN filter on sequence_name alone might be a feasible option worth evaluating + final String sql = StringUtils.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "sequence_prev_id = :sequence_prev_id", + dbTables.getPendingSegmentsTable() + ); + + final Map requestToResult = new HashMap<>(); + for (SegmentCreateRequest request : requests) { + CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId( + handle.createQuery(sql) + .bind("dataSource", dataSource) + .bind("sequence_name", request.getSequenceName()) + .bind("sequence_prev_id", request.getPreviousSegmentId()), + interval, + request.getSequenceName(), + request.getPreviousSegmentId() + ); + requestToResult.put(request, result); + } + + return requestToResult; + } + private CheckExistingSegmentIdResult checkAndGetExistingSegmentId( final Query> query, final Interval interval, @@ -686,50 +875,36 @@ private CheckExistingSegmentIdResult checkAndGetExistingSegmentId( } final List existingBytes = boundQuery.map(ByteArrayMapper.FIRST).list(); - if (!existingBytes.isEmpty()) { + if (existingBytes.isEmpty()) { + return new CheckExistingSegmentIdResult(false, null); + } else { final SegmentIdWithShardSpec existingIdentifier = jsonMapper.readValue( Iterables.getOnlyElement(existingBytes), SegmentIdWithShardSpec.class ); - if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() - && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) { - if (previousSegmentId == null) { - log.info("Found existing pending segment [%s] for sequence[%s] in DB", existingIdentifier, sequenceName); - } else { - log.info( - "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", - existingIdentifier, - sequenceName, - previousSegmentId - ); - } + if (existingIdentifier.getInterval().isEqual(interval)) { + log.info( + "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", + existingIdentifier, + sequenceName, + previousSegmentId + ); return new CheckExistingSegmentIdResult(true, existingIdentifier); } else { - if (previousSegmentId == null) { - log.warn( - "Cannot use existing pending segment [%s] for sequence[%s] in DB, " - + "does not match requested interval[%s]", - existingIdentifier, - sequenceName, - interval - ); - } else { - log.warn( - "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " - + "does not match requested interval[%s]", - existingIdentifier, - sequenceName, - previousSegmentId, - interval - ); - } + log.warn( + "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " + + "does not match requested interval[%s]", + existingIdentifier, + sequenceName, + previousSegmentId, + interval + ); return new CheckExistingSegmentIdResult(true, null); } } - return new CheckExistingSegmentIdResult(false, null); } private static class CheckExistingSegmentIdResult @@ -745,6 +920,48 @@ private static class CheckExistingSegmentIdResult } } + private void insertPendingSegmentsIntoMetastore( + Handle handle, + Map createdSegments, + String dataSource, + Interval interval, + boolean skipSegmentLineageCheck + ) throws JsonProcessingException + { + final PreparedBatch insertBatch = handle.prepareBatch( + StringUtils.format( + "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " + + "sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " + + ":sequence_name_prev_id_sha1, :payload)", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + )); + + // Deduplicate the segment ids by inverting the map + Map segmentIdToRequest = new HashMap<>(); + createdSegments.forEach((request, segmentId) -> segmentIdToRequest.put(segmentId, request)); + + for (Map.Entry entry : segmentIdToRequest.entrySet()) { + final SegmentCreateRequest request = entry.getValue(); + final SegmentIdWithShardSpec segmentId = entry.getKey(); + insertBatch.add() + .bind("id", segmentId.toString()) + .bind("dataSource", dataSource) + .bind("created_date", DateTimes.nowUtc().toString()) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .bind("sequence_name", request.getSequenceName()) + .bind("sequence_prev_id", request.getPreviousSegmentId()) + .bind( + "sequence_name_prev_id_sha1", + getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck) + ) + .bind("payload", jsonMapper.writeValueAsBytes(segmentId)); + } + insertBatch.execute(); + } + private void insertPendingSegmentIntoMetastore( Handle handle, SegmentIdWithShardSpec newIdentifier, @@ -777,6 +994,204 @@ private void insertPendingSegmentIntoMetastore( .execute(); } + private Map createNewSegments( + Handle handle, + String dataSource, + Interval interval, + boolean skipSegmentLineageCheck, + List requests + ) throws IOException + { + if (requests.isEmpty()) { + return Collections.emptyMap(); + } + + // Get the time chunk and associated data segments for the given interval, if any + final List> existingChunks = + getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval)) + .lookup(interval); + + if (existingChunks.size() > 1) { + // Not possible to expand more than one chunk with a single segment. + log.warn( + "Cannot allocate new segments for dataSource[%s], interval[%s]: already have [%,d] chunks.", + dataSource, + interval, + existingChunks.size() + ); + return Collections.emptyMap(); + } + + // Shard spec of any of the requests (as they are all compatible) can be used to + // identify existing shard specs that share partition space with the requested ones. + final PartialShardSpec partialShardSpec = requests.get(0).getPartialShardSpec(); + + // max partitionId of published data segments which share the same partition space. + SegmentIdWithShardSpec committedMaxId = null; + + @Nullable + final String versionOfExistingChunk; + if (existingChunks.isEmpty()) { + versionOfExistingChunk = null; + } else { + TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); + versionOfExistingChunk = existingHolder.getVersion(); + + // Don't use the stream API for performance. + for (DataSegment segment : FluentIterable + .from(existingHolder.getObject()) + .transform(PartitionChunk::getObject) + // Here we check only the segments of the shardSpec which shares the same partition space with the given + // partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others. + // See PartitionIds. + .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) { + if (committedMaxId == null + || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { + committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment); + } + } + } + + + // Fetch the pending segments for this interval to determine max partitionId + // across all shard specs (published + pending). + // A pending segment having a higher partitionId must also be considered + // to avoid clashes when inserting the pending segment created here. + final Set pendingSegments = + getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval); + + final Map createdSegments = new HashMap<>(); + final Map sequenceHashToSegment = new HashMap<>(); + + for (SegmentCreateRequest request : requests) { + // Check if the required segment has already been created in this batch + final String sequenceHash = getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck); + + final SegmentIdWithShardSpec createdSegment; + if (sequenceHashToSegment.containsKey(sequenceHash)) { + createdSegment = sequenceHashToSegment.get(sequenceHash); + } else { + createdSegment = createNewSegment( + request, + dataSource, + interval, + versionOfExistingChunk, + committedMaxId, + pendingSegments + ); + + // Add to pendingSegments to consider for partitionId + if (createdSegment != null) { + pendingSegments.add(createdSegment); + sequenceHashToSegment.put(sequenceHash, createdSegment); + log.info("Created new segment [%s]", createdSegment); + } + } + + if (createdSegment != null) { + createdSegments.put(request, createdSegment); + } + } + + log.info("Created [%d] new segments for [%d] allocate requests.", sequenceHashToSegment.size(), requests.size()); + return createdSegments; + } + + private SegmentIdWithShardSpec createNewSegment( + SegmentCreateRequest request, + String dataSource, + Interval interval, + String versionOfExistingChunk, + SegmentIdWithShardSpec committedMaxId, + Set pendingSegments + ) + { + final PartialShardSpec partialShardSpec = request.getPartialShardSpec(); + final String existingVersion = request.getVersion(); + + // Include the committedMaxId while computing the overallMaxId + if (committedMaxId != null) { + pendingSegments.add(committedMaxId); + } + + // If there is an existing chunk, find the max id with the same version as the existing chunk. + // There may still be a pending segment with a higher version (but no corresponding used segments) + // which may generate a clash with an existing segment once the new id is generated + final SegmentIdWithShardSpec overallMaxId = + pendingSegments.stream() + .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)) + .filter(id -> versionOfExistingChunk == null + || id.getVersion().equals(versionOfExistingChunk)) + .max(Comparator.comparing(SegmentIdWithShardSpec::getVersion) + .thenComparing(id -> id.getShardSpec().getPartitionNum())) + .orElse(null); + + // Determine the version of the new segment + final String newSegmentVersion; + if (versionOfExistingChunk != null) { + newSegmentVersion = versionOfExistingChunk; + } else if (overallMaxId != null) { + newSegmentVersion = overallMaxId.getVersion(); + } else { + // this is the first segment for this interval + newSegmentVersion = null; + } + + if (overallMaxId == null) { + // When appending segments, null overallMaxId means that we are allocating the very initial + // segment for this time chunk. + // This code is executed when the Overlord coordinates segment allocation, which is either you append segments + // or you use segment lock. Since the core partitions set is not determined for appended segments, we set + // it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the + // OvershadowableManager handles the atomic segment update. + final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace() + ? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID + : PartitionIds.ROOT_GEN_START_PARTITION_ID; + + String version = newSegmentVersion == null ? existingVersion : newSegmentVersion; + return new SegmentIdWithShardSpec( + dataSource, + interval, + version, + partialShardSpec.complete(jsonMapper, newPartitionId, 0) + ); + } else if (!overallMaxId.getInterval().equals(interval) + || overallMaxId.getVersion().compareTo(existingVersion) > 0) { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", + dataSource, + interval, + existingVersion, + overallMaxId + ); + return null; + } else if (committedMaxId != null + && committedMaxId.getShardSpec().getNumCorePartitions() + == SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) { + log.warn( + "Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]", + committedMaxId, + committedMaxId.getShardSpec() + ); + return null; + } else { + // The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline. + // When the core partitions have been dropped, using pending segments may lead to an incorrect state + // where the chunk is believed to have core partitions and queries results are incorrect. + + return new SegmentIdWithShardSpec( + dataSource, + interval, + Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"), + partialShardSpec.complete( + jsonMapper, + overallMaxId.getShardSpec().getPartitionNum() + 1, + committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions() + ) + ); + } + } + /** * This function creates a new segment for the given datasource/interval/etc. A critical * aspect of the creation is to make sure that the new version & new partition number will make @@ -818,15 +1233,18 @@ private SegmentIdWithShardSpec createNewSegment( return null; } else { - // max partitionId of the committed shardSpecs which share the same partition space. + // max partitionId of published data segments which share the same partition space. SegmentIdWithShardSpec committedMaxId = null; - // max partitionId of the all shardSpecs including the pending ones which share the same partition space. - SegmentIdWithShardSpec overallMaxId; - if (!existingChunks.isEmpty()) { + @Nullable + final String versionOfExistingChunk; + if (existingChunks.isEmpty()) { + versionOfExistingChunk = null; + } else { TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); + versionOfExistingChunk = existingHolder.getVersion(); - //noinspection ConstantConditions + // Don't use the stream API for performance. for (DataSegment segment : FluentIterable .from(existingHolder.getObject()) .transform(PartitionChunk::getObject) @@ -834,8 +1252,6 @@ private SegmentIdWithShardSpec createNewSegment( // partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others. // See PartitionIds. .filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) { - // Don't use the stream API for performance. - // Note that this will compute the max id of existing, visible, data segments in the time chunk: if (committedMaxId == null || committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) { committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment); @@ -843,63 +1259,41 @@ private SegmentIdWithShardSpec createNewSegment( } } - // Get the version of the existing chunk, we might need it in some of the cases below - // to compute the new identifier's version - @Nullable - final String versionOfExistingChunk; - if (!existingChunks.isEmpty()) { - // remember only one chunk possible for given interval so get the first & only one - versionOfExistingChunk = existingChunks.get(0).getVersion(); - } else { - versionOfExistingChunk = null; - } - - // next, we need to enrich the overallMaxId computed with committed segments with the information of the pending segments - // it is possible that a pending segment has a higher id in which case we need that, it will work, - // and it will avoid clashes when inserting the new pending segment later in the caller of this method + // Fetch the pending segments for this interval to determine max partitionId + // across all shard specs (published + pending). + // A pending segment having a higher partitionId must also be considered + // to avoid clashes when inserting the pending segment created here. final Set pendings = getPendingSegmentsForIntervalWithHandle( handle, dataSource, interval ); - // Make sure we add the committed max id we obtained from the segments table: if (committedMaxId != null) { pendings.add(committedMaxId); } - // Now compute the overallMaxId with all the information: pendings + segments: - // The versionOfExistingChunks filter is ensure that we pick the max id with the version of the existing chunk - // in the case that there may be a pending segment with a higher version but no corresponding used segments + + // If there is an existing chunk, find the max id with the same version as the existing chunk. + // There may still be a pending segment with a higher version (but no corresponding used segments) // which may generate a clash with an existing segment once the new id is generated + final SegmentIdWithShardSpec overallMaxId; overallMaxId = pendings.stream() .filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec)) - .filter(id -> versionOfExistingChunk == null || id.getVersion() - .equals(versionOfExistingChunk)) - .max((id1, id2) -> { - final int versionCompare = id1.getVersion().compareTo(id2.getVersion()); - if (versionCompare != 0) { - return versionCompare; - } else { - return Integer.compare( - id1.getShardSpec().getPartitionNum(), - id2.getShardSpec().getPartitionNum() - ); - } - }) + .filter(id -> versionOfExistingChunk == null + || id.getVersion().equals(versionOfExistingChunk)) + .max(Comparator.comparing(SegmentIdWithShardSpec::getVersion) + .thenComparing(id -> id.getShardSpec().getPartitionNum())) .orElse(null); - // The following code attempts to compute the new version, if this - // new version is not null at the end of next block then it will be - // used as the new version in the case for initial or appended segment + + // Determine the version of the new segment final String newSegmentVersion; if (versionOfExistingChunk != null) { - // segment version overrides, so pick that now that we know it exists newSegmentVersion = versionOfExistingChunk; - } else if (!pendings.isEmpty() && overallMaxId != null) { - // there is no visible segments in the time chunk, so pick the max id of pendings, as computed above + } else if (overallMaxId != null) { newSegmentVersion = overallMaxId.getVersion(); } else { - // no segments, no pendings, so this must be the very first segment created for this interval + // this is the first segment for this interval newSegmentVersion = null; } @@ -940,7 +1334,7 @@ private SegmentIdWithShardSpec createNewSegment( ); return null; } else { - // The number of core partitions must always be chosen from the set of used segments in the VersionedIntervalTimeline. + // The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline. // When the core partitions have been dropped, using pending segments may lead to an incorrect state // where the chunk is believed to have core partitions and queries results are incorrect. @@ -1472,4 +1866,46 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set + *

  • sequence_name
  • + *
  • payload
  • + * + */ + static PendingSegmentsRecord fromResultSet(ResultSet resultSet) + { + try { + return new PendingSegmentsRecord( + resultSet.getString(1), + resultSet.getBytes(2) + ); + } + catch (SQLException e) { + throw new RuntimeException(e); + } + } + + PendingSegmentsRecord(String sequenceName, byte[] payload) + { + this.payload = payload; + this.sequenceName = sequenceName; + } + + public byte[] getPayload() + { + return payload; + } + + public String getSequenceName() + { + return sequenceName; + } + } + } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java new file mode 100644 index 000000000000..33641a8417da --- /dev/null +++ b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import org.apache.druid.timeline.partition.NumberedPartialShardSpec; +import org.apache.druid.timeline.partition.PartialShardSpec; +import org.junit.Assert; +import org.junit.Test; + +public class SegmentCreateRequestTest +{ + + @Test + public void testNullPreviousSegmentId() + { + final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); + SegmentCreateRequest request = new SegmentCreateRequest( + "sequence", + null, + "version", + partialShardSpec + ); + Assert.assertEquals("sequence", request.getSequenceName()); + Assert.assertEquals("", request.getPreviousSegmentId()); + Assert.assertEquals("version", request.getVersion()); + Assert.assertEquals(partialShardSpec, request.getPartialShardSpec()); + } + +} diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index d7000f8f4d20..f7af9611e51a 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.StringTuple; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.ObjectMetadata; +import org.apache.druid.indexing.overlord.SegmentCreateRequest; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.java.util.common.DateTimes; @@ -59,7 +60,6 @@ import org.junit.rules.ExpectedException; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.PreparedBatch; -import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.StringMapper; import java.io.IOException; @@ -371,40 +371,12 @@ private void markAllSegmentsUnused(Set segments) Assert.assertEquals( 1, (int) derbyConnector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Integer withHandle(Handle handle) - { - String request = StringUtils.format( - "UPDATE %s SET used = false WHERE id = :id", - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() - ); - return handle.createStatement(request).bind("id", segment.getId().toString()).execute(); - } - } - ) - ); - } - } - - private void markAllSegmentsUsed(Set segments) - { - for (final DataSegment segment : segments) { - Assert.assertEquals( - 1, - (int) derbyConnector.getDBI().withHandle( - new HandleCallback() - { - @Override - public Integer withHandle(Handle handle) - { - String request = StringUtils.format( - "UPDATE %s SET used = true WHERE id = :id", - derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() - ); - return handle.createStatement(request).bind("id", segment.getId().toString()).execute(); - } + handle -> { + String request = StringUtils.format( + "UPDATE %s SET used = false WHERE id = :id", + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable() + ); + return handle.createStatement(request).bind("id", segment.getId().toString()).execute(); } ) ); @@ -415,32 +387,19 @@ private List retrievePendingSegmentIds() { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable(); return derbyConnector.retryWithHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) - { - return handle.createQuery("SELECT id FROM " + table + " ORDER BY id") - .map(StringMapper.FIRST) - .list(); - } - } + handle -> handle.createQuery("SELECT id FROM " + table + " ORDER BY id") + .map(StringMapper.FIRST) + .list() ); } + private List retrieveUsedSegmentIds() { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); return derbyConnector.retryWithHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) - { - return handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id") - .map(StringMapper.FIRST) - .list(); - } - } + handle -> handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id") + .map(StringMapper.FIRST) + .list() ); } @@ -448,16 +407,9 @@ private List retrieveUnusedSegmentIds() { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); return derbyConnector.retryWithHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) - { - return handle.createQuery("SELECT id FROM " + table + " WHERE used = false ORDER BY id") - .map(StringMapper.FIRST) - .list(); - } - } + handle -> handle.createQuery("SELECT id FROM " + table + " WHERE used = false ORDER BY id") + .map(StringMapper.FIRST) + .list() ); } @@ -466,39 +418,34 @@ private Boolean insertUsedSegments(Set dataSegments) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(); return derbyConnector.retryWithHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) throws Exception - { - PreparedBatch preparedBatch = handle.prepareBatch( - StringUtils.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", - table, - derbyConnector.getQuoteString() - ) - ); - for (DataSegment segment : dataSegments) { - preparedBatch.add() - .bind("id", segment.getId().toString()) - .bind("dataSource", segment.getDataSource()) - .bind("created_date", DateTimes.nowUtc().toString()) - .bind("start", segment.getInterval().getStart().toString()) - .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true) - .bind("version", segment.getVersion()) - .bind("used", true) - .bind("payload", mapper.writeValueAsBytes(segment)); - } - - final int[] affectedRows = preparedBatch.execute(); - final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1); - if (!succeeded) { - throw new ISE("Failed to publish segments to DB"); - } - return true; + handle -> { + PreparedBatch preparedBatch = handle.prepareBatch( + StringUtils.format( + "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)", + table, + derbyConnector.getQuoteString() + ) + ); + for (DataSegment segment : dataSegments) { + preparedBatch.add() + .bind("id", segment.getId().toString()) + .bind("dataSource", segment.getDataSource()) + .bind("created_date", DateTimes.nowUtc().toString()) + .bind("start", segment.getInterval().getStart().toString()) + .bind("end", segment.getInterval().getEnd().toString()) + .bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec)) + .bind("version", segment.getVersion()) + .bind("used", true) + .bind("payload", mapper.writeValueAsBytes(segment)); } + + final int[] affectedRows = preparedBatch.execute(); + final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1); + if (!succeeded) { + throw new ISE("Failed to publish segments to DB"); + } + return true; } ); } @@ -561,12 +508,12 @@ public void testAnnounceHistoricalSegments() throws IOException ); } - List segmentIds = segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList()); - segmentIds.sort(Comparator.naturalOrder()); - Assert.assertEquals( - segmentIds, - retrieveUsedSegmentIds() - ); + List segmentIds = segments.stream() + .map(segment -> segment.getId().toString()) + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toList()); + + Assert.assertEquals(segmentIds, retrieveUsedSegmentIds()); // Should not update dataSource metadata. Assert.assertEquals(0, metadataUpdateCounter.get()); @@ -823,10 +770,7 @@ public void testTransactionalAnnounceFailSegmentDropFailWithRetry() throws IOExc retrieveUsedSegmentIds() ); - DataSegment nonExistingSegment = defaultSegment4; - - Set dropSegments = ImmutableSet.of(existingSegment1, nonExistingSegment); - + Set dropSegments = ImmutableSet.of(existingSegment1, defaultSegment4); final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( SEGMENTS, dropSegments, @@ -1576,17 +1520,19 @@ public void testAllocatePendingSegment() } /** - * This test simulates an issue detected on the field consisting of the following sequence of events: - * - A kafka stream segment was created on a given interval - * - Later, after the above was published, another segment on same interval was created by the stream - * - Later, after the above was published, another segment on same interval was created by the stream - * - Later a compaction was issued for the three segments above - * - Later, after the above was published, another segment on same interval was created by the stream - * - Later, the compacted segment got dropped due to a drop rule + * This test verifies the behaviour in the following sequence of events: + * - create segment1 for an interval and publish + * - create segment2 for same interval and publish + * - create segment3 for same interval and publish + * - compact all segments above and publish new segments + * - create segment4 for the same interval + * - drop the compacted segment + * - create segment5 for the same interval + * - verify that the id for segment5 is correct * - Later, after the above was dropped, another segment on same interval was created by the stream but this - * time there was an integrity violation in the pending segments table because the - * {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)} - * method returned an segment id that already existed in the pending segments table + * time there was an integrity violation in the pending segments table because the + * {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)} + * method returned an segment id that already existed in the pending segments table */ @Test public void testAllocatePendingSegmentAfterDroppingExistingSegment() @@ -1690,13 +1636,13 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() /** * Slightly different that the above test but that involves reverted compaction - 1) used segments of version = A, id = 0, 1, 2 - 2) overwrote segments of version = B, id = 0 <= compaction - 3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing - 4) pending segment of version = B, id = 1 <= appending new data, aborted - 5) reverted compaction, mark segments used for version = A, id = 0, 1, 2, and mark compacted segments unused - 6) used segments of version = A, id = 0, 1, 2 - 7) pending segment of version = B, id = 1 + * 1) used segments of version = A, id = 0, 1, 2 + * 2) overwrote segments of version = B, id = 0 <= compaction + * 3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing + * 4) pending segment of version = B, id = 1 <= appending new data, aborted + * 5) reverted compaction, mark segments used for version = A, id = 0, 1, 2, and mark compacted segments unused + * 6) used segments of version = A, id = 0, 1, 2 + * 7) pending segment of version = B, id = 1 */ @Test public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() @@ -1842,13 +1788,13 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() // used segment: version = A, id = 0,1,2 // unused segment: version = B, id = 0 List pendings = retrievePendingSegmentIds(); - Assert.assertTrue(pendings.size() == 4); + Assert.assertEquals(4, pendings.size()); List used = retrieveUsedSegmentIds(); - Assert.assertTrue(used.size() == 3); + Assert.assertEquals(3, used.size()); List unused = retrieveUnusedSegmentIds(); - Assert.assertTrue(unused.size() == 1); + Assert.assertEquals(1, unused.size()); // Simulate one more append load final SegmentIdWithShardSpec identifier4 = coordinator.allocatePendingSegment( @@ -1886,7 +1832,71 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_3", ids.get(3)); } - + + @Test + public void testAllocatePendingSegments() + { + final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance(); + final String dataSource = "ds"; + final Interval interval = Intervals.of("2017-01-01/2017-02-01"); + final String sequenceName = "seq"; + + final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec); + final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request) + ).get(request); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString()); + + final SegmentCreateRequest request1 = + new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec); + final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request1) + ).get(request1); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString()); + + final SegmentCreateRequest request2 = + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec); + final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request2) + ).get(request2); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString()); + + final SegmentCreateRequest request3 = + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec); + final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request3) + ).get(request3); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId3.toString()); + Assert.assertEquals(segmentId2, segmentId3); + + final SegmentCreateRequest request4 = + new SegmentCreateRequest("seq1", null, "v1", partialShardSpec); + final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments( + dataSource, + interval, + false, + Collections.singletonList(request4) + ).get(request4); + + Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3", segmentId4.toString()); + } + @Test public void testNoPendingSegmentsAndOneUsedSegment() { @@ -1923,9 +1933,8 @@ public void testNoPendingSegmentsAndOneUsedSegment() true ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString()); - - } + } @Test