Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions docs/ingestion/supervisor.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ The following table outlines the high-level configuration options for a supervis

|Property|Type|Description|Required|
|--------|----|-----------|--------|
|`id`|String|The supervisor id. This should be a unique ID that will identify the supervisor.|Yes|
|`id`|String|The supervisor id. This should be a unique ID that will identify the supervisor. If unspecified, defaults to `spec.dataSchema.dataSource`.|No|
|`type`|String|The supervisor type. For streaming ingestion, this can be either `kafka`, `kinesis`, or `rabbit`. For automatic compaction, set the type to `autocompact`. |Yes|
|`spec`|Object|The container object for the supervisor configuration. For automatic compaction, this is the same as the compaction configuration. |Yes|
|`spec.dataSchema`|Object|The schema for the indexing task to use during ingestion. See [`dataSchema`](../ingestion/ingestion-spec.md#dataschema) for more information.|Yes|
Expand Down Expand Up @@ -413,9 +413,9 @@ This value is for the ideal situation in which there is at most one set of tasks
In some circumstances, it is possible to have multiple sets of tasks publishing simultaneously. This would happen if the
time-to-publish (generate segment, push to deep storage, load on Historical) is greater than `taskDuration`. This is a valid and correct scenario but requires additional worker capacity to support. In general, it is a good idea to have `taskDuration` be large enough that the previous set of tasks finishes publishing before the current set begins.

## Multi-Supervisor Support
Druid supports multiple stream supervisors ingesting into the same datasource. This means you can have any number of the configured stream supervisors (Kafka, Kinesis, etc.) ingesting into the same datasource at the same time.
In order to ensure proper synchronization between ingestion tasks with multiple supervisors, it's important to set `useConcurrentLocks=true` in the `context` field of the supervisor spec.
## Multi-Supervisor Support (Experimental)
Druid supports multiple stream supervisors ingesting into the same datasource. This means you can have any number of stream supervisors (Kafka, Kinesis, etc.) ingesting into the same datasource at the same time.
To ensure proper synchronization between ingestion tasks run by multiple supervisors, set `useConcurrentLocks=true` in the `context` field of each supervisor spec. For more information, see [Concurrent append and replace](concurrent-append-replace.md).

## Learn more

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4071,7 +4071,7 @@ public void testCheckpointForUnknownTaskGroup()

AlertEvent alert = serviceEmitter.getAlerts().get(0);
Assert.assertEquals(
"SeekableStreamSupervisor[testDS] for datasource=[testDS] failed to handle notice",
"Supervisor[testDS] for datasource[testDS] failed to handle notice",
alert.getDescription()
);
Assert.assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3489,7 +3489,7 @@ public void testCheckpointForUnknownTaskGroup()

final AlertEvent alert = serviceEmitter.getAlerts().get(0);
Assert.assertEquals(
"SeekableStreamSupervisor[testDS] for datasource=[testDS] failed to handle notice",
"Supervisor[testDS] for datasource[testDS] failed to handle notice",
alert.getDescription()
);
Assert.assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentSchemaMapping;
Expand Down Expand Up @@ -104,14 +105,9 @@ private SegmentTransactionalAppendAction(
} else {
this.supervisorId = supervisorId;
}

if ((startMetadata == null && endMetadata != null)
|| (startMetadata != null && endMetadata == null)) {
throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null.");
} else if (startMetadata != null && supervisorId == null) {
throw InvalidInput.exception("supervisorId cannot be null if startMetadata and endMetadata are both non-null.");
}
this.segmentSchemaMapping = segmentSchemaMapping;

IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId, startMetadata, endMetadata);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.config.Configs;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskLockHelper;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.SegmentSchemaMapping;
Expand Down Expand Up @@ -134,12 +134,7 @@ private SegmentTransactionalInsertAction(
this.supervisorId = Configs.valueOrDefault(supervisorId, dataSource);
this.segmentSchemaMapping = segmentSchemaMapping;

if ((startMetadata == null && endMetadata != null)
|| (startMetadata != null && endMetadata == null)) {
throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null.");
} else if (startMetadata != null && supervisorId == null) {
throw InvalidInput.exception("supervisorId cannot be null if startMetadata and endMetadata are both non-null.");
}
IndexerMetadataStorageCoordinator.validateDataSourceMetadata(supervisorId, startMetadata, endMetadata);
}

