diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 68ece19f87..efa6d885c2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -102,6 +102,7 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; @@ -355,6 +356,43 @@ public abstract class AbstractBatchIndexTask extends AbstractTask } } + + /** + * Attempts to acquire a lock that covers certain segments. + *

+ * Will look at {@link Tasks#FORCE_TIME_CHUNK_LOCK_KEY} to decide whether to acquire a time chunk or segment lock. + *

+ * This method will initialize {@link #taskLockHelper} as a side effect. + * + * @return whether the lock was acquired + */ + boolean determineLockGranularityAndTryLockWithSegments( + TaskActionClient client, + List segments, + BiConsumer> segmentCheckFunction + ) throws IOException + { + final boolean forceTimeChunkLock = getContextValue( + Tasks.FORCE_TIME_CHUNK_LOCK_KEY, + Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK + ); + + if (forceTimeChunkLock) { + log.info("[%s] is set to true in task context. Use timeChunk lock", Tasks.FORCE_TIME_CHUNK_LOCK_KEY); + taskLockHelper = createLockHelper(LockGranularity.TIME_CHUNK); + segmentCheckFunction.accept(LockGranularity.TIME_CHUNK, segments); + return tryTimeChunkLock( + client, + new ArrayList<>(segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet())) + ); + } else { + final LockGranularityDetermineResult result = determineSegmentGranularity(segments); + taskLockHelper = createLockHelper(result.lockGranularity); + segmentCheckFunction.accept(result.lockGranularity, segments); + return tryLockWithDetermineResult(client, result); + } + } + private LockGranularityDetermineResult determineSegmentGranularity(TaskActionClient client, List intervals) throws IOException { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 8f806d6269..d8b32c4125 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -445,7 +445,12 @@ public class CompactionTask extends AbstractBatchIndexTask implements PendingSeg @Override public boolean isReady(TaskActionClient taskActionClient) throws Exception { - return determineLockGranularityAndTryLock(taskActionClient, List.of(segmentProvider.interval)); +// return determineLockGranularityAndTryLock(taskActionClient, List.of(segmentProvider.interval)); + return determineLockGranularityAndTryLockWithSegments( + taskActionClient, + segmentProvider.findSegments(taskActionClient), + segmentProvider::checkSegments + ); } @Override diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java index 70aed40652..162daf3fbe 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQCompactionTaskRunTest.java @@ -133,7 +133,7 @@ public class MSQCompactionTaskRunTest extends CompactionTaskRunBase for (boolean useCentralizedDatasourceSchema : new boolean[]{false}) { for (boolean useSegmentMetadataCache : new boolean[]{false, true}) { for (boolean useConcurrentLocks : new boolean[]{false, true}) { - for (Interval inputInterval : new Interval[]{TEST_INTERVAL}) { + for (Interval inputInterval : new Interval[]{TEST_INTERVAL_DAY}) { for (Granularity segmentGran : new Granularity[]{Granularities.THREE_HOUR}) { String name = lockGranularity.name() + (useCentralizedDatasourceSchema ? "|centralizedSchema" : "")