From c9638268ec88c027bce8c441bda8e34e37e714af Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 10 Mar 2026 11:42:12 +0530 Subject: [PATCH 1/5] Add minor fixes to follow up #19091 --- .../supervisor/SupervisorManager.java | 36 +++--- .../supervisor/SupervisorManagerTest.java | 105 ++++++++++++++++++ .../druid/error/DruidExceptionMatcher.java | 9 ++ 3 files changed, 132 insertions(+), 18 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 426f3a716190..17be531e7102 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -440,26 +440,26 @@ public boolean isAnotherTaskGroupPublishingToPartitions( DataSourceMetadata startMetadata ) { - try { - InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null"); - if (!(startMetadata instanceof SeekableStreamDataSourceMetadata)) { - throw InvalidInput.exception( - "Start metadata[%s] of type[%s] is not valid streaming metadata", - supervisorId, startMetadata.getClass() - ); - } + InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null"); + if (!(startMetadata instanceof SeekableStreamDataSourceMetadata)) { + throw InvalidInput.exception( + "Start metadata[%s] of type[%s] is not valid streaming metadata", + startMetadata, startMetadata == null ? null : startMetadata.getClass() + ); + } - Pair supervisor = supervisors.get(supervisorId); - if (supervisor == null || supervisor.rhs == null) { - throw NotFound.exception("Could not find supervisor[%s]", supervisorId); - } - if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) { - throw InvalidInput.exception( - "Supervisor[%s] of type[%s] is not a streaming supervisor", - supervisorId, supervisor.rhs.getType() - ); - } + Pair supervisor = supervisors.get(supervisorId); + if (supervisor == null || supervisor.rhs == null) { + throw NotFound.exception("Could not find supervisor[%s]", supervisorId); + } + if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) { + throw InvalidInput.exception( + "Supervisor[%s] of type[%s] is not a streaming supervisor", + supervisorId, supervisor.rhs.getType() + ); + } + try { final Set partitionIds = Set.copyOf( ((SeekableStreamDataSourceMetadata) startMetadata) .getSeekableStreamSequenceNumbers() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 7794a798473b..a0d5b1cec554 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -31,6 +31,7 @@ import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; @@ -59,6 +60,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; @RunWith(EasyMockRunner.class) public class SupervisorManagerTest extends EasyMockSupport @@ -796,6 +798,109 @@ public void testRegisterUpgradedPendingSegmentOnSupervisor() verifyAll(); } + @Test + public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorIdIsNull() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> manager.isAnotherTaskGroupPublishingToPartitions(null, "task1", null) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs("'supervisorId' cannot be null") + ); + } + + @Test + public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorNotFound() + { + final DataSourceMetadata startMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>( + "topic", + Map.of("0", "10", "1", "20"), + Set.of() + ) + ); + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> manager.isAnotherTaskGroupPublishingToPartitions("supervisor1", "task1", startMetadata) + ), + DruidExceptionMatcher.notFound().expectMessageIs("Could not find supervisor[supervisor1]") + ); + } + + @Test + public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorTypeIsInvalid() + { + final DataSourceMetadata startMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>( + "topic", + Map.of("0", "10", "1", "20"), + Set.of() + ) + ); + + final String supervisorId = "supervisor1"; + final SupervisorSpec supervisorSpec = new TestSupervisorSpec(supervisorId, supervisor1); + Map existingSpecs = ImmutableMap.of( + supervisorId, new TestSupervisorSpec(supervisorId, supervisor1) + ); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); + metadataSupervisorManager.insert(supervisorId, supervisorSpec); + supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + replayAll(); + + manager.start(); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", startMetadata) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Supervisor[supervisor1] of type[TestSupervisorSpec] is not a streaming supervisor" + ) + ); + } + + @Test + public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMetadataIsNull() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> manager.isAnotherTaskGroupPublishingToPartitions("supervisor1", "task1", null) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Start metadata[null] of type[null] is not valid streaming metadata" + ) + ); + } + + @Test + public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMetadataIsInvalid() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> manager.isAnotherTaskGroupPublishingToPartitions("supervisor1", "task1", new ObjectMetadata("abc")) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Start metadata[ObjectMetadata{theObject=abc}] of" + + " type[class org.apache.druid.indexing.overlord.ObjectMetadata]" + + " is not valid streaming metadata" + ) + ); + } + + @Test + public void test_isAnotherTaskGroupPublishingToPartitions() + { + + } + private static class TestSupervisorSpec implements SupervisorSpec { private final String id; diff --git a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java index 67e96b86d747..638359ed1359 100644 --- a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java +++ b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java @@ -40,6 +40,15 @@ public static DruidExceptionMatcher invalidInput() ); } + public static DruidExceptionMatcher notFound() + { + return new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.NOT_FOUND, + "notFound" + ); + } + public static DruidExceptionMatcher unsupported() { return new DruidExceptionMatcher( From 5b04fa6ed0ce78d173cbd30296336f374788c16d Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 10 Mar 2026 11:54:39 +0530 Subject: [PATCH 2/5] Reset state if scaling fails --- .../supervisor/SeekableStreamSupervisor.java | 16 ++++++++++------ ...eamSupervisorScaleDuringTaskRolloverTest.java | 8 ++++---- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index dbe79b857809..4dd7a1dd6447 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3580,7 +3580,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException * during a task rollover based on the recommendations from the task auto-scaler. */ @VisibleForTesting - void maybeScaleDuringTaskRollover() throws ExecutionException, InterruptedException + void maybeScaleDuringTaskRollover() { if (taskAutoScaler == null) { // Do nothing @@ -3594,11 +3594,15 @@ void maybeScaleDuringTaskRollover() throws ExecutionException, InterruptedExcept supervisorId, currentTaskCount, rolloverTaskCount ); isScalingTasksOnRollover.set(true); - new DynamicAllocationTasksNotice( - () -> rolloverTaskCount, - () -> isScalingTasksOnRollover.set(false), - emitter - ).handle(); + try { + new DynamicAllocationTasksNotice( + () -> rolloverTaskCount, + () -> {}, + emitter + ).handle(); + } finally { + isScalingTasksOnRollover.set(false); + } } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java index 0bf697a2a094..01882f1e8211 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java @@ -48,7 +48,7 @@ public void setup() } @Test - public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale() throws Exception + public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale() { // Given setupSpecExpectations(createIOConfig(DEFAULT_TASK_COUNT, null)); @@ -68,7 +68,7 @@ public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale() throws } @Test - public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale() throws Exception + public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale() { // Given setupSpecExpectations(getIOConfigWithCostBasedAutoScaler()); @@ -96,7 +96,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotSc } @Test - public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling() throws Exception + public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling() { // Given final int targetTaskCount = 5; @@ -127,7 +127,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScal } @Test - public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale() throws Exception + public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale() { // Given setupSpecExpectations(getIOConfigWithCostBasedAutoScaler()); From 0dcc58b4fd2342a82fe3a3cbfffce8817a78bcb0 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 10 Mar 2026 12:10:26 +0530 Subject: [PATCH 3/5] Add anotehr test --- .../supervisor/SupervisorManagerTest.java | 48 +++++++++++++++++-- 1 file changed, 43 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index a0d5b1cec554..9c2e71169aa6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.SettableFuture; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.error.InvalidInput; @@ -841,13 +842,10 @@ public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupe ); final String supervisorId = "supervisor1"; - final SupervisorSpec supervisorSpec = new TestSupervisorSpec(supervisorId, supervisor1); - Map existingSpecs = ImmutableMap.of( - supervisorId, new TestSupervisorSpec(supervisorId, supervisor1) + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn( + Map.of(supervisorId, new TestSupervisorSpec(supervisorId, supervisor1)) ); - EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); - metadataSupervisorManager.insert(supervisorId, supervisorSpec); supervisor1.start(); EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); replayAll(); @@ -898,7 +896,47 @@ public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMeta @Test public void test_isAnotherTaskGroupPublishingToPartitions() { + final String supervisorId = "supervisor1"; + final SeekableStreamSupervisor seekableStreamSupervisor = + EasyMock.createMock(SeekableStreamSupervisor.class); + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn( + Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor)) + ); + + seekableStreamSupervisor.start(); + EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + + // Expect a readyTaskId for which no other group is currently publishing + final String readyTaskId = "task1"; + EasyMock.expect( + seekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions( + EasyMock.eq(readyTaskId), + EasyMock.anyObject() + ) + ).andReturn(false).atLeastOnce(); + + // Expect a conflictingTaskId for which another group is currently publishing + final String conflictingTaskId = "task2"; + EasyMock.expect( + seekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions( + EasyMock.eq(conflictingTaskId), + EasyMock.anyObject() + ) + ).andReturn(true).atLeastOnce(); + replayAll(); + EasyMock.replay(seekableStreamSupervisor); + manager.start(); + + final DataSourceMetadata startMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>("topic", Map.of("0", "10"), Set.of()) + ); + Assert.assertTrue( + manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, conflictingTaskId, startMetadata) + ); + Assert.assertFalse( + manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, readyTaskId, startMetadata) + ); } private static class TestSupervisorSpec implements SupervisorSpec From 643bde38d0bb5d98a55be1a844224cd7c8d59bd1 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 10 Mar 2026 14:21:35 +0530 Subject: [PATCH 4/5] Fixes --- .../supervisor/SupervisorManager.java | 19 +++--- .../supervisor/SeekableStreamSupervisor.java | 3 +- .../druid/indexing/common/TestUtils.java | 4 +- .../SegmentTransactionalInsertActionTest.java | 6 ++ .../common/actions/TaskActionTestKit.java | 19 +++++- .../supervisor/SupervisorManagerTest.java | 67 ++++++++++--------- .../TestSeekableStreamDataSourceMetadata.java | 16 +++++ .../indexing/overlord/ObjectMetadata.java | 3 + 8 files changed, 95 insertions(+), 42 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 17be531e7102..d598fdd0910b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -433,6 +433,10 @@ public boolean registerUpgradedPendingSegmentOnSupervisor( /** * Checks if there is a Task distinct from the given {@code taskId} or its replicas * that is currently waiting to publish offsets for the given partitions. + * + * @return true only if the given {@param supervisorId} represents a + * {@link SeekableStreamSupervisor} and the supervisor has other tasks that + * are currently publishing offsets to an overlapping set of partitions. */ public boolean isAnotherTaskGroupPublishingToPartitions( String supervisorId, @@ -441,21 +445,18 @@ public boolean isAnotherTaskGroupPublishingToPartitions( ) { InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null"); - if (!(startMetadata instanceof SeekableStreamDataSourceMetadata)) { - throw InvalidInput.exception( - "Start metadata[%s] of type[%s] is not valid streaming metadata", - startMetadata, startMetadata == null ? null : startMetadata.getClass() - ); - } - Pair supervisor = supervisors.get(supervisorId); if (supervisor == null || supervisor.rhs == null) { throw NotFound.exception("Could not find supervisor[%s]", supervisorId); } if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) { + return false; + } + + if (!(startMetadata instanceof SeekableStreamDataSourceMetadata)) { throw InvalidInput.exception( - "Supervisor[%s] of type[%s] is not a streaming supervisor", - supervisorId, supervisor.rhs.getType() + "Start metadata[%s] of type[%s] is not valid streaming metadata", + startMetadata, startMetadata == null ? null : startMetadata.getClass() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 4dd7a1dd6447..d257c1e4e8cb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3600,7 +3600,8 @@ void maybeScaleDuringTaskRollover() () -> {}, emitter ).handle(); - } finally { + } + finally { isScalingTasksOnRollover.set(false); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java index 6481a5aecfbc..8778058ef4c7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java @@ -30,6 +30,7 @@ import org.apache.druid.guice.DruidSecondaryModule; import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -114,7 +115,8 @@ public void setupModule(SetupContext context) context.registerSubtypes( new NamedType(LocalLoadSpec.class, "local"), new NamedType(NoopInputSource.class, "noop"), - new NamedType(NoopInputFormat.class, "noop") + new NamedType(NoopInputFormat.class, "noop"), + new NamedType(TestSeekableStreamDataSourceMetadata.class, "testStream") ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 4377452ec5d2..217aae3f2580 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; +import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; @@ -42,6 +43,8 @@ import org.junit.Rule; import org.junit.Test; +import java.util.List; + public class SegmentTransactionalInsertActionTest { @Rule @@ -190,6 +193,9 @@ public void test_fail_transactionalUpdateDataSourceMetadata() throws Exception actionTestKit.getTaskLockbox().add(task); acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000); + final NoopSupervisorSpec supervisorSpec = new NoopSupervisorSpec(SUPERVISOR_ID, List.of(DATA_SOURCE)); + actionTestKit.getSupervisorManager().createOrUpdateAndStartSupervisor(supervisorSpec); + SegmentPublishResult result = SegmentTransactionalInsertAction.appendAction( ImmutableSet.of(SEGMENT1), SUPERVISOR_ID, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 26cbe3bbfa72..b6076f26e771 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -36,6 +36,7 @@ import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataSupervisorManager; import org.apache.druid.metadata.SegmentsMetadataManagerConfig; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory; @@ -64,6 +65,7 @@ public class TaskActionTestKit extends ExternalResource private TestDerbyConnector testDerbyConnector; private IndexerMetadataStorageCoordinator metadataStorageCoordinator; private TaskActionToolbox taskActionToolbox; + private SupervisorManager supervisorManager; private SegmentMetadataCache segmentMetadataCache; private BlockingExecutorService metadataCachePollExec; @@ -144,6 +146,11 @@ public TaskActionToolbox getTaskActionToolbox() return taskActionToolbox; } + public SupervisorManager getSupervisorManager() + { + return supervisorManager; + } + public void syncSegmentMetadataCache() { metadataCachePollExec.finishNextPendingTasks(4); @@ -200,7 +207,14 @@ public boolean isBatchAllocationReduceMetadataIO() } }; - SupervisorManager supervisorManager = new SupervisorManager(objectMapper, null); + this.supervisorManager = new SupervisorManager( + objectMapper, + new SQLMetadataSupervisorManager( + objectMapper, + testDerbyConnector, + () -> testDerbyConnector.getMetadataTablesConfig() + ) + ); SegmentAllocationQueue segmentAllocationQueue = new SegmentAllocationQueue( taskLockbox, taskLockConfig, @@ -230,7 +244,9 @@ public boolean isBatchAllocationReduceMetadataIO() testDerbyConnector.createTaskTables(); testDerbyConnector.createAuditTable(); testDerbyConnector.createIndexingStatesTable(); + testDerbyConnector.createSupervisorsTable(); + supervisorManager.start(); segmentMetadataCache.start(); segmentMetadataCache.becomeLeader(); syncSegmentMetadataCache(); @@ -289,6 +305,7 @@ public void after() taskActionToolbox = null; segmentMetadataCache.stopBeingLeader(); segmentMetadataCache.stop(); + supervisorManager.stop(); useSegmentMetadataCache = false; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 9c2e71169aa6..d34a29360017 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -61,7 +61,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; @RunWith(EasyMockRunner.class) public class SupervisorManagerTest extends EasyMockSupport @@ -814,33 +813,18 @@ public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupe @Test public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorNotFound() { - final DataSourceMetadata startMetadata = new TestSeekableStreamDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>( - "topic", - Map.of("0", "10", "1", "20"), - Set.of() - ) - ); MatcherAssert.assertThat( Assert.assertThrows( DruidException.class, - () -> manager.isAnotherTaskGroupPublishingToPartitions("supervisor1", "task1", startMetadata) + () -> manager.isAnotherTaskGroupPublishingToPartitions("supervisor1", "task1", null) ), DruidExceptionMatcher.notFound().expectMessageIs("Could not find supervisor[supervisor1]") ); } @Test - public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorTypeIsInvalid() + public void test_isAnotherTaskGroupPublishingToPartitions_returnsFalse_forNonStreamingSupervisor() { - final DataSourceMetadata startMetadata = new TestSeekableStreamDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>( - "topic", - Map.of("0", "10", "1", "20"), - Set.of() - ) - ); - final String supervisorId = "supervisor1"; EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn( Map.of(supervisorId, new TestSupervisorSpec(supervisorId, supervisor1)) @@ -852,24 +836,32 @@ public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupe manager.start(); - MatcherAssert.assertThat( - Assert.assertThrows( - DruidException.class, - () -> manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", startMetadata) - ), - DruidExceptionMatcher.invalidInput().expectMessageIs( - "Supervisor[supervisor1] of type[TestSupervisorSpec] is not a streaming supervisor" - ) + Assert.assertFalse( + manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", null) ); } @Test public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMetadataIsNull() { + final String supervisorId = "supervisor1"; + final SeekableStreamSupervisor seekableStreamSupervisor = + EasyMock.createMock(SeekableStreamSupervisor.class); + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn( + Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor)) + ); + + seekableStreamSupervisor.start(); + EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + replayAll(); + EasyMock.replay(seekableStreamSupervisor); + + manager.start(); + MatcherAssert.assertThat( Assert.assertThrows( DruidException.class, - () -> manager.isAnotherTaskGroupPublishingToPartitions("supervisor1", "task1", null) + () -> manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", null) ), DruidExceptionMatcher.invalidInput().expectMessageIs( "Start metadata[null] of type[null] is not valid streaming metadata" @@ -880,10 +872,24 @@ public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMeta @Test public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMetadataIsInvalid() { + final String supervisorId = "supervisor1"; + final SeekableStreamSupervisor seekableStreamSupervisor = + EasyMock.createMock(SeekableStreamSupervisor.class); + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn( + Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor)) + ); + + seekableStreamSupervisor.start(); + EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + replayAll(); + EasyMock.replay(seekableStreamSupervisor); + + manager.start(); + MatcherAssert.assertThat( Assert.assertThrows( DruidException.class, - () -> manager.isAnotherTaskGroupPublishingToPartitions("supervisor1", "task1", new ObjectMetadata("abc")) + () -> manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", new ObjectMetadata("abc")) ), DruidExceptionMatcher.invalidInput().expectMessageIs( "Start metadata[ObjectMetadata{theObject=abc}] of" @@ -928,8 +934,9 @@ public void test_isAnotherTaskGroupPublishingToPartitions() EasyMock.replay(seekableStreamSupervisor); manager.start(); - final DataSourceMetadata startMetadata = new TestSeekableStreamDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>("topic", Map.of("0", "10"), Set.of()) + final DataSourceMetadata startMetadata = TestSeekableStreamDataSourceMetadata.start( + "topic", + Map.of("0", "10") ); Assert.assertTrue( manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, conflictingTaskId, startMetadata) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java index 7d87427e45c7..fe72fffc160b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java @@ -23,8 +23,24 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import java.util.Map; + public class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata { + public static TestSeekableStreamDataSourceMetadata start(String topic, Map partitionOffsets) + { + return new TestSeekableStreamDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>(topic, partitionOffsets, null) + ); + } + + public static TestSeekableStreamDataSourceMetadata end(String topic, Map partitionOffsets) + { + return new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>(topic, partitionOffsets) + ); + } + @JsonCreator public TestSeekableStreamDataSourceMetadata( @JsonProperty("partitions") SeekableStreamSequenceNumbers seekableStreamSequenceNumbers) diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java b/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java index 7a8d01042484..64ef43534dbe 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java @@ -24,6 +24,9 @@ import java.util.Objects; +/** + * {@link DataSourceMetadata} that contains a raw {@link Object}. Used in tests. + */ public final class ObjectMetadata implements DataSourceMetadata { private final Object theObject; From 6e0295bd238a700d34b62a643fdf1d05e2f4b847 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 10 Mar 2026 14:24:31 +0530 Subject: [PATCH 5/5] Remove extra subtype --- .../test/java/org/apache/druid/indexing/common/TestUtils.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java index 8778058ef4c7..6481a5aecfbc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java @@ -30,7 +30,6 @@ import org.apache.druid.guice.DruidSecondaryModule; import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; import org.apache.druid.indexing.common.task.TestAppenderatorsManager; -import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -115,8 +114,7 @@ public void setupModule(SetupContext context) context.registerSubtypes( new NamedType(LocalLoadSpec.class, "local"), new NamedType(NoopInputSource.class, "noop"), - new NamedType(NoopInputFormat.class, "noop"), - new NamedType(TestSeekableStreamDataSourceMetadata.class, "testStream") + new NamedType(NoopInputFormat.class, "noop") ); } }