From 74e7b091c86d99eee484efbb8d583d6f7be39d68 Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 22 Apr 2024 18:05:12 +0530 Subject: [PATCH 1/2] Ignore append locks for compaction when using concurrent locks --- .../druid/indexing/overlord/TaskLockbox.java | 7 ++++- .../indexing/overlord/TaskLockboxTest.java | 28 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 35ec79d74ec7..2c1dc852a0bf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -961,7 +961,12 @@ public Map> getLockedIntervals(List loc final int priority = lockFilter.getPriority(); final boolean ignoreAppendLocks = - TaskLockType.REPLACE.name().equals(lockFilter.getContext().get(Tasks.TASK_LOCK_TYPE)); + Boolean.TRUE.equals(lockFilter.getContext().getOrDefault( + Tasks.USE_CONCURRENT_LOCKS, + Tasks.DEFAULT_USE_CONCURRENT_LOCKS)) + || TaskLockType.REPLACE.name().equals(lockFilter.getContext().getOrDefault( + Tasks.TASK_LOCK_TYPE, + Tasks.DEFAULT_TASK_LOCK_TYPE)); running.get(datasource).forEach( (startTime, startTimeLocks) -> startTimeLocks.forEach( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 999d4d0abb2e..ab4bf3a504fc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1325,6 +1325,34 @@ public void testGetLockedIntervalsForLowerPriorityReplaceLock() Assert.assertTrue(conflictingIntervals.isEmpty()); } + @Test + public void testGetLockedIntervalsForLowerPriorityUseConcurrentLocks() + { + final Task task = NoopTask.ofPriority(50); + lockbox.add(task); + taskStorage.insert(task, TaskStatus.running(task.getId())); + tryTimeChunkLock( + TaskLockType.APPEND, + task, + Intervals.of("2017/2018") + ); + + LockFilterPolicy requestForReplaceLowerPriorityLock = new LockFilterPolicy( + task.getDataSource(), + 25, + ImmutableMap.of( + Tasks.TASK_LOCK_TYPE, + TaskLockType.EXCLUSIVE.name(), + Tasks.USE_CONCURRENT_LOCKS, + true + ) + ); + + Map> conflictingIntervals = + lockbox.getLockedIntervals(ImmutableList.of(requestForReplaceLowerPriorityLock)); + Assert.assertTrue(conflictingIntervals.isEmpty()); + } + @Test public void testExclusiveLockCompatibility() From b22bf7cb287b46a7784e9486f33b2cbb719f2060 Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 22 Apr 2024 21:02:01 +0530 Subject: [PATCH 2/2] Better formatting --- .../druid/indexing/overlord/TaskLockbox.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 2c1dc852a0bf..7248fcab865e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -960,13 +960,19 @@ public Map> getLockedIntervals(List loc } final int priority = lockFilter.getPriority(); - final boolean ignoreAppendLocks = - Boolean.TRUE.equals(lockFilter.getContext().getOrDefault( - Tasks.USE_CONCURRENT_LOCKS, - Tasks.DEFAULT_USE_CONCURRENT_LOCKS)) - || TaskLockType.REPLACE.name().equals(lockFilter.getContext().getOrDefault( + final boolean isReplaceLock = TaskLockType.REPLACE.name().equals( + lockFilter.getContext().getOrDefault( Tasks.TASK_LOCK_TYPE, - Tasks.DEFAULT_TASK_LOCK_TYPE)); + Tasks.DEFAULT_TASK_LOCK_TYPE + ) + ); + final boolean isUsingConcurrentLocks = Boolean.TRUE.equals( + lockFilter.getContext().getOrDefault( + Tasks.USE_CONCURRENT_LOCKS, + Tasks.DEFAULT_USE_CONCURRENT_LOCKS + ) + ); + final boolean ignoreAppendLocks = isUsingConcurrentLocks || isReplaceLock; running.get(datasource).forEach( (startTime, startTimeLocks) -> startTimeLocks.forEach(