From dd5a1d123cc14699c2ac6f58de773954574c8e4c Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Mon, 16 Jun 2025 23:40:31 -0700 Subject: [PATCH 1/7] Add follow-ups to multi stream supervisor, single datasource ingests --- docs/ingestion/supervisor.md | 8 +++---- .../SegmentTransactionalAppendAction.java | 10 +++----- .../SegmentTransactionalInsertAction.java | 9 ++----- .../supervisor/SupervisorResource.java | 2 +- .../SeekableStreamIndexTask.java | 4 ++++ .../SeekableStreamIndexTaskRunner.java | 7 +----- .../supervisor/SeekableStreamSupervisor.java | 4 ++-- .../SeekableStreamSupervisorSpec.java | 4 ++++ .../indexer/AbstractStreamIndexingTest.java | 12 ++++------ .../IndexerMetadataStorageCoordinator.java | 21 ++++++++++++++-- .../IndexerSQLMetadataStorageCoordinator.java | 24 ++++--------------- .../druid/metadata/SQLMetadataConnector.java | 7 +++--- 12 files changed, 53 insertions(+), 59 deletions(-) diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md index 5db3189c0667..b5b38df66dce 100644 --- a/docs/ingestion/supervisor.md +++ b/docs/ingestion/supervisor.md @@ -37,7 +37,7 @@ The following table outlines the high-level configuration options for a supervis |Property|Type|Description|Required| |--------|----|-----------|--------| -|`id`|String|The supervisor id. This should be a unique ID that will identify the supervisor.|Yes| +|`id`|String|The supervisor id. This should be a unique ID that will identify the supervisor. If unspecified, defaults to `spec.dataSchema.dataSource`.|No| |`type`|String|The supervisor type. For streaming ingestion, this can be either `kafka`, `kinesis`, or `rabbit`. For automatic compaction, set the type to `autocompact`. |Yes| |`spec`|Object|The container object for the supervisor configuration. For automatic compaction, this is the same as the compaction configuration. |Yes| |`spec.dataSchema`|Object|The schema for the indexing task to use during ingestion. See [`dataSchema`](../ingestion/ingestion-spec.md#dataschema) for more information.|Yes| @@ -413,9 +413,9 @@ This value is for the ideal situation in which there is at most one set of tasks In some circumstances, it is possible to have multiple sets of tasks publishing simultaneously. This would happen if the time-to-publish (generate segment, push to deep storage, load on Historical) is greater than `taskDuration`. This is a valid and correct scenario but requires additional worker capacity to support. In general, it is a good idea to have `taskDuration` be large enough that the previous set of tasks finishes publishing before the current set begins. -## Multi-Supervisor Support -Druid supports multiple stream supervisors ingesting into the same datasource. This means you can have any number of the configured stream supervisors (Kafka, Kinesis, etc.) ingesting into the same datasource at the same time. -In order to ensure proper synchronization between ingestion tasks with multiple supervisors, it's important to set `useConcurrentLocks=true` in the `context` field of the supervisor spec. +## Multi-Supervisor Support (Experimental) +Druid supports multiple stream supervisors ingesting into the same datasource. This means you can have any number of stream supervisors (Kafka, Kinesis, etc.) ingesting into the same datasource at the same time. +In order to ensure proper synchronization between ingestion tasks with multiple supervisors, it's important to set `useConcurrentLocks=true` in the `context` field of the supervisor spec. Read more [here](concurrent-append-replace.md). ## Learn more diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index b21a0f994c2b..3833c4be6b0b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -31,6 +31,7 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentSchemaMapping; @@ -104,14 +105,9 @@ private SegmentTransactionalAppendAction( } else { this.supervisorId = supervisorId; } - - if ((startMetadata == null && endMetadata != null) - || (startMetadata != null && endMetadata == null)) { - throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null."); - } else if (startMetadata != null && supervisorId == null) { - throw InvalidInput.exception("supervisorId cannot be null if startMetadata and endMetadata are both non-null."); - } this.segmentSchemaMapping = segmentSchemaMapping; + + IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId, startMetadata, endMetadata); } @Nullable diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 533e6f4b877a..ab84c38cc6a4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -25,7 +25,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import org.apache.druid.common.config.Configs; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.task.IndexTaskUtils; @@ -33,6 +32,7 @@ import org.apache.druid.indexing.common.task.TaskLockHelper; import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.ISE; import org.apache.druid.segment.SegmentSchemaMapping; @@ -134,12 +134,7 @@ private SegmentTransactionalInsertAction( this.supervisorId = Configs.valueOrDefault(supervisorId, dataSource); this.segmentSchemaMapping = segmentSchemaMapping; - if ((startMetadata == null && endMetadata != null) - || (startMetadata != null && endMetadata == null)) { - throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null."); - } else if (startMetadata != null && supervisorId == null) { - throw InvalidInput.exception("supervisorId cannot be null if startMetadata and endMetadata are both non-null."); - } + IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId, startMetadata, endMetadata); } @JsonProperty diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 51f667e0c3ee..4b33cf15eceb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -227,7 +227,7 @@ public Response specGetAll( Optional theSpec = manager.getSupervisorSpec(x); if (theSpec.isPresent()) { theBuilder.withSpec(theSpec.get()) - .withDataSource(theSpec.get().getDataSources().stream().findFirst().orElse(null)); + .withDataSource(theSpec.get().getDataSources().stream().findFirst().orElse(null)); } } if (includeSystem) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 72f3d321c94f..04f44b716e72 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -130,6 +130,10 @@ public DataSchema getDataSchema() return dataSchema; } + /** + * Returns the supervisor ID of the supervisor this task belongs to. + * If null/unspecified, this defaults to the datasource name. + */ @JsonProperty public String getSupervisorId() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index bed663b7a1ea..15ec70df08c2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -318,12 +318,7 @@ private Set computeExclusiveStartPartitionsForSequence( */ public String getSupervisorId() { - @Nullable - final String supervisorId = task.getSupervisorId(); - if (supervisorId != null) { - return supervisorId; - } - return task.getDataSource(); + return task.getSupervisorId(); } @VisibleForTesting diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 191edf4a735c..ba8b53711947 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -857,7 +857,7 @@ public String getType() private final String supervisorId; /** - * Type-verbose id for identifying this supervisor in thread-names, listeners, etc. + * Tag for identifying this supervisor in thread-names, listeners, etc. tag = (type + supervisorId). */ private final String supervisorTag; private final TaskInfoProvider taskInfoProvider; @@ -1264,7 +1264,7 @@ public void tryInit() } catch (Throwable e) { stateManager.recordThrowableEvent(e); - log.makeAlert(e, "SeekableStreamSupervisor[%s] for datasource=[%s] failed to handle notice", supervisorId, dataSource) + log.makeAlert(e, "Supervisor[%s] for datasource[%s] failed to handle notice", supervisorId, dataSource) .addData("noticeClass", notice.getClass().getSimpleName()) .emit(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index f09e3bb9e8c3..967652673f7c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -162,6 +162,10 @@ public ServiceEmitter getEmitter() return emitter; } + /** + * Returns the identifier for this supervisor. + * If unspecified, defaults to the dataSource being written to. + */ @Override @JsonProperty public String getId() diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index f4628707a215..7954cac21a98 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -73,7 +73,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest static final int TOTAL_NUMBER_OF_SECOND = 10; private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class); - // Since this integration test can terminates or be killed un-expectedly, this tag is added to all streams created + // Since this integration test can be terminated or be killed un-expectedly, this tag is added to all streams created // to help make stream clean up easier. (Normally, streams should be cleanup automattically by the teardown method) // The value to this tag is a timestamp that can be used by a lambda function to remove unused stream. private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after"; @@ -998,14 +998,12 @@ protected void doTestMultiSupervisorIndexDataStableState( } for (GeneratedTestConfig testConfig : testConfigs) { - ITRetryUtil.retryUntil( - () -> SupervisorStateManager.BasicState.RUNNING.equals( - indexer.getSupervisorStatus(testConfig.getSupervisorId()) - ), - true, + ITRetryUtil.retryUntilEquals( + () -> indexer.getSupervisorStatus(testConfig.getSupervisorId()), + SupervisorStateManager.BasicState.RUNNING, 10_000, 30, - "Waiting for supervisor [" + testConfig.getSupervisorId() + "] to be running" + "State of supervisor[" + testConfig.getSupervisorId() + "]" ); ITRetryUtil.retryUntil( diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 0e818bd7a175..5a9a33e92ca3 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -315,8 +315,8 @@ SegmentIdWithShardSpec allocatePendingSegment( * If segmentsToDrop is not null and not empty, this insertion will be atomic with a insert-and-drop on inserting * {@param segments} and dropping {@param segmentsToDrop}. * - * @param supervisorId supervisorID which is committing the segments. Cannot be null if `startMetadata` - * and endMetadata` are both non-null. + * @param supervisorId supervisorID which is committing the segments. Cannot be null if {@code startMetadata} + * and {@code endMetadata} are both non-null. * @param segments set of segments to add, must all be from the same dataSource * @param startMetadata dataSource metadata pre-insert must match this startMetadata according to * {@link DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will @@ -632,4 +632,21 @@ List getUnusedSegmentIntervals( */ boolean markSegmentAsUsed(SegmentId segmentId); + /** + * Validates the given supervisorId and given metadata to ensure + * that start/end metadata non-null implies supervisor ID is non-null. + */ + static void validateDataSourceMetadata( + @Nullable final String supervisorId, + @Nullable final DataSourceMetadata startMetadata, + @Nullable final DataSourceMetadata endMetadata + ) + { + if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) { + throw new IllegalArgumentException("start/end metadata pair must be either null or non-null"); + } else if (startMetadata != null && supervisorId == null) { + throw new IllegalArgumentException( + "supervisorId cannot be null if startMetadata and endMetadata are both non-null."); + } + } } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 2d7d9ced976d..80d4fe8f750f 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -433,14 +433,7 @@ public SegmentPublishResult commitSegmentsAndMetadata( ) { verifySegmentsToCommit(segments); - - if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) { - throw new IllegalArgumentException("start/end metadata pair must be either null or non-null"); - } else if (startMetadata != null && supervisorId == null) { - throw new IllegalArgumentException( - "supervisorId cannot be null if startMetadata and endMetadata are both non-null."); - } - + IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId, startMetadata, endMetadata); final String dataSource = segments.iterator().next().getDataSource(); try { @@ -1189,13 +1182,7 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( ) { final String dataSource = verifySegmentsToCommit(appendSegments); - if ((startMetadata == null && endMetadata != null) - || (startMetadata != null && endMetadata == null)) { - throw new IllegalArgumentException("start/end metadata pair must be either null or non-null"); - } else if (startMetadata != null && supervisorId == null) { - throw new IllegalArgumentException( - "supervisorId cannot be null if startMetadata and endMetadata are both non-null."); - } + IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId, startMetadata, endMetadata); final List segmentIdsForNewVersions = inReadOnlyDatasourceTransaction( dataSource, @@ -1237,8 +1224,7 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( transaction -> { // Try to update datasource metadata first if (startMetadata != null) { - final SegmentPublishResult metadataUpdateResult - = updateDataSourceMetadataInTransaction( + final SegmentPublishResult metadataUpdateResult = updateDataSourceMetadataInTransaction( transaction, supervisorId, dataSource, @@ -2289,12 +2275,12 @@ protected SegmentPublishResult updateDataSourceMetadataInTransaction( if (publishResult.isSuccess()) { log.info( - "Updated metadata for supervisor[%s] for datasource[%s] from[%s] to[%s].", + "Updated metadata for supervisor[%s], datasource[%s] from[%s] to[%s].", supervisorId, dataSource, oldCommitMetadataFromDb, newCommitMetadata ); } else { log.info( - "Failed to update metadata for supervisor[%s] for datasource[%s] due to reason[%s].", + "Failed to update metadata for supervisor[%s], datasource[%s] due to reason[%s].", supervisorId, dataSource, publishResult.getErrorMsg() ); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 6d6926ffaac7..42b13ab11c89 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -312,10 +312,9 @@ tableName, getPayloadType(), getQuoteString(), getCollation() } /** - * Creates the table for storing datasource metadata for supervisors. - * Due to backwards compatibility reasons, the `dataSource` column will always uniquely identify a supervisor. - * For certain types of supervisors which support N:1 supervisor:datasource relationship, the `dataSource` column will store the supervisor ID. - * Otherwise, it will store the legacy supervisor ID – the `dataSource` itself. + * The {@code dataSource} column stores the supervisor ID. + * It has not been renamed to retain backwards compatibility. + * Supervisors created without an explicit supervisor id default to using the datasource name. */ public void createDataSourceTable(final String tableName) { From 455ae2ed92dd60df6ea71e6db5cbccd316a661e6 Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Tue, 17 Jun 2025 00:26:04 -0700 Subject: [PATCH 2/7] Switch to InvalidInput.exception --- .../indexing/overlord/IndexerMetadataStorageCoordinator.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 5a9a33e92ca3..d9f0e7a62264 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.overlord; +import org.apache.druid.error.InvalidInput; import org.apache.druid.java.util.common.Pair; import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.ReplaceTaskLock; @@ -643,9 +644,9 @@ static void validateDataSourceMetadata( ) { if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) { - throw new IllegalArgumentException("start/end metadata pair must be either null or non-null"); + throw InvalidInput.exception("start/end metadata pair must be either null or non-null"); } else if (startMetadata != null && supervisorId == null) { - throw new IllegalArgumentException( + throw InvalidInput.exception( "supervisorId cannot be null if startMetadata and endMetadata are both non-null."); } } From 4cf313ae234b78f6483b92221519900a983e190c Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Tue, 17 Jun 2025 08:11:50 -0700 Subject: [PATCH 3/7] Fix tests --- .../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 2 +- .../indexing/kinesis/supervisor/KinesisSupervisorTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index ed7dda03f481..bf07a2dec644 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -4071,7 +4071,7 @@ public void testCheckpointForUnknownTaskGroup() AlertEvent alert = serviceEmitter.getAlerts().get(0); Assert.assertEquals( - "SeekableStreamSupervisor[testDS] for datasource=[testDS] failed to handle notice", + "Supervisor[testDS] for datasource[testDS] failed to handle notice", alert.getDescription() ); Assert.assertEquals( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 0181443c2c77..ed20a3f3ec38 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -3489,7 +3489,7 @@ public void testCheckpointForUnknownTaskGroup() final AlertEvent alert = serviceEmitter.getAlerts().get(0); Assert.assertEquals( - "SeekableStreamSupervisor[testDS] for datasource=[testDS] failed to handle notice", + "Supervisor[testDS] for datasource[testDS] failed to handle notice", alert.getDescription() ); Assert.assertEquals( From 177323c19dd338a7dfdaeeb0e167299edadeea17 Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Tue, 17 Jun 2025 09:24:46 -0700 Subject: [PATCH 4/7] Fix tests --- .../seekablestream/SeekableStreamIndexTaskRunnerTest.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java index cdfe1fa6f014..7518bb7f172a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java @@ -167,7 +167,7 @@ public void testWithinMinMaxTimeNotPopulated() } @Test - public void testIfSupervisorIdIsNullThenUsesDatasource() + public void testGetSupervisorId() { DimensionsSpec dimensionsSpec = new DimensionsSpec( Arrays.asList( @@ -202,12 +202,10 @@ public void testIfSupervisorIdIsNullThenUsesDatasource() Mockito.when(task.getIOConfig()).thenReturn(ioConfig); Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig); - // Return null supervisorId - Mockito.when(task.getSupervisorId()).thenReturn(null); - Mockito.when(task.getDataSource()).thenReturn("dataSource"); + Mockito.when(task.getSupervisorId()).thenReturn("supervisorId"); TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null, LockGranularity.TIME_CHUNK); - Assert.assertEquals("dataSource", runner.getSupervisorId()); + Assert.assertEquals("supervisorId", runner.getSupervisorId()); } static class TestasbleSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner From 176dd887d0fcbbd6e6c0fd89c0dd5fb77841a23d Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Tue, 17 Jun 2025 10:47:18 -0700 Subject: [PATCH 5/7] Extra changes --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 5 +++-- .../metadata/IndexerSQLMetadataStorageCoordinator.java | 2 ++ .../indexing/overlord/supervisor/SupervisorStatusTest.java | 6 ++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ba8b53711947..aa5d6849ebe7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1029,8 +1029,9 @@ public void start() catch (Exception e) { if (!started) { log.warn( - "First initialization attempt failed for SeekableStreamSupervisor[%s], starting retries...", - supervisorId + "First initialization attempt failed for SeekableStreamSupervisor[%s] for dataSource[%s], starting retries...", + supervisorId, + dataSource ); exec.submit( diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 80d4fe8f750f..5f6f817313eb 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -2144,6 +2144,8 @@ private Map getAppendSegmentsCommittedDuringTask( * {@link DataSourceMetadata#matches matches} the {@code endMetadata}, this * method returns immediately with success. * + * @param supervisorId The supervisor ID. Used as the PK for the corresponding metadata entry in the DB. + * @param dataSource The dataSource. Currently used only for logging purposes. * @param startMetadata Current entry in the DB must * {@link DataSourceMetadata#matches match} this value. * @param endMetadata The updated entry will be equal to the current entry diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java index 4c370453a0bc..f8ef902c2fa8 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java @@ -50,7 +50,8 @@ public void testSerde() throws IOException public void testJsonAttr() throws IOException { String json = "{" - + "\"id\":\"wikipedia\"," + + "\"id\":\"wikipedia_supervisor\"," + + "\"dataSource\":\"wikipedia\"," + "\"state\":\"UNHEALTHY_SUPERVISOR\"," + "\"detailedState\":\"UNHEALTHY_SUPERVISOR\"," + "\"healthy\":false," @@ -61,7 +62,8 @@ public void testJsonAttr() throws IOException final ObjectMapper mapper = new ObjectMapper(); final SupervisorStatus deserialized = mapper.readValue(json, SupervisorStatus.class); Assert.assertNotNull(deserialized); - Assert.assertEquals("wikipedia", deserialized.getId()); + Assert.assertEquals("wikipedia_supervisor", deserialized.getId()); + Assert.assertEquals("wikipedia", deserialized.getDataSource()); final String serialized = mapper.writeValueAsString(deserialized); Assert.assertTrue(serialized.contains("\"source\"")); Assert.assertEquals(json, serialized); From 5f0f9312480ec4b829a55c2c44a6beb54b2763fc Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Tue, 17 Jun 2025 12:02:50 -0700 Subject: [PATCH 6/7] Fix null datasource in sys table --- .../supervisor/SupervisorResource.java | 23 ++++++++----------- .../supervisor/SupervisorResourceTest.java | 14 ++++++----- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 4b33cf15eceb..78923a5a347d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -223,30 +223,27 @@ public Response specGetAll( .withDetailedState(theState.get().toString()) .withHealthy(theState.get().isHealthy()); } - if (includeFull) { - Optional theSpec = manager.getSupervisorSpec(x); - if (theSpec.isPresent()) { - theBuilder.withSpec(theSpec.get()) - .withDataSource(theSpec.get().getDataSources().stream().findFirst().orElse(null)); + Optional theSpec = manager.getSupervisorSpec(x); + if (theSpec.isPresent()) { + theBuilder.withDataSource(theSpec.get().getDataSources().stream().findFirst().orElse(null)); + if (includeFull) { + theBuilder.withSpec(theSpec.get()); } - } - if (includeSystem) { - Optional theSpec = manager.getSupervisorSpec(x); - if (theSpec.isPresent()) { + if (includeSystem) { try { // serializing SupervisorSpec here, so that callers of `druid/indexer/v1/supervisor?system` // which are outside the overlord process can deserialize the response and get a json // payload of SupervisorSpec object when they don't have guice bindings for all the fields // for example, broker does not have bindings for all fields of `KafkaSupervisorSpec` or // `KinesisSupervisorSpec` - theBuilder.withSpecString(objectMapper.writeValueAsString(manager.getSupervisorSpec(x).get())); + theBuilder.withSpecString(objectMapper.writeValueAsString(theSpec.get())); } catch (JsonProcessingException e) { throw new RuntimeException(e); } - theBuilder.withType(manager.getSupervisorSpec(x).get().getType()) - .withSource(manager.getSupervisorSpec(x).get().getSource()) - .withSuspended(manager.getSupervisorSpec(x).get().isSuspended()); + theBuilder.withType(theSpec.get().getType()) + .withSource(theSpec.get().getSource()) + .withSuspended(theSpec.get().isSuspended()); } } return theBuilder.build(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index febc959b0d10..49101b34b1b2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -351,8 +351,8 @@ public void testSpecGetAllFull() Assert.assertTrue( specs.stream() .allMatch(spec -> - ("id1".equals(spec.getId()) && SPEC1.equals(spec.getSpec())) || - ("id2".equals(spec.getId()) && SPEC2.equals(spec.getSpec())) + ("id1".equals(spec.getId()) && spec.getDataSource().equals("datasource1") && SPEC1.equals(spec.getSpec())) || + ("id2".equals(spec.getId()) && spec.getDataSource().equals("datasource2") && SPEC2.equals(spec.getSpec())) ) ); } @@ -398,8 +398,8 @@ public void testSpecGetState() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(SUPERVISOR_IDS).atLeastOnce(); - EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(1); - EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(1); + EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(2); + EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(2); EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1); EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1); setupMockRequest(); @@ -417,11 +417,13 @@ public void testSpecGetState() if ("id1".equals(id)) { return state1.toString().equals(state.getState()) && state1.toString().equals(state.getDetailedState()) - && (Boolean) state.isHealthy() == state1.isHealthy(); + && (Boolean) state.isHealthy() == state1.isHealthy() + && state.getDataSource().equals("datasource1"); } else if ("id2".equals(id)) { return state2.toString().equals(state.getState()) && state2.toString().equals(state.getDetailedState()) - && (Boolean) state.isHealthy() == state2.isHealthy(); + && (Boolean) state.isHealthy() == state2.isHealthy() + && state.getDataSource().equals("datasource2"); } return false; }) From 59ccf4c6a99e00e7ca58728234d56b95bd273406 Mon Sep 17 00:00:00 2001 From: Jesse Tuglu Date: Tue, 17 Jun 2025 19:04:06 -0700 Subject: [PATCH 7/7] Fix comments --- .../overlord/supervisor/SupervisorResource.java | 13 +++++++------ .../supervisor/SeekableStreamSupervisor.java | 2 +- .../overlord/IndexerMetadataStorageCoordinator.java | 4 ++-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 78923a5a347d..c8b0ecff037c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -225,9 +225,10 @@ public Response specGetAll( } Optional theSpec = manager.getSupervisorSpec(x); if (theSpec.isPresent()) { - theBuilder.withDataSource(theSpec.get().getDataSources().stream().findFirst().orElse(null)); + final SupervisorSpec spec = theSpec.get(); + theBuilder.withDataSource(spec.getDataSources().stream().findFirst().orElse(null)); if (includeFull) { - theBuilder.withSpec(theSpec.get()); + theBuilder.withSpec(spec); } if (includeSystem) { try { @@ -236,14 +237,14 @@ public Response specGetAll( // payload of SupervisorSpec object when they don't have guice bindings for all the fields // for example, broker does not have bindings for all fields of `KafkaSupervisorSpec` or // `KinesisSupervisorSpec` - theBuilder.withSpecString(objectMapper.writeValueAsString(theSpec.get())); + theBuilder.withSpecString(objectMapper.writeValueAsString(spec)); } catch (JsonProcessingException e) { throw new RuntimeException(e); } - theBuilder.withType(theSpec.get().getType()) - .withSource(theSpec.get().getSource()) - .withSuspended(theSpec.get().isSuspended()); + theBuilder.withType(spec.getType()) + .withSource(spec.getSource()) + .withSuspended(spec.isSuspended()); } } return theBuilder.build(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index aa5d6849ebe7..01acc9de3063 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1029,7 +1029,7 @@ public void start() catch (Exception e) { if (!started) { log.warn( - "First initialization attempt failed for SeekableStreamSupervisor[%s] for dataSource[%s], starting retries...", + "First initialization attempt failed for supervisor[%s], dataSource[%s], starting retries...", supervisorId, dataSource ); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index d9f0e7a62264..c577bd3af146 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -644,10 +644,10 @@ static void validateDataSourceMetadata( ) { if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) { - throw InvalidInput.exception("start/end metadata pair must be either null or non-null"); + throw InvalidInput.exception("'startMetadata' and 'endMetadata' must either both be null or both non-null"); } else if (startMetadata != null && supervisorId == null) { throw InvalidInput.exception( - "supervisorId cannot be null if startMetadata and endMetadata are both non-null."); + "'supervisorId' cannot be null if 'startMetadata' and 'endMetadata' are both non-null."); } } }