diff --git a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java index c1fa71a5f319..4c32caf1f6bb 100644 --- a/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java +++ b/extensions-core/testing-tools/src/main/java/org/apache/druid/guice/ClusterTestingModule.java @@ -31,7 +31,7 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.initialization.DruidModule; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.rpc.indexing.OverlordClient; @@ -106,7 +106,7 @@ private void bindDependenciesForClusterTestingMode(Binder binder) } else if (roles.contains(NodeRole.OVERLORD)) { // If this is the Overlord, bind a faulty storage coordinator log.warn("Running Overlord in cluster testing mode."); - binder.bind(TaskLockbox.class) + binder.bind(GlobalTaskLockbox.class) .to(FaultyTaskLockbox.class) .in(LazySingleton.class); } diff --git a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java index d2639dcfbbab..67ce9e829ae6 100644 --- a/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java +++ b/extensions-core/testing-tools/src/main/java/org/apache/druid/testing/cluster/overlord/FaultyTaskLockbox.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import org.apache.druid.indexing.common.task.Task; +import 
org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; @@ -34,7 +35,7 @@ *
  • Skip cleanup of pending segments
  • * */ -public class FaultyTaskLockbox extends TaskLockbox +public class FaultyTaskLockbox extends GlobalTaskLockbox { private static final Logger log = new Logger(FaultyTaskLockbox.class); diff --git a/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java index 39ff06b60350..569ad33e3ce3 100644 --- a/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java +++ b/extensions-core/testing-tools/src/test/java/org/apache/druid/guice/ClusterTestingModuleTest.java @@ -40,7 +40,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.indexing.input.DruidInputSource; -import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.filter.TrueDimFilter; import org.apache.druid.rpc.indexing.OverlordClient; @@ -244,7 +244,7 @@ public void test_overlordService_hasFaultyStorageCoordinator_ifTestingIsEnabled( final Injector overlordInjector = overlord.makeInjector(Set.of(NodeRole.OVERLORD)); - TaskLockbox taskLockbox = overlordInjector.getInstance(TaskLockbox.class); + GlobalTaskLockbox taskLockbox = overlordInjector.getInstance(GlobalTaskLockbox.class); Assert.assertTrue(taskLockbox instanceof FaultyTaskLockbox); } finally { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java index 724364c27413..dd92032c5886 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java @@ -24,8 +24,8 @@ import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -74,7 +74,7 @@ public class SegmentAllocationQueue private final long maxWaitTimeMillis; - private final TaskLockbox taskLockbox; + private final GlobalTaskLockbox taskLockbox; private final IndexerMetadataStorageCoordinator metadataStorage; private final AtomicBoolean isLeader = new AtomicBoolean(false); private final ServiceEmitter emitter; @@ -91,7 +91,7 @@ public class SegmentAllocationQueue @Inject public SegmentAllocationQueue( - TaskLockbox taskLockbox, + GlobalTaskLockbox taskLockbox, TaskLockConfig taskLockConfig, IndexerMetadataStorageCoordinator metadataStorage, ServiceEmitter emitter, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java index 7d001ecbcb9a..d430e3f5a9f2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionToolbox.java @@ -23,8 +23,8 @@ import com.google.common.base.Optional; import com.google.inject.Inject; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import 
org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerFactory; import org.apache.druid.indexing.overlord.TaskStorage; @@ -33,7 +33,7 @@ public class TaskActionToolbox { - private final TaskLockbox taskLockbox; + private final GlobalTaskLockbox taskLockbox; private final TaskStorage taskStorage; private final SegmentAllocationQueue segmentAllocationQueue; private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; @@ -44,7 +44,7 @@ public class TaskActionToolbox @Inject public TaskActionToolbox( - TaskLockbox taskLockbox, + GlobalTaskLockbox taskLockbox, TaskStorage taskStorage, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, SegmentAllocationQueue segmentAllocationQueue, @@ -63,7 +63,7 @@ public TaskActionToolbox( } public TaskActionToolbox( - TaskLockbox taskLockbox, + GlobalTaskLockbox taskLockbox, TaskStorage taskStorage, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, ServiceEmitter emitter, @@ -82,7 +82,7 @@ public TaskActionToolbox( ); } - public TaskLockbox getTaskLockbox() + public GlobalTaskLockbox getTaskLockbox() { return taskLockbox; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java index aeecba0e289d..b7c5d27f7052 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java @@ -28,7 +28,7 @@ import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; -import org.apache.druid.indexing.overlord.TaskLockbox; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import 
org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.metadata.ReplaceTaskLock; @@ -52,7 +52,7 @@ public class TaskLocks { static void checkLockCoversSegments( final Task task, - final TaskLockbox taskLockbox, + final GlobalTaskLockbox taskLockbox, final Collection segments ) { @@ -69,7 +69,7 @@ static void checkLockCoversSegments( @VisibleForTesting static boolean isLockCoversSegments( final Task task, - final TaskLockbox taskLockbox, + final GlobalTaskLockbox taskLockbox, final Collection segments ) { @@ -176,7 +176,7 @@ public static TaskLockType determineLockTypeForAppend( */ public static Map findReplaceLocksCoveringSegments( final String datasource, - final TaskLockbox taskLockbox, + final GlobalTaskLockbox taskLockbox, final Set segments ) { @@ -206,7 +206,7 @@ public static Map findReplaceLocksCoveringSegments public static List findLocksForSegments( final Task task, - final TaskLockbox taskLockbox, + final GlobalTaskLockbox taskLockbox, final Collection segments ) { @@ -245,7 +245,7 @@ public static List findLocksForSegments( return found; } - private static NavigableMap> getTaskLockMap(TaskLockbox taskLockbox, Task task) + private static NavigableMap> getTaskLockMap(GlobalTaskLockbox taskLockbox, Task task) { final List taskLocks = taskLockbox.findLocksForTask(task); final NavigableMap> taskLockMap = new TreeMap<>(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/CriticalAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/CriticalAction.java index 9a860ea681e0..70aeb5abb918 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/CriticalAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/CriticalAction.java @@ -27,13 +27,13 @@ * This class represents a critical action must be done while the task's lock is guaranteed to not be revoked in the * middle of the action. 
* - * Implementations must not change the lock state by calling {@link TaskLockbox#lock)}, {@link TaskLockbox#tryLock)}, - * or {@link TaskLockbox#unlock(Task, Interval)}. + * Implementations must not change the lock state by calling {@link GlobalTaskLockbox#lock)}, {@link GlobalTaskLockbox#tryLock)}, + * or {@link GlobalTaskLockbox#unlock(Task, Interval)}. * - * Also, implementations should be finished as soon as possible because all methods in {@link TaskLockbox} are blocked + * Also, implementations should be finished as soon as possible because all methods in {@link GlobalTaskLockbox} are blocked * until this action is finished. * - * @see TaskLockbox#doInCriticalSection + * @see GlobalTaskLockbox#doInCriticalSection */ public class CriticalAction { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java index 36c8f36696b8..350f9b08cf91 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/DruidOverlord.java @@ -80,7 +80,7 @@ public DruidOverlord( final TaskLockConfig taskLockConfig, final TaskQueueConfig taskQueueConfig, final DefaultTaskConfig defaultTaskConfig, - final TaskLockbox taskLockbox, + final GlobalTaskLockbox taskLockbox, final TaskStorage taskStorage, final TaskActionClientFactory taskActionClientFactory, @Self final DruidNode selfNode, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java new file mode 100644 index 000000000000..a464af86868e --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/GlobalTaskLockbox.java @@ -0,0 +1,621 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.actions.SegmentAllocateAction; +import org.apache.druid.indexing.common.actions.SegmentAllocateRequest; +import org.apache.druid.indexing.common.actions.SegmentAllocateResult; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.LockFilterPolicy; +import org.apache.druid.metadata.ReplaceTaskLock; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import 
java.util.function.Predicate; + +/** + * Maintains a {@link TaskLockbox} for each datasource. + * A {@link TaskLockbox} is used to maintain locks over intervals of a datasource + * to ensure data consistency while various types of ingestion tasks append, + * overwrite or delete data. + */ +public class GlobalTaskLockbox +{ + private static final Logger log = new Logger(GlobalTaskLockbox.class); + + private final TaskStorage taskStorage; + private final IndexerMetadataStorageCoordinator metadataStorageCoordinator; + private final ConcurrentHashMap datasourceToLockbox = new ConcurrentHashMap<>(); + + private final AtomicBoolean syncComplete = new AtomicBoolean(false); + + @Inject + public GlobalTaskLockbox( + TaskStorage taskStorage, + IndexerMetadataStorageCoordinator metadataStorageCoordinator + ) + { + this.taskStorage = taskStorage; + this.metadataStorageCoordinator = metadataStorageCoordinator; + } + + /** + * Syncs the current in-memory state with the {@link TaskStorage}. + * This method should be called only from {@link TaskQueue#start()}. + * If the sync fails, no other operation can be performed on this lockbox. 
+ * + * @return SyncResult which needs to be processed by the caller + */ + public TaskLockboxSyncResult syncFromStorage() + { + // Shutdown to reset the state and ensure that no other operation is + // in progress during the sync + shutdown(); + + // Retrieve all active tasks and locks associated with them + final Map datasourceToSyncResult = new HashMap<>(); + int activeTaskCount = 0; + int totalLockCount = 0; + for (Task task : taskStorage.getActiveTasks()) { + ++activeTaskCount; + final DatasourceSyncResult result = datasourceToSyncResult.computeIfAbsent( + task.getDataSource(), + ds -> new DatasourceSyncResult() + ); + result.storedActiveTasks.add(task); + for (TaskLock taskLock : taskStorage.getLocks(task.getId())) { + ++totalLockCount; + result.storedLocks.add(Pair.of(task, taskLock)); + } + } + + // Identify task groups in which at least one task failed to re-acquire a lock + final Set tasksToFail = new HashSet<>(); + final AtomicInteger taskLockCount = new AtomicInteger(0); + + datasourceToSyncResult.forEach((dataSource, syncResult) -> { + try (final DatasourceLockboxResource lockboxResource = getLockboxResource(dataSource, false)) { + final TaskLockboxSyncResult lockboxSyncResult = lockboxResource.delegate.resetState( + syncResult.storedActiveTasks, + syncResult.storedLocks + ); + tasksToFail.addAll(lockboxSyncResult.getTasksToFail()); + taskLockCount.addAndGet(lockboxSyncResult.getTaskLockCount()); + } + }); + + log.info( + "Synced [%,d] locks for [%,d] active tasks from storage ([%,d] locks ignored).", + taskLockCount.get(), activeTaskCount, totalLockCount - taskLockCount.get() + ); + + syncComplete.set(true); + return new TaskLockboxSyncResult(tasksToFail, taskLockCount.get()); + } + + /** + * Clears up the state of the lockbox. Should be called when leadership is lost. 
+ */ + public void shutdown() + { + // Mark sync as incomplete so that no more lockboxes are created + syncComplete.set(false); + + // Clean up all existing lockboxes + final Set datasourceNames = Set.copyOf(datasourceToLockbox.keySet()); + for (String dataSource : datasourceNames) { + // This can block if a bad runaway operation is still working on the underlying TaskLockbox. + // There is currently no clear way to interrupt ongoing operations. + cleanupLockboxResourceIf(dataSource, resource -> true); + } + + log.info( + "Removed lockboxes for [%d] datasources, [%d] remaining.", + datasourceNames.size(), datasourceToLockbox.size() + ); + } + + /** + * Acquires a lock on behalf of a task. Blocks until the lock is acquired. + * + * @return {@link LockResult} containing a new or an existing lock if succeeded. + * Otherwise, {@link LockResult} with {@link LockResult#isRevoked()} true. + * @throws InterruptedException if the current thread is interrupted + */ + public LockResult lock(final Task task, final LockRequest request) throws InterruptedException + { + return computeForTask( + task, + lockbox -> lockbox.lock(task, request) + ); + } + + /** + * Acquires a lock on behalf of a task, waiting up to the specified wait time + * if necessary. + * + * @return {@link LockResult} containing a new or an existing lock if succeeded. + * Otherwise, a {@link LockResult} with {@link LockResult#isRevoked()} true. + * @throws InterruptedException if the current thread is interrupted + */ + public LockResult lock(final Task task, final LockRequest request, long timeoutMs) throws InterruptedException + { + return computeForTask( + task, + lockbox -> lockbox.lock(task, request, timeoutMs) + ); + } + + /** + * Attempt to acquire a lock for a task, without removing it from the queue. Can safely be called multiple times on + * the same task until the lock is preempted. + * + * @return {@link LockResult} containing a new or an existing lock if succeeded. 
+ * Otherwise, {@link LockResult} with {@link LockResult#isRevoked()} true. + * @throws IllegalStateException if the task is not a valid active task + */ + public LockResult tryLock(final Task task, final LockRequest request) + { + return computeForTask( + task, + lockbox -> lockbox.tryLock(task, request) + ); + } + + /** + * Attempts to allocate segments for the given requests. Each request contains + * a {@link Task} and a {@link SegmentAllocateAction}. This method tries to + * acquire the task locks on the required intervals/segments and then performs + * a batch allocation of segments. It is possible that some requests succeed + * and others fail. In that case, only the failed ones should be retried. + * + * @param requests List of allocation requests + * @param dataSource Datasource for which segment is to be allocated. + * @param interval Interval for which segment is to be allocated. + * @param skipSegmentLineageCheck Whether lineage check is to be skipped + * (this is true for streaming ingestion) + * @param lockGranularity Granularity of task lock + * @return List of allocation results in the same order as the requests. + */ + public List allocateSegments( + List requests, + String dataSource, + Interval interval, + boolean skipSegmentLineageCheck, + LockGranularity lockGranularity, + boolean reduceMetadataIO + ) + { + return computeForDatasource( + dataSource, + lockbox -> lockbox.allocateSegments( + requests, + dataSource, + interval, + skipSegmentLineageCheck, + lockGranularity, + reduceMetadataIO + ) + ); + } + + /** + * Performs the given action with a guarantee that the locks of the task are not revoked in the middle of the action. This + * method first checks that all locks for the given task and intervals are valid and then performs the given action. + *

    + * The given action should be finished as soon as possible because all other methods in this class are blocked until + * this method is finished. + * + * @param task task performing a critical action + * @param intervals intervals + * @param action action to be performed inside the critical section + */ + public T doInCriticalSection(Task task, Set intervals, CriticalAction action) throws Exception + { + return computeForTask( + task, + lockbox -> lockbox.doInCriticalSection(task, intervals, action) + ); + } + + /** + * Mark the lock as revoked. Note that revoked locks are NOT removed. Instead, they are kept in memory + * and {@link #taskStorage} as the normal locks do. This is to check locks are revoked when they are requested to be + * acquired and notify to the callers if revoked. Revoked locks are removed by calling + * {@link #unlock(Task, Interval)}. + * + * @param taskId an id of the task holding the lock + * @param lock lock to be revoked + */ + @VisibleForTesting + public void revokeLock(String taskId, TaskLock lock) + { + computeForDatasource( + lock.getDataSource(), + lockbox -> { + lockbox.revokeLock(taskId, lock); + return 0; + } + ); + } + + /** + * Cleans up pending segments associated with the given task, if any. + */ + protected void cleanupPendingSegments(Task task) + { + executeForTask( + task, + lockbox -> lockbox.cleanupPendingSegments(task) + ); + } + + /** + * Returns all the active locks currently held by the given task. + */ + public List findLocksForTask(final Task task) + { + return computeForTask( + task, + lockbox -> lockbox.findLocksForTask(task) + ); + } + + /** + * Returns all the active non-revoked REPLACE locks held by the given task. + */ + public Set findReplaceLocksForTask(Task task) + { + return computeForTask( + task, + lockbox -> lockbox.findReplaceLocksForTask(task) + ); + } + + /** + * Returns all the active non-revoked REPLACE locks for the given datasource. 
+ */ + public Set getAllReplaceLocksForDatasource(String datasource) + { + return computeForDatasource( + datasource, + lockbox -> lockbox.getAllReplaceLocksForDatasource(datasource) + ); + } + + /** + * @param lockFilterPolicies Lock filters for the given datasources + * @return Map from datasource to intervals locked by tasks satisfying the lock filter conditions + */ + public Map> getLockedIntervals(List lockFilterPolicies) + { + final Map> datasourceToFilterPolicies = new HashMap<>(); + for (LockFilterPolicy policy : lockFilterPolicies) { + datasourceToFilterPolicies.computeIfAbsent(policy.getDatasource(), ds -> new ArrayList<>()) + .add(policy); + } + + final Map> datasourceToLockedIntervals = new HashMap<>(); + datasourceToFilterPolicies.forEach((datasource, policies) -> { + final List lockedIntervals = computeForDatasource( + datasource, + lockbox -> lockbox.getLockedIntervals(policies) + ); + if (!lockedIntervals.isEmpty()) { + datasourceToLockedIntervals.put(datasource, lockedIntervals); + } + }); + + return datasourceToLockedIntervals; + } + + /** + * @param lockFilterPolicies Lock filters for the given datasources + * @return Map from datasource to list of non-revoked locks with at least as much priority and an overlapping interval + */ + public Map> getActiveLocks(List lockFilterPolicies) + { + final Map> datasourceToFilterPolicies = new HashMap<>(); + for (LockFilterPolicy policy : lockFilterPolicies) { + datasourceToFilterPolicies.computeIfAbsent(policy.getDatasource(), ds -> new ArrayList<>()) + .add(policy); + } + + final Map> datasourceToActiveLocks = new HashMap<>(); + datasourceToFilterPolicies.forEach((datasource, policies) -> { + final List datasourceLocks = computeForDatasource( + datasource, + lockbox -> lockbox.getActiveLocks(policies) + ); + if (!datasourceLocks.isEmpty()) { + datasourceToActiveLocks.put(datasource, datasourceLocks); + } + }); + + return datasourceToActiveLocks; + } + + /** + * Releases the lock held by a task over the
given interval. + */ + public void unlock(final Task task, final Interval interval) + { + executeForTask( + task, + lockbox -> lockbox.unlock(task, interval) + ); + } + + /** + * Releases all locks currently held by the given task. + */ + public void unlockAll(Task task) + { + executeForTask( + task, + lockbox -> lockbox.unlockAll(task) + ); + } + + /** + * Adds the given task to the set of active tasks. + */ + public void add(Task task) + { + executeForTask( + task, + lockbox -> lockbox.add(task) + ); + } + + /** + * Removes a task from the set of active tasks and releases all locks held by it. + * Does nothing if the task is not active or doesn't hold any locks. + */ + public void remove(final Task task) + { + final boolean isEmpty = computeForTask( + task, + lockbox -> { + lockbox.remove(task); + return lockbox.isEmpty(); + } + ); + + if (isEmpty) { + cleanupLockboxResourceIf(task.getDataSource(), resource -> resource.references.get() <= 0); + } + } + + @VisibleForTesting + Optional getOnlyTaskLockPosseContainingInterval(Task task, Interval interval) + { + return computeForTask( + task, + lockbox -> lockbox.getOnlyTaskLockPosseContainingInterval(task, interval) + ); + } + + @VisibleForTesting + Set getActiveTasks() + { + final Set allActiveTasks = new HashSet<>(); + + final Set datasourceNames = Set.copyOf(datasourceToLockbox.keySet()); + for (String datasource : datasourceNames) { + allActiveTasks.addAll( + computeForDatasource(datasource, TaskLockbox::getActiveTasks) + ); + } + + return allActiveTasks; + } + + @VisibleForTesting + Map>>> getAllLocks() + { + final Map>>> + allLocks = new HashMap<>(); + + final Set datasourceNames = Set.copyOf(datasourceToLockbox.keySet()); + for (String datasource : datasourceNames) { + final NavigableMap>> + datasourceLocks = computeForDatasource(datasource, TaskLockbox::getAllLocks); + if (!datasourceLocks.isEmpty()) { + allLocks.put(datasource, datasourceLocks); + } + } + + return allLocks; + } + + /** + * Gets the {@link 
DatasourceLockboxResource} for the given datasource and + * increments the number of references currently in use. + * This resource must be closed once it is not needed anymore. + * + * @throws ISE if {@code verifySyncComplete} is true and sync is not complete + * yet. + */ + private DatasourceLockboxResource getLockboxResource( + String datasource, + boolean verifySyncComplete + ) + { + return datasourceToLockbox.compute( + datasource, + (ds, existingResource) -> { + final DatasourceLockboxResource resource = Objects.requireNonNullElseGet( + existingResource, + () -> new DatasourceLockboxResource( + new TaskLockbox(ds, taskStorage, metadataStorageCoordinator) + ) + ); + + // Verify sync is complete before acquiring the resource + if (verifySyncComplete && !syncComplete.get()) { + throw new ISE( + "Cannot get TaskLockbox for datasource[%s] as sync with storage has not happened yet.", + datasource + ); + } + + resource.acquireReference(); + return resource; + } + ); + } + + /** + * Cleans up the lockbox for the given datasource if the {@link DatasourceLockboxResource} + * meets the given criteria. + */ + private void cleanupLockboxResourceIf( + String dataSource, + Predicate resourcePredicate + ) + { + datasourceToLockbox.compute( + dataSource, + (ds, resource) -> { + if (resource != null && resourcePredicate.test(resource)) { + resource.delegate.clear(); + return null; + } else { + return resource; + } + } + ); + } + + /** + * Performs a computation using the {@link TaskLockbox} corresponding to the + * given datasource and returns the result. + */ + private R computeForDatasource( + String datasource, + LockComputation computation + ) throws T + { + try (final DatasourceLockboxResource lockbox = getLockboxResource(datasource, true)) { + return computation.perform(lockbox.delegate); + } + } + + /** + * Performs a computation using the {@link TaskLockbox} corresponding to the + * given task and returns the result. 
+ */ + private R computeForTask( + Task task, + LockComputation computation + ) throws T + { + return computeForDatasource(task.getDataSource(), computation); + } + + /** + * Executes an operation on the {@link TaskLockbox} corresponding to the given + * task. + */ + private void executeForTask( + Task task, + LockOperation operation + ) throws T + { + computeForDatasource( + task.getDataSource(), + lockbox -> { + operation.perform(lockbox); + return 0; + } + ); + } + + @FunctionalInterface + private interface LockComputation + { + R perform(TaskLockbox lockbox) throws T; + } + + @FunctionalInterface + private interface LockOperation + { + void perform(TaskLockbox lockbox) throws T; + } + + /** + * Result of metadata store sync for a single datasource. + */ + private static class DatasourceSyncResult + { + final Set storedActiveTasks = new HashSet<>(); + final List> storedLocks = new ArrayList<>(); + } + + /** + * Wrapper around a {@link TaskLockbox} for a specific datasource which keeps + * track of the number of active references of this resource that are currently + * in use. 
+ */ + private static class DatasourceLockboxResource implements AutoCloseable + { + final AtomicInteger references; + final TaskLockbox delegate; + + DatasourceLockboxResource(TaskLockbox delegate) + { + this.delegate = delegate; + this.references = new AtomicInteger(0); + } + + void acquireReference() + { + references.incrementAndGet(); + } + + @Override + public void close() + { + references.decrementAndGet(); + } + } + +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/LockResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/LockResult.java index fa18cbb87f24..400d66d39a2c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/LockResult.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/LockResult.java @@ -27,7 +27,7 @@ import javax.annotation.Nullable; /** - * This class represents the result of {@link TaskLockbox#tryLock}. If the lock + * This class represents the result of {@link GlobalTaskLockbox#tryLock}. If the lock * acquisition fails, the callers can tell that it was failed because it was preempted by other locks of higher * priorities or not by checking the {@link #revoked} flag. 
* diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 99168143f84d..c7161b75463b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -24,11 +24,9 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.ComparisonChain; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.errorprone.annotations.concurrent.GuardedBy; -import com.google.inject.Inject; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentLock; @@ -74,30 +72,33 @@ import java.util.stream.Collectors; /** + * Maintains the state of task locks for a single datasource. + *

    * Remembers which activeTasks have locked which intervals or which segments. Tasks are permitted to lock an interval * or a segment if no other task outside their group has locked an overlapping interval for the same datasource or * the same segments. Note that TaskLockbox is also responsible for allocating segmentIds when a task requests to lock * a new segment. Task lock might involve version assignment. - * + *

    * - When a task locks an interval or a new segment, it is assigned a new version string that it can use to publish * segments. * - When a task locks a existing segment, it doesn't need to be assigned a new version. - * + *

    * Note that tasks of higher priorities can revoke locks of tasks of lower priorities. */ public class TaskLockbox { - // Datasource -> startTime -> Interval -> list of (Tasks + TaskLock) + // startTime -> Interval -> list of (Tasks + TaskLock) // Multiple shared locks can be acquired for the same dataSource and interval. // Note that revoked locks are also maintained in this map to notify that those locks are revoked to the callers when // they acquire the same locks again. // Also, the key of the second inner map is the start time to find all intervals properly starting with the same // startTime. - private final Map>>> running = new HashMap<>(); + private final NavigableMap>> running = new TreeMap<>(); + private final String dataSource; private final TaskStorage taskStorage; private final IndexerMetadataStorageCoordinator metadataStorageCoordinator; - private final ReentrantLock giant = new ReentrantLock(true); + private final ReentrantLock giant = new ReentrantLock(); private final Condition lockReleaseCondition = giant.newCondition(); private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); @@ -116,35 +117,55 @@ public class TaskLockbox @GuardedBy("giant") private final Map> activeAllocatorIdToTaskIds = new HashMap<>(); - @Inject public TaskLockbox( + String dataSource, TaskStorage taskStorage, IndexerMetadataStorageCoordinator metadataStorageCoordinator ) { + this.dataSource = dataSource; this.taskStorage = taskStorage; this.metadataStorageCoordinator = metadataStorageCoordinator; } + void clear() + { + giant.lock(); + try { + running.clear(); + activeTasks.clear(); + activeAllocatorIdToTaskIds.clear(); + } + finally { + giant.unlock(); + } + } + + boolean isEmpty() + { + giant.lock(); + try { + return activeTasks.isEmpty() && running.isEmpty() && activeAllocatorIdToTaskIds.isEmpty(); + } + finally { + giant.unlock(); + } + } + /** * Wipe out our current in-memory state and resync it from our bundled {@link TaskStorage}. 
+ * This method must be called only from {@link GlobalTaskLockbox#syncFromStorage()}. * * @return SyncResult which needs to be processed by the caller */ - public TaskLockboxSyncResult syncFromStorage() + TaskLockboxSyncResult resetState( + final Set storedActiveTasks, + final List> storedLocks + ) { giant.lock(); try { - // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks. - final Set storedActiveTasks = new HashSet<>(); - final List> storedLocks = new ArrayList<>(); - for (final Task task : taskStorage.getActiveTasks()) { - storedActiveTasks.add(task); - for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) { - storedLocks.add(Pair.of(task, taskLock)); - } - } // Sort locks by version, so we add them back in the order they were acquired. final Ordering> byVersionOrdering = new Ordering<>() { @@ -229,19 +250,16 @@ public int compare(Pair left, Pair right) } } - log.info( - "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).", - taskLockCount, - activeTasks.size(), - storedLocks.size() - taskLockCount - ); - if (!failedToReacquireLockTaskGroups.isEmpty()) { log.warn("Marking all tasks from task groups[%s] to be failed " + "as they failed to reacquire at least one lock.", failedToReacquireLockTaskGroups); } - return new TaskLockboxSyncResult(tasksToFail); + return new TaskLockboxSyncResult(tasksToFail, taskLockCount); + } + catch (Throwable t) { + log.noStackTrace().error(t, "Error while resetting state of datasource[%s]", dataSource); + throw t; } finally { giant.unlock(); @@ -249,13 +267,12 @@ public int compare(Pair left, Pair right) } /** - * Reacquire lock during {@link #syncFromStorage()}. + * Reacquires a lock during {@link #resetState}. * * @return null if the lock could not be reacquired. 
*/ - @VisibleForTesting @Nullable - protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock taskLock) + private TaskLockPosse reacquireLockOnStartup(Task task, TaskLock taskLock) { if (!taskMatchesLock(task, taskLock)) { log.warn( @@ -314,10 +331,10 @@ protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock taskLock) } /** - * Returns true if the datasource, groupId and priority of the given Task + * @return true if the datasource, groupId and priority of the given Task * match that of the TaskLock. */ - private boolean taskMatchesLock(Task task, TaskLock taskLock) + private static boolean taskMatchesLock(Task task, TaskLock taskLock) { return task.getGroupId().equals(taskLock.getGroupId()) && task.getDataSource().equals(taskLock.getDataSource()) @@ -552,7 +569,6 @@ private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, bool try { final TaskLockPosse posseToUse; final List foundPosses = findLockPossesOverlapsInterval( - request.getDataSource(), request.getInterval() ); @@ -686,11 +702,10 @@ private TaskLockPosse createNewTaskLockPosse(LockRequest request) giant.lock(); try { final TaskLockPosse posseToUse = new TaskLockPosse(request.toLock()); - running.computeIfAbsent(request.getDataSource(), k -> new TreeMap<>()) - .computeIfAbsent( - request.getInterval().getStart(), - k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()) - ) + running.computeIfAbsent( + request.getInterval().getStart(), + k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()) + ) .computeIfAbsent(request.getInterval(), k -> new ArrayList<>()) .add(posseToUse); @@ -807,7 +822,6 @@ private void revokeLock(TaskLockPosse lockPosse) * @param taskId an id of the task holding the lock * @param lock lock to be revoked */ - @VisibleForTesting public void revokeLock(String taskId, TaskLock lock) { giant.lock(); @@ -830,8 +844,7 @@ public void revokeLock(String taskId, TaskLock lock) final TaskLock revokedLock = lock.revokedCopy(); 
taskStorage.replaceLock(taskId, lock, revokedLock); - final List possesHolder = running.get(task.getDataSource()) - .get(lock.getInterval().getStart()) + final List possesHolder = running.get(lock.getInterval().getStart()) .get(lock.getInterval()); final TaskLockPosse foundPosse = possesHolder.stream() .filter(posse -> posse.getTaskLock().equals(lock)) @@ -888,13 +901,8 @@ public Set getAllReplaceLocksForDatasource(String datasource) { giant.lock(); try { - final NavigableMap>> activeLocks = running.get(datasource); - if (activeLocks == null) { - return ImmutableSet.of(); - } - List lockPosses - = activeLocks.values() + = running.values() .stream() .flatMap(map -> map.values().stream()) .flatMap(Collection::stream) @@ -933,12 +941,12 @@ private Set getNonRevokedReplaceLocks(List posse } /** - * @param lockFilterPolicies Lock filters for the given datasources - * @return Map from datasource to intervals locked by tasks satisfying the lock filter condititions + * @param lockFilterPolicies Lock filters for this datasource + * @return List of intervals locked by tasks satisfying the lock filter conditions. 
*/ - public Map> getLockedIntervals(List lockFilterPolicies) + public List getLockedIntervals(List lockFilterPolicies) { - final Map> datasourceToIntervals = new HashMap<>(); + final Set lockedIntervals = new HashSet<>(); // Take a lock and populate the maps giant.lock(); @@ -946,8 +954,7 @@ public Map> getLockedIntervals(List loc try { lockFilterPolicies.forEach( lockFilter -> { - final String datasource = lockFilter.getDatasource(); - if (!running.containsKey(datasource)) { + if (!dataSource.equals(lockFilter.getDatasource())) { return; } @@ -966,7 +973,7 @@ public Map> getLockedIntervals(List loc ); final boolean ignoreAppendLocks = isUsingConcurrentLocks || isReplaceLock; - running.get(datasource).forEach( + running.forEach( (startTime, startTimeLocks) -> startTimeLocks.forEach( (interval, taskLockPosses) -> taskLockPosses.forEach( taskLockPosse -> { @@ -979,8 +986,7 @@ public Map> getLockedIntervals(List loc || taskLockPosse.getTaskLock().getPriority() < priority) { // do nothing } else { - datasourceToIntervals.computeIfAbsent(datasource, k -> new HashSet<>()) - .add(interval); + lockedIntervals.add(interval); } } ) @@ -993,20 +999,16 @@ public Map> getLockedIntervals(List loc giant.unlock(); } - return datasourceToIntervals.entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> new ArrayList<>(entry.getValue()) - )); + return new ArrayList<>(lockedIntervals); } /** * @param lockFilterPolicies Lock filters for the given datasources - * @return Map from datasource to list of non-revoked locks with at least as much priority and an overlapping interval + * @return List of non-revoked locks with at least as much priority and an overlapping interval. 
*/ - public Map> getActiveLocks(List lockFilterPolicies) + public List getActiveLocks(List lockFilterPolicies) { - final Map> datasourceToLocks = new HashMap<>(); + final List activeLocks = new ArrayList<>(); // Take a lock and populate the maps giant.lock(); @@ -1014,8 +1016,7 @@ public Map> getActiveLocks(List lockFil try { lockFilterPolicies.forEach( lockFilter -> { - final String datasource = lockFilter.getDatasource(); - if (!running.containsKey(datasource)) { + if (!dataSource.equals(lockFilter.getDatasource())) { return; } @@ -1048,7 +1049,7 @@ public Map> getActiveLocks(List lockFil ignoreAppendLocks = useConcurrentLocks; } - running.get(datasource).forEach( + running.forEach( (startTime, startTimeLocks) -> startTimeLocks.forEach( (interval, taskLockPosses) -> taskLockPosses.forEach( taskLockPosse -> { @@ -1063,8 +1064,7 @@ public Map> getActiveLocks(List lockFil } else { for (Interval filterInterval : intervals) { if (interval.overlaps(filterInterval)) { - datasourceToLocks.computeIfAbsent(datasource, ds -> new ArrayList<>()) - .add(taskLockPosse.getTaskLock()); + activeLocks.add(taskLockPosse.getTaskLock()); break; } } @@ -1080,7 +1080,7 @@ public Map> getActiveLocks(List lockFil giant.unlock(); } - return datasourceToLocks; + return activeLocks; } public void unlock(final Task task, final Interval interval) @@ -1100,10 +1100,9 @@ private void unlock(final Task task, final Interval interval, @Nullable Integer giant.lock(); try { - final String dataSource = task.getDataSource(); final NavigableMap>> locksForDatasource - = running.get(task.getDataSource()); - if (locksForDatasource == null || locksForDatasource.isEmpty()) { + = running; + if (locksForDatasource.isEmpty()) { return; } @@ -1145,9 +1144,6 @@ private void unlock(final Task task, final Interval interval, @Nullable Integer if (intervalToPosses.isEmpty()) { locksForDatasource.remove(interval.getStart()); } - if (running.get(dataSource).isEmpty()) { - running.remove(dataSource); - } // Wake up 
blocking-lock waiters lockReleaseCondition.signalAll(); @@ -1306,25 +1302,19 @@ protected void cleanupPendingSegments(Task task) private List findLockPossesForTask(final Task task) { // Scan through all locks for this datasource - final NavigableMap>> locksForDatasource - = running.get(task.getDataSource()); - if (locksForDatasource == null) { - return Collections.emptyList(); - } - - return locksForDatasource.values().stream() + return running.values().stream() .flatMap(map -> map.values().stream()) .flatMap(Collection::stream) .filter(taskLockPosse -> taskLockPosse.containsTask(task)) .collect(Collectors.toList()); } - private List findLockPossesContainingInterval(final String dataSource, final Interval interval) + private List findLockPossesContainingInterval(final Interval interval) { giant.lock(); try { - final List intervalOverlapsPosses = findLockPossesOverlapsInterval(dataSource, interval); + final List intervalOverlapsPosses = findLockPossesOverlapsInterval(interval); return intervalOverlapsPosses.stream() .filter(taskLockPosse -> taskLockPosse.taskLock.getInterval().contains(interval)) .collect(Collectors.toList()); @@ -1337,19 +1327,18 @@ private List findLockPossesContainingInterval(final String dataSo /** * Return all locks that overlap some search interval. 
*/ - private List findLockPossesOverlapsInterval(final String dataSource, final Interval interval) + private List findLockPossesOverlapsInterval(final Interval interval) { giant.lock(); try { - final NavigableMap>> dsRunning = running.get(dataSource); - if (dsRunning == null) { + if (running.isEmpty()) { // No locks at all return Collections.emptyList(); } else { - return dsRunning.navigableKeySet().stream() + return running.navigableKeySet().stream() .filter(java.util.Objects::nonNull) - .map(dsRunning::get) + .map(running::get) .filter(java.util.Objects::nonNull) .flatMap(sortedMap -> sortedMap.entrySet().stream()) .filter(entry -> entry.getKey().overlaps(interval)) @@ -1367,7 +1356,7 @@ Optional getOnlyTaskLockPosseContainingInterval(Task task, Interv { giant.lock(); try { - final List filteredPosses = findLockPossesContainingInterval(task.getDataSource(), interval) + final List filteredPosses = findLockPossesContainingInterval(interval) .stream() .filter(lockPosse -> lockPosse.containsTask(task)) .collect(Collectors.toList()); @@ -1401,7 +1390,7 @@ Set getActiveTasks() } @VisibleForTesting - Map>>> getAllLocks() + NavigableMap>> getAllLocks() { return running; } @@ -1461,8 +1450,8 @@ private boolean canAppendLockCoexist(List conflictPosses, LockReq } /** - * Check if a REPLACE lock can coexist with a given set of conflicting posses. - * A REPLACE lock can coexist with any number of other APPEND locks and revoked locks + * Check if a REPLACE-lock can coexist with a given set of conflicting posses. 
+ * A REPLACE-lock can coexist with any number of other APPEND locks and revoked locks * @param conflictPosses conflicting lock posses * @param replaceLock replace lock request * @return true iff replace lock can coexist with all its conflicting locks @@ -1596,7 +1585,7 @@ private boolean revokeAllIncompatibleActiveLocksIfPossible( /** * Task locks for tasks of the same groupId */ - static class TaskLockPosse + protected static class TaskLockPosse { private final TaskLock taskLock; private final Set taskIds; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java index b7273b6bdef1..20b78e44650d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockboxSyncResult.java @@ -27,12 +27,14 @@ * Result of TaskLockbox#syncFromStorage() * Contains tasks which need to be forcefully failed to let the overlord become the leader */ -class TaskLockboxSyncResult +public class TaskLockboxSyncResult { + private final int taskLockCount; private final Set tasksToFail; - TaskLockboxSyncResult(Set tasksToFail) + TaskLockboxSyncResult(Set tasksToFail, int taskLockCount) { + this.taskLockCount = taskLockCount; this.tasksToFail = tasksToFail; } @@ -43,4 +45,9 @@ Set getTasksToFail() { return tasksToFail; } + + int getTaskLockCount() + { + return taskLockCount; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index 3613b6fa08d2..8e4830611bee 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -57,7 +57,7 @@ /** * Provides read-only methods 
to fetch information related to tasks. * This class may serve information that is cached in memory in {@link TaskQueue} - * or {@link TaskLockbox}. If not present in memory, then the underlying + * or {@link GlobalTaskLockbox}. If not present in memory, then the underlying * {@link TaskStorage} is queried. */ public class TaskQueryTool @@ -65,7 +65,7 @@ public class TaskQueryTool private static final Logger log = new Logger(TaskQueryTool.class); private final TaskStorage storage; - private final TaskLockbox taskLockbox; + private final GlobalTaskLockbox taskLockbox; private final TaskMaster taskMaster; private final Supplier workerBehaviorConfigSupplier; private final ProvisioningStrategy provisioningStrategy; @@ -73,7 +73,7 @@ public class TaskQueryTool @Inject public TaskQueryTool( TaskStorage storage, - TaskLockbox taskLockbox, + GlobalTaskLockbox taskLockbox, TaskMaster taskMaster, ProvisioningStrategy provisioningStrategy, Supplier workerBehaviorConfigSupplier diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 051817416ac3..c3e8e56bcd6c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -120,7 +120,7 @@ public class TaskQueue private final TaskStorage taskStorage; private final TaskRunner taskRunner; private final TaskActionClientFactory taskActionClientFactory; - private final TaskLockbox taskLockbox; + private final GlobalTaskLockbox taskLockbox; private final ServiceEmitter emitter; private final ObjectMapper passwordRedactingMapper; private final TaskContextEnricher taskContextEnricher; @@ -172,7 +172,7 @@ public TaskQueue( TaskStorage taskStorage, TaskRunner taskRunner, TaskActionClientFactory taskActionClientFactory, - TaskLockbox taskLockbox, + GlobalTaskLockbox taskLockbox, ServiceEmitter 
emitter, ObjectMapper mapper, TaskContextEnricher taskContextEnricher @@ -218,10 +218,10 @@ public void start() try { Preconditions.checkState(!active, "queue must be stopped"); setActive(true); - syncFromStorage(); // Mark these tasks as failed as they could not reacuire the lock // Clean up needs to happen after tasks have been synced from storage Set tasksToFail = taskLockbox.syncFromStorage().getTasksToFail(); + syncFromStorage(); for (Task task : tasksToFail) { shutdown(task.getId(), "Shutting down forcefully as task failed to reacquire lock while becoming leader"); @@ -295,6 +295,7 @@ public void stop() try { setActive(false); activeTasks.clear(); + taskLockbox.shutdown(); managerExec.shutdownNow(); storageSyncExec.shutdownNow(); requestManagement(); @@ -565,7 +566,7 @@ private void addTaskInternal(final Task task, final DateTime updateTime) if (added.get()) { taskLockbox.add(task); } else if (!entry.task.equals(task)) { - throw new ISE("Cannot add task ID [%s] with same ID as task that has already been added", task.getId()); + throw new ISE("Cannot add task[%s] as a different task for the same ID has already been added.", task.getId()); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java index bce95ffa2582..c2b57e19097c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKiller.java @@ -32,8 +32,8 @@ import org.apache.druid.indexing.common.task.KillUnusedSegmentsTask; import org.apache.druid.indexing.common.task.TaskMetrics; import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import 
org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Stopwatch; import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory; @@ -90,7 +90,7 @@ public class UnusedSegmentsKiller implements OverlordDuty private static final Duration MAX_TASK_DURATION = Duration.standardMinutes(10); private final ServiceEmitter emitter; - private final TaskLockbox taskLockbox; + private final GlobalTaskLockbox taskLockbox; private final DruidLeaderSelector leaderSelector; private final DataSegmentKiller dataSegmentKiller; @@ -121,7 +121,7 @@ public UnusedSegmentsKiller( @IndexingService DruidLeaderSelector leaderSelector, ScheduledExecutorFactory executorFactory, DataSegmentKiller dataSegmentKiller, - TaskLockbox taskLockbox, + GlobalTaskLockbox taskLockbox, ServiceEmitter emitter ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index 66fb89456e99..e51ff0560fea 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -30,8 +30,8 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; @@ -1143,7 +1143,7 @@ public void testAllocateDayWhenMonthNotPossible() public void 
testSegmentIdMustNotBeReused() { final IndexerMetadataStorageCoordinator coordinator = taskActionTestKit.getMetadataStorageCoordinator(); - final TaskLockbox lockbox = taskActionTestKit.getTaskLockbox(); + final GlobalTaskLockbox lockbox = taskActionTestKit.getTaskLockbox(); final Task task0 = NoopTask.ofPriority(25); lockbox.add(task0); final NoopTask task1 = NoopTask.ofPriority(50); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 2ddd0632291a..76e207521a72 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -23,9 +23,9 @@ import com.google.common.base.Suppliers; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.config.TaskStorageConfig; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.config.TaskLockConfig; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; @@ -55,7 +55,7 @@ public class TaskActionTestKit extends ExternalResource private final MetadataStorageTablesConfig metadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase("druid"); private TaskStorage taskStorage; - private TaskLockbox taskLockbox; + private GlobalTaskLockbox taskLockbox; private StubServiceEmitter emitter; private TestDerbyConnector testDerbyConnector; private IndexerMetadataStorageCoordinator metadataStorageCoordinator; @@ -81,7 +81,7 @@ public MetadataStorageTablesConfig 
getMetadataStorageTablesConfig() return metadataStorageTablesConfig; } - public TaskLockbox getTaskLockbox() + public GlobalTaskLockbox getTaskLockbox() { return taskLockbox; } @@ -136,7 +136,8 @@ public void before() segmentSchemaManager, CentralizedDatasourceSchemaConfig.create() ); - taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + taskLockbox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); + taskLockbox.syncFromStorage(); final TaskLockConfig taskLockConfig = new TaskLockConfig() { @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java index f2749d15cdb1..3fd3cd9cbd9e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java @@ -31,10 +31,10 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.LockResult; import org.apache.druid.indexing.overlord.SpecificSegmentLockRequest; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator; @@ -58,17 +58,18 @@ public class TaskLocksTest { - private TaskLockbox lockbox; + private GlobalTaskLockbox lockbox; private Task task; @Before public void setup() { final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); - lockbox = new TaskLockbox( + lockbox = new GlobalTaskLockbox( taskStorage, new 
TestIndexerMetadataStorageCoordinator() ); + lockbox.syncFromStorage(); task = NoopTask.create(); taskStorage.insert(task, TaskStatus.running(task.getId())); lockbox.add(task); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 9972340ae0cb..e51a7017a67b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -50,9 +50,9 @@ import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfigBuilder; import org.apache.druid.indexing.common.config.TaskStorageConfig; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; @@ -130,7 +130,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest private TaskStorage taskStorage; private IndexerSQLMetadataStorageCoordinator storageCoordinator; private SegmentsMetadataManager segmentsMetadataManager; - private TaskLockbox lockbox; + private GlobalTaskLockbox lockbox; private File baseDir; private SupervisorManager supervisorManager; private TestDataSegmentKiller dataSegmentKiller; @@ -187,7 +187,8 @@ public void setUpIngestionTestBase() throws IOException NoopServiceEmitter.instance(), objectMapper ); - lockbox = new TaskLockbox(taskStorage, storageCoordinator); + lockbox = new GlobalTaskLockbox(taskStorage, storageCoordinator); + lockbox.syncFromStorage(); 
segmentCacheManagerFactory = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, getObjectMapper()); reportsFile = temporaryFolder.newFile(); dataSegmentKiller = new TestDataSegmentKiller(); @@ -255,7 +256,7 @@ public SegmentsMetadataManager getSegmentsMetadataManager() return segmentsMetadataManager; } - public TaskLockbox getLockbox() + public GlobalTaskLockbox getLockbox() { return lockbox; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java index b204a69ed384..9353953fc2af 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java @@ -28,8 +28,8 @@ import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.task.CompactionTask; import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskQueue; @@ -136,7 +136,8 @@ public void setUp() private void initScheduler() { - TaskLockbox taskLockbox = new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()); + GlobalTaskLockbox taskLockbox = new GlobalTaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()); + taskLockbox.syncFromStorage(); WorkerBehaviorConfig defaultWorkerConfig = new DefaultWorkerBehaviorConfig(WorkerBehaviorConfig.DEFAULT_STRATEGY, null); scheduler = new OverlordCompactionScheduler( diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/GlobalTaskLockboxTest.java similarity index 95% rename from indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java rename to indexing-service/src/test/java/org/apache/druid/indexing/overlord/GlobalTaskLockboxTest.java index 48c5b5c4e94a..2c5390e568b9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/GlobalTaskLockboxTest.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import org.apache.druid.error.ExceptionMatcher; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.SegmentLock; @@ -43,7 +44,6 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; -import org.apache.druid.indexing.overlord.TaskLockbox.TaskLockPosse; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; @@ -72,6 +72,7 @@ import org.apache.druid.timeline.partition.PartialShardSpec; import org.apache.druid.timeline.partition.PartitionIds; import org.easymock.EasyMock; +import org.hamcrest.MatcherAssert; import org.joda.time.Interval; import org.junit.Assert; import org.junit.Before; @@ -89,14 +90,14 @@ import java.util.Set; import java.util.stream.Collectors; -public class TaskLockboxTest +public class GlobalTaskLockboxTest { @Rule public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule(); private TaskStorage taskStorage; private 
IndexerMetadataStorageCoordinator metadataStorageCoordinator; - private TaskLockbox lockbox; + private GlobalTaskLockbox lockbox; private TaskLockboxValidator validator; private SegmentSchemaManager segmentSchemaManager; @@ -147,7 +148,9 @@ public void setup() CentralizedDatasourceSchemaConfig.create() ); - lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + lockbox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); + lockbox.syncFromStorage(); + validator = new TaskLockboxValidator(lockbox, taskStorage); } @@ -343,7 +346,9 @@ public void testTimeoutForLock() throws InterruptedException @Test public void testSyncFromStorage() { - final TaskLockbox originalBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final GlobalTaskLockbox originalBox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); + originalBox.syncFromStorage(); + for (int i = 0; i < 5; i++) { final Task task = NoopTask.create(); taskStorage.insert(task, TaskStatus.running(task.getId())); @@ -365,7 +370,7 @@ public void testSyncFromStorage() .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) .collect(Collectors.toList()); - final TaskLockbox newBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final GlobalTaskLockbox newBox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); newBox.syncFromStorage(); Assert.assertEquals(originalBox.getAllLocks(), newBox.getAllLocks()); @@ -392,7 +397,7 @@ public void testSyncFromStorageWithMissingTaskLockPriority() .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) .collect(Collectors.toList()); - final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final GlobalTaskLockbox lockbox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); lockbox.syncFromStorage(); final List afterLocksInStorage = taskStorage.getActiveTasks().stream() @@ -423,7 +428,7 @@ public void testSyncFromStorageWithMissingTaskPriority() .flatMap(t 
-> taskStorage.getLocks(t.getId()).stream()) .collect(Collectors.toList()); - final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final GlobalTaskLockbox lockbox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); lockbox.syncFromStorage(); final List afterLocksInStorage = taskStorage.getActiveTasks().stream() @@ -450,7 +455,7 @@ public void testSyncFromStorageWithInvalidPriority() ) ); - final TaskLockbox lockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final GlobalTaskLockbox lockbox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); TaskLockboxSyncResult result = lockbox.syncFromStorage(); Assert.assertEquals(1, result.getTasksToFail().size()); Assert.assertTrue(result.getTasksToFail().contains(task)); @@ -489,8 +494,10 @@ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() CentralizedDatasourceSchemaConfig.create() ); - TaskLockbox theBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); - TaskLockbox loadedBox = new TaskLockbox(loadedTaskStorage, loadedMetadataStorageCoordinator); + GlobalTaskLockbox theBox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); + theBox.syncFromStorage(); + GlobalTaskLockbox loadedBox = new GlobalTaskLockbox(loadedTaskStorage, loadedMetadataStorageCoordinator); + loadedBox.syncFromStorage(); Task aTask = NoopTask.create(); taskStorage.insert(aTask, TaskStatus.running(aTask.getId())); @@ -515,7 +522,8 @@ public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() @Test public void testRevokedLockSyncFromStorage() { - final TaskLockbox originalBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final GlobalTaskLockbox originalBox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); + originalBox.syncFromStorage(); final Task task1 = NoopTask.ofPriority(10); taskStorage.insert(task1, TaskStatus.running(task1.getId())); @@ -541,7 +549,7 @@ public void testRevokedLockSyncFromStorage() 
Assert.assertEquals(1, task2Locks.size()); Assert.assertTrue(task2Locks.get(0).isRevoked()); - final TaskLockbox newBox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + final GlobalTaskLockbox newBox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); newBox.syncFromStorage(); final Set afterLocksInStorage = taskStorage.getActiveTasks().stream() @@ -776,7 +784,7 @@ public void testFindLockPosseAfterRevokeWithDifferentLockIntervals() ).isOk() ); - final Optional highLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval( + final Optional highLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval( highPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00") ); @@ -785,7 +793,7 @@ public void testFindLockPosseAfterRevokeWithDifferentLockIntervals() Assert.assertTrue(highLockPosse.get().containsTask(highPriorityTask)); Assert.assertFalse(highLockPosse.get().getTaskLock().isRevoked()); - final Optional lowLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval( + final Optional lowLockPosse = lockbox.getOnlyTaskLockPosseContainingInterval( lowPriorityTask, Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00") ); @@ -1114,9 +1122,9 @@ public void testLockPosseEquals() task2.getPriority() ); - TaskLockPosse taskLockPosse1 = new TaskLockPosse(taskLock1); - TaskLockPosse taskLockPosse2 = new TaskLockPosse(taskLock2); - TaskLockPosse taskLockPosse3 = new TaskLockPosse(taskLock1); + TaskLockbox.TaskLockPosse taskLockPosse1 = new TaskLockbox.TaskLockPosse(taskLock1); + TaskLockbox.TaskLockPosse taskLockPosse2 = new TaskLockbox.TaskLockPosse(taskLock2); + TaskLockbox.TaskLockPosse taskLockPosse3 = new TaskLockbox.TaskLockPosse(taskLock1); Assert.assertNotEquals(taskLockPosse1, null); Assert.assertNotEquals(null, taskLockPosse1); @@ -1147,7 +1155,7 @@ public void testGetTimeChunkAndSegmentLockForSameGroup() ).isOk() ); - final List posses = lockbox + final List posses = lockbox .getAllLocks() .get(task1.getDataSource()) 
.get(DateTimes.of("2017")) @@ -1871,36 +1879,28 @@ public void testFailedToReacquireTaskLock() // Please refer to NullLockPosseTaskLockbox final Task taskWithFailingLockAcquisition0 = new NoopTask(null, "FailingLockAcquisition", null, 0, 0, null); final Task taskWithFailingLockAcquisition1 = new NoopTask(null, "FailingLockAcquisition", null, 0, 0, null); - final Task taskWithSuccessfulLockAcquisition = NoopTask.create(); + final Task taskWithSuccessfulLockAcquisition = new NoopTask(null, "successGroup", "foo", 0L, 0L, Map.of()); taskStorage.insert(taskWithFailingLockAcquisition0, TaskStatus.running(taskWithFailingLockAcquisition0.getId())); taskStorage.insert(taskWithFailingLockAcquisition1, TaskStatus.running(taskWithFailingLockAcquisition1.getId())); taskStorage.insert(taskWithSuccessfulLockAcquisition, TaskStatus.running(taskWithSuccessfulLockAcquisition.getId())); + Assert.assertEquals(3, taskStorage.getActiveTasks().size()); - TaskLockbox testLockbox = new NullLockPosseTaskLockbox(taskStorage, metadataStorageCoordinator); - testLockbox.add(taskWithFailingLockAcquisition0); - testLockbox.add(taskWithFailingLockAcquisition1); - testLockbox.add(taskWithSuccessfulLockAcquisition); - - testLockbox.tryLock(taskWithFailingLockAcquisition0, - new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, - taskWithFailingLockAcquisition0, - Intervals.of("2017-07-01/2017-08-01"), - null - ) - ); - - testLockbox.tryLock(taskWithSuccessfulLockAcquisition, - new TimeChunkLockRequest(TaskLockType.EXCLUSIVE, - taskWithSuccessfulLockAcquisition, - Intervals.of("2017-07-01/2017-08-01"), - null - ) + final TaskLock lock = new TimeChunkLock( + null, + taskWithSuccessfulLockAcquisition.getGroupId(), + taskWithSuccessfulLockAcquisition.getDataSource(), + Intervals.of("2025-01-01/P1D"), + "v1", + taskWithSuccessfulLockAcquisition.getPriority() ); + taskStorage.addLock(taskWithSuccessfulLockAcquisition.getId(), lock); + taskStorage.addLock(taskWithFailingLockAcquisition0.getId(), lock); + 
taskStorage.addLock(taskWithFailingLockAcquisition1.getId(), lock); - Assert.assertEquals(3, taskStorage.getActiveTasks().size()); + GlobalTaskLockbox testLockbox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); + TaskLockboxSyncResult result = testLockbox.syncFromStorage(); // The tasks must be marked for failure - TaskLockboxSyncResult result = testLockbox.syncFromStorage(); Assert.assertEquals(ImmutableSet.of(taskWithFailingLockAcquisition0, taskWithFailingLockAcquisition1), result.getTasksToFail()); } @@ -2031,7 +2031,8 @@ public void testCleanupOnUnlock() .andReturn(0).once(); EasyMock.replay(coordinator); - final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator); + final GlobalTaskLockbox taskLockbox = new GlobalTaskLockbox(taskStorage, coordinator); + taskLockbox.syncFromStorage(); taskLockbox.add(replaceTask); taskLockbox.tryLock( @@ -2051,15 +2052,30 @@ public void testCleanupOnUnlock() EasyMock.verify(coordinator); } + @Test + public void test_add_throwsException_ifSyncIsNotComplete() + { + lockbox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); + MatcherAssert.assertThat( + Assert.assertThrows( + ISE.class, + () -> lockbox.add(NoopTask.create()) + ), + ExceptionMatcher.of(ISE.class).expectMessageIs( + "Cannot get TaskLockbox for datasource[none] as sync with storage has not happened yet." 
+ ) + ); + } + private class TaskLockboxValidator { private final Set tasks; - private final TaskLockbox lockbox; + private final GlobalTaskLockbox lockbox; private final TaskStorage taskStorage; private final Map lockToTaskIdMap; - TaskLockboxValidator(TaskLockbox lockbox, TaskStorage taskStorage) + TaskLockboxValidator(GlobalTaskLockbox lockbox, TaskStorage taskStorage) { lockToTaskIdMap = new HashMap<>(); tasks = new HashSet<>(); @@ -2219,7 +2235,7 @@ public boolean isRevoked() } } - private static String TASK_NAME = "myModuleIsntLoadedTask"; + private static final String TASK_NAME = "myModuleIsntLoadedTask"; private static class TheModule extends SimpleModule { @@ -2231,7 +2247,7 @@ public TheModule() private static class MyModuleIsntLoadedTask extends AbstractTask { - private String someProp; + private final String someProp; @JsonCreator protected MyModuleIsntLoadedTask( @@ -2274,25 +2290,4 @@ public TaskStatus runTask(TaskToolbox toolbox) return TaskStatus.failure("how?", "Dummy task status err msg"); } } - - /** - * Extends TaskLockbox to return a null TaskLockPosse when the task's group name contains "FailingLockAcquisition". - */ - private static class NullLockPosseTaskLockbox extends TaskLockbox - { - public NullLockPosseTaskLockbox( - TaskStorage taskStorage, - IndexerMetadataStorageCoordinator metadataStorageCoordinator - ) - { - super(taskStorage, metadataStorageCoordinator); - } - - @Override - protected TaskLockPosse reacquireLockOnStartup(Task task, TaskLock taskLock) - { - return task.getGroupId() - .contains("FailingLockAcquisition") ? 
null : super.reacquireLockOnStartup(task, taskLock); - } - } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index fe91e1f2edc8..70e822337e0d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -234,7 +234,7 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private ObjectMapper mapper; private TaskQueryTool tsqa = null; private TaskStorage taskStorage = null; - private TaskLockbox taskLockbox = null; + private GlobalTaskLockbox taskLockbox = null; private TaskQueue taskQueue = null; private TaskRunner taskRunner = null; private TestIndexerMetadataStorageCoordinator mdc = null; @@ -549,7 +549,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( Preconditions.checkNotNull(taskStorage); Preconditions.checkNotNull(emitter); - taskLockbox = new TaskLockbox(taskStorage, mdc); + taskLockbox = new GlobalTaskLockbox(taskStorage, mdc); tac = new LocalTaskActionClientFactory( new TaskActionToolbox( taskLockbox, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java index a55117aad12d..fffa90bf732d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java @@ -62,7 +62,7 @@ public class TaskLockBoxConcurrencyTest private final ObjectMapper objectMapper = new DefaultObjectMapper(); private ExecutorService service; private TaskStorage taskStorage; - private TaskLockbox lockbox; + private GlobalTaskLockbox lockbox; private SegmentSchemaManager 
segmentSchemaManager; @Before @@ -81,7 +81,7 @@ public void setup() ); segmentSchemaManager = new SegmentSchemaManager(derby.metadataTablesConfigSupplier().get(), objectMapper, derbyConnector); - lockbox = new TaskLockbox( + lockbox = new GlobalTaskLockbox( taskStorage, new IndexerSQLMetadataStorageCoordinator( new SqlSegmentMetadataTransactionFactory( @@ -99,6 +99,7 @@ public void setup() CentralizedDatasourceSchemaConfig.create() ) ); + lockbox.syncFromStorage(); service = Execs.multiThreaded(2, "TaskLockBoxConcurrencyTest-%d"); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java index 69c2e1fe8cab..f257691fdd87 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java @@ -112,7 +112,7 @@ public boolean isForceTimeChunkLock() final TaskQueueConfig queueConfig = new TaskQueueConfig(null, null, null, null, null, null); final TaskRunner taskRunner = EasyMock.createNiceMock(RemoteTaskRunner.class); final TaskActionClientFactory actionClientFactory = EasyMock.createNiceMock(LocalTaskActionClientFactory.class); - final TaskLockbox lockbox = new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()); + final GlobalTaskLockbox lockbox = new GlobalTaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()); final ServiceEmitter emitter = new NoopServiceEmitter(); return new TaskQueue( lockConfig, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java index 72e0e5ad8559..447a51f44b11 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java @@ -138,7 +138,7 @@ public RetType submit(TaskAction taskAction) taskStorage, taskRunner, unsupportedTaskActionFactory, // Not used for anything serious - new TaskLockbox(taskStorage, storageCoordinator), + new GlobalTaskLockbox(taskStorage, storageCoordinator), new NoopServiceEmitter(), jsonMapper, new NoopTaskContextEnricher() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java index 040d54fea622..9c875a8bf1ba 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java @@ -544,7 +544,7 @@ public void testGetActiveTaskRedactsPassword() throws JsonProcessingException taskStorage, EasyMock.createMock(HttpRemoteTaskRunner.class), createActionClientFactory(), - new TaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()), + new GlobalTaskLockbox(taskStorage, new TestIndexerMetadataStorageCoordinator()), new StubServiceEmitter("druid/overlord", "testHost"), mapper, new NoopTaskContextEnricher() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java index 64a551fdee48..0f2c59686035 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/UnusedSegmentsKillerTest.java @@ -25,8 +25,8 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskMetrics; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import 
org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; import org.apache.druid.indexing.test.TestDataSegmentKiller; import org.apache.druid.java.util.common.Intervals; @@ -421,7 +421,7 @@ public void test_run_skipsLockedIntervals() throws InterruptedException ); final Task ingestionTask = new NoopTask(null, null, TestDataSource.WIKI, 0L, 0L, null); - final TaskLockbox taskLockbox = taskActionTestKit.getTaskLockbox(); + final GlobalTaskLockbox taskLockbox = taskActionTestKit.getTaskLockbox(); try { taskLockbox.add(ingestionTask); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 2837f63e0590..afa67d41bea7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -43,8 +43,8 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.DruidOverlord; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskQueue; @@ -107,7 +107,7 @@ public class OverlordResourceTest private DruidOverlord overlord; private TaskMaster taskMaster; private TaskStorage taskStorage; - private TaskLockbox taskLockbox; + private GlobalTaskLockbox taskLockbox; private JacksonConfigManager configManager; private ProvisioningStrategy provisioningStrategy; private 
AuthConfig authConfig; @@ -133,7 +133,7 @@ public void setUp() overlord = EasyMock.createStrictMock(DruidOverlord.class); taskMaster = EasyMock.createStrictMock(TaskMaster.class); taskStorage = EasyMock.createStrictMock(TaskStorage.class); - taskLockbox = EasyMock.createStrictMock(TaskLockbox.class); + taskLockbox = EasyMock.createStrictMock(GlobalTaskLockbox.class); taskQueryTool = new TaskQueryTool( taskStorage, taskLockbox, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java index 0cddc0bd9663..77c032d08b4a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java @@ -49,10 +49,10 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.compact.CompactionScheduler; import org.apache.druid.indexing.overlord.DruidOverlord; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskRunner; @@ -112,7 +112,7 @@ public class OverlordTest private CuratorFramework curator; private DruidOverlord overlord; private TaskMaster taskMaster; - private TaskLockbox taskLockbox; + private GlobalTaskLockbox taskLockbox; private TaskStorage taskStorage; private TaskActionClientFactory taskActionClientFactory; private CountDownLatch announcementLatch; @@ -181,7 +181,7 @@ public void setUp() throws Exception IndexerMetadataStorageCoordinator mdc = new 
TestIndexerMetadataStorageCoordinator(); - taskLockbox = new TaskLockbox(taskStorage, mdc); + taskLockbox = new GlobalTaskLockbox(taskStorage, mdc); task0 = NoopTask.create(); taskId0 = task0.getId(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index e0332708c3fe..3876c32ce185 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -68,10 +68,10 @@ import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.MetadataTaskStorage; import org.apache.druid.indexing.overlord.Segments; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.test.TestDataSegmentAnnouncer; @@ -201,7 +201,7 @@ public abstract class SeekableStreamIndexTaskTestBase extends EasyMockSupport protected File reportsFile; protected TaskToolboxFactory toolboxFactory; protected TaskStorage taskStorage; - protected TaskLockbox taskLockbox; + protected GlobalTaskLockbox taskLockbox; protected IndexerMetadataStorageCoordinator metadataStorageCoordinator; protected final Set checkpointRequestsHash = new HashSet<>(); protected SegmentSchemaManager segmentSchemaManager; @@ -607,7 +607,7 @@ protected void makeToolboxFactory(TestUtils testUtils, ServiceEmitter emitter, b 
segmentSchemaManager, CentralizedDatasourceSchemaConfig.create() ); - taskLockbox = new TaskLockbox(taskStorage, metadataStorageCoordinator); + taskLockbox = new GlobalTaskLockbox(taskStorage, metadataStorageCoordinator); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, taskStorage, diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index ef4901c68c35..e5af37f30fc6 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -72,11 +72,11 @@ import org.apache.druid.indexing.compact.OverlordCompactionScheduler; import org.apache.druid.indexing.overlord.DruidOverlord; import org.apache.druid.indexing.overlord.ForkingTaskRunnerFactory; +import org.apache.druid.indexing.overlord.GlobalTaskLockbox; import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.MetadataTaskStorage; import org.apache.druid.indexing.overlord.RemoteTaskRunnerFactory; -import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueryTool; import org.apache.druid.indexing.overlord.TaskRunnerFactory; @@ -239,7 +239,7 @@ public void configure(Binder binder) binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class); binder.bind(TaskActionToolbox.class).in(LazySingleton.class); - binder.bind(TaskLockbox.class).in(LazySingleton.class); + binder.bind(GlobalTaskLockbox.class).in(LazySingleton.class); binder.bind(TaskQueryTool.class).in(LazySingleton.class); binder.bind(IndexerMetadataStorageAdapter.class).in(LazySingleton.class); binder.bind(CompactionScheduler.class).to(OverlordCompactionScheduler.class).in(ManageLifecycle.class);