diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 426f3a716190..d598fdd0910b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -433,6 +433,10 @@ public boolean registerUpgradedPendingSegmentOnSupervisor( /** * Checks if there is a Task distinct from the given {@code taskId} or its replicas * that is currently waiting to publish offsets for the given partitions. + * + * @return true only if the given {@code supervisorId} represents a + * {@link SeekableStreamSupervisor} and the supervisor has other tasks that + * are currently publishing offsets to an overlapping set of partitions. */ public boolean isAnotherTaskGroupPublishingToPartitions( String supervisorId, @@ -440,26 +444,23 @@ public boolean isAnotherTaskGroupPublishingToPartitions( DataSourceMetadata startMetadata ) { - try { - InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null"); - if (!(startMetadata instanceof SeekableStreamDataSourceMetadata)) { - throw InvalidInput.exception( - "Start metadata[%s] of type[%s] is not valid streaming metadata", - supervisorId, startMetadata.getClass() - ); - } + InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null"); + Pair supervisor = supervisors.get(supervisorId); + if (supervisor == null || supervisor.rhs == null) { + throw NotFound.exception("Could not find supervisor[%s]", supervisorId); + } + if (!(supervisor.lhs instanceof SeekableStreamSupervisor)) { + return false; + } - Pair supervisor = supervisors.get(supervisorId); - if (supervisor == null || supervisor.rhs == null) { - throw NotFound.exception("Could not find supervisor[%s]", supervisorId); - } - if
(!(supervisor.lhs instanceof SeekableStreamSupervisor)) { - throw InvalidInput.exception( - "Supervisor[%s] of type[%s] is not a streaming supervisor", - supervisorId, supervisor.rhs.getType() - ); - } + if (!(startMetadata instanceof SeekableStreamDataSourceMetadata)) { + throw InvalidInput.exception( + "Start metadata[%s] of type[%s] is not valid streaming metadata", + startMetadata, startMetadata == null ? null : startMetadata.getClass() + ); + } + try { final Set partitionIds = Set.copyOf( ((SeekableStreamDataSourceMetadata) startMetadata) .getSeekableStreamSequenceNumbers() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index dbe79b857809..d257c1e4e8cb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3580,7 +3580,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException * during a task rollover based on the recommendations from the task auto-scaler. 
*/ @VisibleForTesting - void maybeScaleDuringTaskRollover() throws ExecutionException, InterruptedException + void maybeScaleDuringTaskRollover() { if (taskAutoScaler == null) { // Do nothing @@ -3594,11 +3594,16 @@ void maybeScaleDuringTaskRollover() throws ExecutionException, InterruptedExcept supervisorId, currentTaskCount, rolloverTaskCount ); isScalingTasksOnRollover.set(true); - new DynamicAllocationTasksNotice( - () -> rolloverTaskCount, - () -> isScalingTasksOnRollover.set(false), - emitter - ).handle(); + try { + new DynamicAllocationTasksNotice( + () -> rolloverTaskCount, + () -> {}, + emitter + ).handle(); + } + finally { + isScalingTasksOnRollover.set(false); + } } } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 4377452ec5d2..217aae3f2580 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.indexing.overlord.TimeChunkLockRequest; +import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.LinearShardSpec; @@ -42,6 +43,8 @@ import org.junit.Rule; import org.junit.Test; +import java.util.List; + public class SegmentTransactionalInsertActionTest { @Rule @@ -190,6 +193,9 @@ public void test_fail_transactionalUpdateDataSourceMetadata() throws Exception actionTestKit.getTaskLockbox().add(task); acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000); 
+ final NoopSupervisorSpec supervisorSpec = new NoopSupervisorSpec(SUPERVISOR_ID, List.of(DATA_SOURCE)); + actionTestKit.getSupervisorManager().createOrUpdateAndStartSupervisor(supervisorSpec); + SegmentPublishResult result = SegmentTransactionalInsertAction.appendAction( ImmutableSet.of(SEGMENT1), SUPERVISOR_ID, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java index 26cbe3bbfa72..b6076f26e771 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskActionTestKit.java @@ -36,6 +36,7 @@ import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator; import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageTablesConfig; +import org.apache.druid.metadata.SQLMetadataSupervisorManager; import org.apache.druid.metadata.SegmentsMetadataManagerConfig; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory; @@ -64,6 +65,7 @@ public class TaskActionTestKit extends ExternalResource private TestDerbyConnector testDerbyConnector; private IndexerMetadataStorageCoordinator metadataStorageCoordinator; private TaskActionToolbox taskActionToolbox; + private SupervisorManager supervisorManager; private SegmentMetadataCache segmentMetadataCache; private BlockingExecutorService metadataCachePollExec; @@ -144,6 +146,11 @@ public TaskActionToolbox getTaskActionToolbox() return taskActionToolbox; } + public SupervisorManager getSupervisorManager() + { + return supervisorManager; + } + public void syncSegmentMetadataCache() { metadataCachePollExec.finishNextPendingTasks(4); @@ -200,7 +207,14 @@ public boolean isBatchAllocationReduceMetadataIO() } }; - SupervisorManager 
supervisorManager = new SupervisorManager(objectMapper, null); + this.supervisorManager = new SupervisorManager( + objectMapper, + new SQLMetadataSupervisorManager( + objectMapper, + testDerbyConnector, + () -> testDerbyConnector.getMetadataTablesConfig() + ) + ); SegmentAllocationQueue segmentAllocationQueue = new SegmentAllocationQueue( taskLockbox, taskLockConfig, @@ -230,7 +244,9 @@ public boolean isBatchAllocationReduceMetadataIO() testDerbyConnector.createTaskTables(); testDerbyConnector.createAuditTable(); testDerbyConnector.createIndexingStatesTable(); + testDerbyConnector.createSupervisorsTable(); + supervisorManager.start(); segmentMetadataCache.start(); segmentMetadataCache.becomeLeader(); syncSegmentMetadataCache(); @@ -289,6 +305,7 @@ public void after() taskActionToolbox = null; segmentMetadataCache.stopBeingLeader(); segmentMetadataCache.stop(); + supervisorManager.stop(); useSegmentMetadataCache = false; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 7794a798473b..d34a29360017 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -25,12 +25,14 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.SettableFuture; +import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import 
org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; @@ -796,6 +798,154 @@ public void testRegisterUpgradedPendingSegmentOnSupervisor() verifyAll(); } + @Test + public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorIdIsNull() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> manager.isAnotherTaskGroupPublishingToPartitions(null, "task1", null) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs("'supervisorId' cannot be null") + ); + } + + @Test + public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorNotFound() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> manager.isAnotherTaskGroupPublishingToPartitions("supervisor1", "task1", null) + ), + DruidExceptionMatcher.notFound().expectMessageIs("Could not find supervisor[supervisor1]") + ); + } + + @Test + public void test_isAnotherTaskGroupPublishingToPartitions_returnsFalse_forNonStreamingSupervisor() + { + final String supervisorId = "supervisor1"; + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn( + Map.of(supervisorId, new TestSupervisorSpec(supervisorId, supervisor1)) + ); + + supervisor1.start(); + EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + replayAll(); + + manager.start(); + + Assert.assertFalse( + manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", null) + ); + } + + @Test + public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMetadataIsNull() + { + final String supervisorId = "supervisor1"; + final SeekableStreamSupervisor seekableStreamSupervisor = + EasyMock.createMock(SeekableStreamSupervisor.class); 
+ EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn( + Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor)) + ); + + seekableStreamSupervisor.start(); + EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + replayAll(); + EasyMock.replay(seekableStreamSupervisor); + + manager.start(); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", null) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Start metadata[null] of type[null] is not valid streaming metadata" + ) + ); + } + + @Test + public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMetadataIsInvalid() + { + final String supervisorId = "supervisor1"; + final SeekableStreamSupervisor seekableStreamSupervisor = + EasyMock.createMock(SeekableStreamSupervisor.class); + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn( + Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor)) + ); + + seekableStreamSupervisor.start(); + EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + replayAll(); + EasyMock.replay(seekableStreamSupervisor); + + manager.start(); + + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", new ObjectMetadata("abc")) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Start metadata[ObjectMetadata{theObject=abc}] of" + + " type[class org.apache.druid.indexing.overlord.ObjectMetadata]" + + " is not valid streaming metadata" + ) + ); + } + + @Test + public void test_isAnotherTaskGroupPublishingToPartitions() + { + final String supervisorId = "supervisor1"; + final SeekableStreamSupervisor seekableStreamSupervisor = + 
EasyMock.createMock(SeekableStreamSupervisor.class); + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn( + Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor)) + ); + + seekableStreamSupervisor.start(); + EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes(); + + // Expect a readyTaskId for which no other group is currently publishing + final String readyTaskId = "task1"; + EasyMock.expect( + seekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions( + EasyMock.eq(readyTaskId), + EasyMock.anyObject() + ) + ).andReturn(false).atLeastOnce(); + + // Expect a conflictingTaskId for which another group is currently publishing + final String conflictingTaskId = "task2"; + EasyMock.expect( + seekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions( + EasyMock.eq(conflictingTaskId), + EasyMock.anyObject() + ) + ).andReturn(true).atLeastOnce(); + + replayAll(); + EasyMock.replay(seekableStreamSupervisor); + manager.start(); + + final DataSourceMetadata startMetadata = TestSeekableStreamDataSourceMetadata.start( + "topic", + Map.of("0", "10") + ); + Assert.assertTrue( + manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, conflictingTaskId, startMetadata) + ); + Assert.assertFalse( + manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, readyTaskId, startMetadata) + ); + } + private static class TestSupervisorSpec implements SupervisorSpec { private final String id; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java index 7d87427e45c7..fe72fffc160b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java @@ -23,8 +23,24 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import java.util.Map; + public class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata { + public static TestSeekableStreamDataSourceMetadata start(String topic, Map partitionOffsets) + { + return new TestSeekableStreamDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>(topic, partitionOffsets, null) + ); + } + + public static TestSeekableStreamDataSourceMetadata end(String topic, Map partitionOffsets) + { + return new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>(topic, partitionOffsets) + ); + } + @JsonCreator public TestSeekableStreamDataSourceMetadata( @JsonProperty("partitions") SeekableStreamSequenceNumbers seekableStreamSequenceNumbers) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java index 0bf697a2a094..01882f1e8211 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java @@ -48,7 +48,7 @@ public void setup() } @Test - public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale() throws Exception + public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale() { // Given setupSpecExpectations(createIOConfig(DEFAULT_TASK_COUNT, null)); @@ -68,7 +68,7 @@ public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale() throws } 
@Test - public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale() throws Exception + public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotScale() { // Given setupSpecExpectations(getIOConfigWithCostBasedAutoScaler()); @@ -96,7 +96,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotSc } @Test - public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling() throws Exception + public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScaling() { // Given final int targetTaskCount = 5; @@ -127,7 +127,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScal } @Test - public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale() throws Exception + public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale() { // Given setupSpecExpectations(getIOConfigWithCostBasedAutoScaler()); diff --git a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java index 67e96b86d747..638359ed1359 100644 --- a/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java +++ b/processing/src/test/java/org/apache/druid/error/DruidExceptionMatcher.java @@ -40,6 +40,15 @@ public static DruidExceptionMatcher invalidInput() ); } + public static DruidExceptionMatcher notFound() + { + return new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.NOT_FOUND, + "notFound" + ); + } + public static DruidExceptionMatcher unsupported() { return new DruidExceptionMatcher( diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java b/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java index 7a8d01042484..64ef43534dbe 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java +++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/ObjectMetadata.java @@ -24,6 +24,9 @@ import java.util.Objects; +/** + * {@link DataSourceMetadata} that contains a raw {@link Object}. Used in tests. + */ public final class ObjectMetadata implements DataSourceMetadata { private final Object theObject;