diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 207ff56f28f8..810a991c2f22 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -87,23 +87,38 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String final Supervisor supervisor = entry.getValue().lhs; final SupervisorSpec supervisorSpec = entry.getValue().rhs; - TaskLockType taskLockType = null; + boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; if (supervisorSpec instanceof SeekableStreamSupervisorSpec) { SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec; Map context = seekableStreamSupervisorSpec.getContext(); if (context != null) { - taskLockType = QueryContexts.getAsEnum( - Tasks.TASK_LOCK_TYPE, - context.get(Tasks.TASK_LOCK_TYPE), - TaskLockType.class + Boolean useConcurrentLocks = QueryContexts.getAsBoolean( + Tasks.USE_CONCURRENT_LOCKS, + context.get(Tasks.USE_CONCURRENT_LOCKS) ); + if (useConcurrentLocks == null) { + TaskLockType taskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + context.get(Tasks.TASK_LOCK_TYPE), + TaskLockType.class + ); + if (taskLockType == null) { + hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; + } else if (taskLockType == TaskLockType.APPEND) { + hasAppendLock = true; + } else { + hasAppendLock = false; + } + } else { + hasAppendLock = useConcurrentLocks; + } } } if (supervisor instanceof SeekableStreamSupervisor && !supervisorSpec.isSuspended() && supervisorSpec.getDataSources().contains(datasource) - && TaskLockType.APPEND.equals(taskLockType)) { + && (hasAppendLock)) { return Optional.of(supervisorId); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index e8c5d839cf19..5ffbd4b94608 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -468,6 +468,21 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() EasyMock.replay(activeSpec); metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + SeekableStreamSupervisorSpec activeSpecWithConcurrentLocks = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor activeSupervisorWithConcurrentLocks = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(activeSpecWithConcurrentLocks.getId()).andReturn("activeSpecWithConcurrentLocks").anyTimes(); + EasyMock.expect(activeSpecWithConcurrentLocks.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(activeSpecWithConcurrentLocks.getDataSources()) + .andReturn(ImmutableList.of("activeConcurrentLocksDS")).anyTimes(); + EasyMock.expect(activeSpecWithConcurrentLocks.createSupervisor()) + .andReturn(activeSupervisorWithConcurrentLocks).anyTimes(); + EasyMock.expect(activeSpecWithConcurrentLocks.createAutoscaler(activeSupervisorWithConcurrentLocks)) + .andReturn(null).anyTimes(); + EasyMock.expect(activeSpecWithConcurrentLocks.getContext()) + .andReturn(ImmutableMap.of(Tasks.USE_CONCURRENT_LOCKS, true)).anyTimes(); + EasyMock.replay(activeSpecWithConcurrentLocks); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + SeekableStreamSupervisorSpec activeAppendSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); Supervisor activeAppendSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes(); @@ -482,6 +497,25 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() EasyMock.replay(activeAppendSpec); metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + // A supervisor with useConcurrentLocks set to false explicitly must not use an append lock + SeekableStreamSupervisorSpec specWithUseConcurrentLocksFalse = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor supervisorWithUseConcurrentLocksFalse = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(specWithUseConcurrentLocksFalse.getId()).andReturn("useConcurrentLocksFalse").anyTimes(); + EasyMock.expect(specWithUseConcurrentLocksFalse.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(specWithUseConcurrentLocksFalse.getDataSources()) + .andReturn(ImmutableList.of("dsWithuseConcurrentLocksFalse")).anyTimes(); + EasyMock.expect(specWithUseConcurrentLocksFalse.createSupervisor()).andReturn(supervisorWithUseConcurrentLocksFalse).anyTimes(); + EasyMock.expect(specWithUseConcurrentLocksFalse.createAutoscaler(supervisorWithUseConcurrentLocksFalse)) + .andReturn(null).anyTimes(); + EasyMock.expect(specWithUseConcurrentLocksFalse.getContext()).andReturn(ImmutableMap.of( + Tasks.USE_CONCURRENT_LOCKS, + false, + Tasks.TASK_LOCK_TYPE, + TaskLockType.APPEND.name() + )).anyTimes(); + EasyMock.replay(specWithUseConcurrentLocksFalse); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + replayAll(); manager.start(); @@ -499,6 +533,14 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() manager.createOrUpdateAndStartSupervisor(activeAppendSpec); Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent()); + manager.createOrUpdateAndStartSupervisor(activeSpecWithConcurrentLocks); + Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeConcurrentLocksDS").isPresent()); + + manager.createOrUpdateAndStartSupervisor(specWithUseConcurrentLocksFalse); + Assert.assertFalse( + manager.getActiveSupervisorIdForDatasourceWithAppendLock("dsWithUseConcurrentLocksFalse").isPresent() + ); + verifyAll(); }