@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,30 +223,28 @@ public Response specGetAll(
.withDetailedState(theState.get().toString())
.withHealthy(theState.get().isHealthy());
}
if (includeFull) {
Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
if (theSpec.isPresent()) {
theBuilder.withSpec(theSpec.get())
.withDataSource(theSpec.get().getDataSources().stream().findFirst().orElse(null));
Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
Comment thread
jtuglu1 marked this conversation as resolved.
if (theSpec.isPresent()) {
final SupervisorSpec spec = theSpec.get();
theBuilder.withDataSource(spec.getDataSources().stream().findFirst().orElse(null));
if (includeFull) {
theBuilder.withSpec(spec);
}
}
if (includeSystem) {
Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
if (theSpec.isPresent()) {
if (includeSystem) {
try {
// serializing SupervisorSpec here, so that callers of `druid/indexer/v1/supervisor?system`
// which are outside the overlord process can deserialize the response and get a json
// payload of SupervisorSpec object when they don't have guice bindings for all the fields
// for example, broker does not have bindings for all fields of `KafkaSupervisorSpec` or
// `KinesisSupervisorSpec`
theBuilder.withSpecString(objectMapper.writeValueAsString(manager.getSupervisorSpec(x).get()));
theBuilder.withSpecString(objectMapper.writeValueAsString(spec));
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
theBuilder.withType(manager.getSupervisorSpec(x).get().getType())
.withSource(manager.getSupervisorSpec(x).get().getSource())
.withSuspended(manager.getSupervisorSpec(x).get().isSuspended());
theBuilder.withType(spec.getType())
.withSource(spec.getSource())
.withSuspended(spec.isSuspended());
}
}
return theBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ public DataSchema getDataSchema()
return dataSchema;
}

/**
* Returns the supervisor ID of the supervisor this task belongs to.
* If null/unspecified, this defaults to the datasource name.
*/
@JsonProperty
public String getSupervisorId()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,7 @@ private Set<PartitionIdType> computeExclusiveStartPartitionsForSequence(
*/
public String getSupervisorId()
{
@Nullable
final String supervisorId = task.getSupervisorId();
if (supervisorId != null) {
return supervisorId;
}
return task.getDataSource();
return task.getSupervisorId();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ public String getType()
private final String supervisorId;

/**
* Type-verbose id for identifying this supervisor in thread-names, listeners, etc.
* Tag for identifying this supervisor in thread names, listeners, etc. The tag is the supervisor type concatenated with the supervisor ID.
*/
private final String supervisorTag;
private final TaskInfoProvider taskInfoProvider;
Expand Down Expand Up @@ -1029,8 +1029,9 @@ public void start()
catch (Exception e) {
if (!started) {
log.warn(
"First initialization attempt failed for SeekableStreamSupervisor[%s], starting retries...",
supervisorId
"First initialization attempt failed for supervisor[%s], dataSource[%s], starting retries...",
supervisorId,
dataSource
);

exec.submit(
Expand Down Expand Up @@ -1264,7 +1265,7 @@ public void tryInit()
}
catch (Throwable e) {
stateManager.recordThrowableEvent(e);
log.makeAlert(e, "SeekableStreamSupervisor[%s] for datasource=[%s] failed to handle notice", supervisorId, dataSource)
log.makeAlert(e, "Supervisor[%s] for datasource[%s] failed to handle notice", supervisorId, dataSource)
.addData("noticeClass", notice.getClass().getSimpleName())
.emit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ public ServiceEmitter getEmitter()
return emitter;
}

/**
* Returns the identifier for this supervisor.
* If unspecified, defaults to the dataSource being written to.
*/
@Override
@JsonProperty
public String getId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ public void testSpecGetAllFull()
Assert.assertTrue(
specs.stream()
.allMatch(spec ->
("id1".equals(spec.getId()) && SPEC1.equals(spec.getSpec())) ||
("id2".equals(spec.getId()) && SPEC2.equals(spec.getSpec()))
("id1".equals(spec.getId()) && spec.getDataSource().equals("datasource1") && SPEC1.equals(spec.getSpec())) ||
("id2".equals(spec.getId()) && spec.getDataSource().equals("datasource2") && SPEC2.equals(spec.getSpec()))
)
);
}
Expand Down Expand Up @@ -398,8 +398,8 @@ public void testSpecGetState()

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(SUPERVISOR_IDS).atLeastOnce();
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(1);
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(1);
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(2);
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(2);
EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
setupMockRequest();
Expand All @@ -417,11 +417,13 @@ public void testSpecGetState()
if ("id1".equals(id)) {
return state1.toString().equals(state.getState())
&& state1.toString().equals(state.getDetailedState())
&& (Boolean) state.isHealthy() == state1.isHealthy();
&& (Boolean) state.isHealthy() == state1.isHealthy()
&& state.getDataSource().equals("datasource1");
} else if ("id2".equals(id)) {
return state2.toString().equals(state.getState())
&& state2.toString().equals(state.getDetailedState())
&& (Boolean) state.isHealthy() == state2.isHealthy();
&& (Boolean) state.isHealthy() == state2.isHealthy()
&& state.getDataSource().equals("datasource2");
}
return false;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testWithinMinMaxTimeNotPopulated()
}

@Test
public void testIfSupervisorIdIsNullThenUsesDatasource()
public void testGetSupervisorId()
{
DimensionsSpec dimensionsSpec = new DimensionsSpec(
Arrays.asList(
Expand Down Expand Up @@ -202,12 +202,10 @@ public void testIfSupervisorIdIsNullThenUsesDatasource()
Mockito.when(task.getIOConfig()).thenReturn(ioConfig);
Mockito.when(task.getTuningConfig()).thenReturn(tuningConfig);

// Return null supervisorId
Mockito.when(task.getSupervisorId()).thenReturn(null);
Mockito.when(task.getDataSource()).thenReturn("dataSource");
Mockito.when(task.getSupervisorId()).thenReturn("supervisorId");
TestasbleSeekableStreamIndexTaskRunner runner = new TestasbleSeekableStreamIndexTaskRunner(task, null,
LockGranularity.TIME_CHUNK);
Assert.assertEquals("dataSource", runner.getSupervisorId());
Assert.assertEquals("supervisorId", runner.getSupervisorId());
}

static class TestasbleSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
static final int TOTAL_NUMBER_OF_SECOND = 10;

private static final Logger LOG = new Logger(AbstractStreamIndexingTest.class);
// Since this integration test can terminates or be killed un-expectedly, this tag is added to all streams created
// Since this integration test can be terminated or killed unexpectedly, this tag is added to all created streams
// to make stream cleanup easier. (Normally, streams should be cleaned up automatically by the teardown method.)
// The value of this tag is a timestamp that can be used by a lambda function to remove unused streams.
private static final String STREAM_EXPIRE_TAG = "druid-ci-expire-after";
Expand Down Expand Up @@ -998,14 +998,12 @@ protected void doTestMultiSupervisorIndexDataStableState(
}

for (GeneratedTestConfig testConfig : testConfigs) {
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.RUNNING.equals(
indexer.getSupervisorStatus(testConfig.getSupervisorId())
),
true,
ITRetryUtil.retryUntilEquals(
() -> indexer.getSupervisorStatus(testConfig.getSupervisorId()),
SupervisorStateManager.BasicState.RUNNING,
10_000,
30,
"Waiting for supervisor [" + testConfig.getSupervisorId() + "] to be running"
"State of supervisor[" + testConfig.getSupervisorId() + "]"
);

ITRetryUtil.retryUntil(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.indexing.overlord;

import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
Expand Down Expand Up @@ -315,8 +316,8 @@ SegmentIdWithShardSpec allocatePendingSegment(
* If segmentsToDrop is not null and not empty, this insertion will be atomic with a insert-and-drop on inserting
* {@param segments} and dropping {@param segmentsToDrop}.
*
* @param supervisorId supervisorID which is committing the segments. Cannot be null if `startMetadata`
* and endMetadata` are both non-null.
* @param supervisorId supervisorID which is committing the segments. Cannot be null if {@code startMetadata}
* and {@code endMetadata} are both non-null.
* @param segments set of segments to add, must all be from the same dataSource
* @param startMetadata dataSource metadata pre-insert must match this startMetadata according to
* {@link DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will
Expand Down Expand Up @@ -632,4 +633,21 @@ List<Interval> getUnusedSegmentIntervals(
*/
boolean markSegmentAsUsed(SegmentId segmentId);

/**
* Validates a supervisor ID together with the start/end datasource metadata
* supplied for a transactional segment commit.
* <p>
* The two metadata arguments must either both be null (no metadata
* transaction) or both be non-null; when both are non-null, a non-null
* {@code supervisorId} is also required so the metadata can be attributed
* to the committing supervisor. Throws an invalid-input exception (via
* {@code InvalidInput.exception}) when either constraint is violated.
*
* @param supervisorId  ID of the supervisor committing the segments; may be
*                      null only when both metadata arguments are null
* @param startMetadata expected pre-commit datasource metadata, or null
* @param endMetadata   post-commit datasource metadata, or null
*/
static void validateDataSourceMetadata(
@Nullable final String supervisorId,
@Nullable final DataSourceMetadata startMetadata,
@Nullable final DataSourceMetadata endMetadata
)
{
// Metadata must be supplied as a matched pair: a lone start or lone end is invalid.
if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) {
throw InvalidInput.exception("'startMetadata' and 'endMetadata' must either both be null or both non-null");
} else if (startMetadata != null && supervisorId == null) {
// A metadata transaction is meaningless without a supervisor to attribute it to.
throw InvalidInput.exception(
"'supervisorId' cannot be null if 'startMetadata' and 'endMetadata' are both non-null.");
}
}
}
Loading