@@ -433,33 +433,34 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
/**
* Checks if there is a Task distinct from the given {@code taskId} or its replicas
* that is currently waiting to publish offsets for the given partitions.
*
* @return true only if the given {@code supervisorId} represents a
* {@link SeekableStreamSupervisor} and the supervisor has other tasks that
* are currently publishing offsets to an overlapping set of partitions.
*/
public boolean isAnotherTaskGroupPublishingToPartitions(
String supervisorId,
String taskId,
DataSourceMetadata startMetadata
)
{
try {
InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null");
if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
throw InvalidInput.exception(
"Start metadata[%s] of type[%s] is not valid streaming metadata",
supervisorId, startMetadata.getClass()
);
}
InvalidInput.conditionalException(supervisorId != null, "'supervisorId' cannot be null");
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
if (supervisor == null || supervisor.rhs == null) {
throw NotFound.exception("Could not find supervisor[%s]", supervisorId);
}
if (!(supervisor.lhs instanceof SeekableStreamSupervisor<?, ?, ?>)) {
return false;
}

Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
if (supervisor == null || supervisor.rhs == null) {
throw NotFound.exception("Could not find supervisor[%s]", supervisorId);
}
if (!(supervisor.lhs instanceof SeekableStreamSupervisor<?, ?, ?>)) {
throw InvalidInput.exception(
"Supervisor[%s] of type[%s] is not a streaming supervisor",
supervisorId, supervisor.rhs.getType()
);
}
if (!(startMetadata instanceof SeekableStreamDataSourceMetadata<?, ?>)) {
throw InvalidInput.exception(
"Start metadata[%s] of type[%s] is not valid streaming metadata",
startMetadata, startMetadata == null ? null : startMetadata.getClass()
);
}

try {
final Set<Object> partitionIds = Set.copyOf(
((SeekableStreamDataSourceMetadata<?, ?>) startMetadata)
.getSeekableStreamSequenceNumbers()
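A minimal sketch of how the new check is meant to be consulted, assuming a hypothetical caller that already holds a SupervisorManager reference and the task's start metadata (the helper name and variables below are illustrative, not part of this change):

    // Hypothetical helper, sketched against the signature added above.
    // Per the behaviour covered by the new tests in this change: a null
    // supervisorId or non-streaming metadata raises an invalid-input error,
    // an unknown supervisor raises a not-found error, and a non-streaming
    // supervisor simply yields false.
    static boolean publishShouldWait(
        SupervisorManager supervisorManager,
        String supervisorId,
        String taskId,
        DataSourceMetadata startMetadata
    )
    {
      // true => another task group of the same supervisor is still publishing
      // offsets to partitions overlapping those in startMetadata, so the
      // publish from taskId (or its replicas) should be deferred.
      return supervisorManager.isAnotherTaskGroupPublishingToPartitions(supervisorId, taskId, startMetadata);
    }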
@@ -3580,7 +3580,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
* during a task rollover based on the recommendations from the task auto-scaler.
*/
@VisibleForTesting
void maybeScaleDuringTaskRollover() throws ExecutionException, InterruptedException
void maybeScaleDuringTaskRollover()
{
if (taskAutoScaler == null) {
// Do nothing
@@ -3594,11 +3594,16 @@ void maybeScaleDuringTaskRollover() throws ExecutionException, InterruptedExcept
supervisorId, currentTaskCount, rolloverTaskCount
);
isScalingTasksOnRollover.set(true);
new DynamicAllocationTasksNotice(
() -> rolloverTaskCount,
() -> isScalingTasksOnRollover.set(false),
emitter
).handle();
try {
new DynamicAllocationTasksNotice(
() -> rolloverTaskCount,
() -> {},
emitter
).handle();
}
finally {
isScalingTasksOnRollover.set(false);
}
}
}
}
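The rollover scaling flag is now cleared in a finally block rather than in the notice's completion callback, so it cannot stay stuck at true if handle() exits with an exception. A minimal, generic sketch of the same guard pattern, with illustrative names that are not Druid APIs:

    import java.util.concurrent.atomic.AtomicBoolean;

    class FlagGuardSketch
    {
      private final AtomicBoolean inProgress = new AtomicBoolean(false);

      void runGuarded(Runnable action)
      {
        inProgress.set(true);
        try {
          action.run();
        }
        finally {
          // Always reset, whether action.run() completed normally or threw.
          inProgress.set(false);
        }
      }
    }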
@@ -32,6 +32,7 @@
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -42,6 +43,8 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.List;

public class SegmentTransactionalInsertActionTest
{
@Rule
@@ -190,6 +193,9 @@ public void test_fail_transactionalUpdateDataSourceMetadata() throws Exception
actionTestKit.getTaskLockbox().add(task);
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);

final NoopSupervisorSpec supervisorSpec = new NoopSupervisorSpec(SUPERVISOR_ID, List.of(DATA_SOURCE));
actionTestKit.getSupervisorManager().createOrUpdateAndStartSupervisor(supervisorSpec);

SegmentPublishResult result = SegmentTransactionalInsertAction.appendAction(
ImmutableSet.of(SEGMENT1),
SUPERVISOR_ID,
@@ -36,6 +36,7 @@
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataSupervisorManager;
import org.apache.druid.metadata.SegmentsMetadataManagerConfig;
import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.metadata.segment.SqlSegmentMetadataTransactionFactory;
@@ -64,6 +65,7 @@ public class TaskActionTestKit extends ExternalResource
private TestDerbyConnector testDerbyConnector;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private TaskActionToolbox taskActionToolbox;
private SupervisorManager supervisorManager;
private SegmentMetadataCache segmentMetadataCache;
private BlockingExecutorService metadataCachePollExec;

@@ -144,6 +146,11 @@ public TaskActionToolbox getTaskActionToolbox()
return taskActionToolbox;
}

public SupervisorManager getSupervisorManager()
{
return supervisorManager;
}

public void syncSegmentMetadataCache()
{
metadataCachePollExec.finishNextPendingTasks(4);
@@ -200,7 +207,14 @@ public boolean isBatchAllocationReduceMetadataIO()
}
};

SupervisorManager supervisorManager = new SupervisorManager(objectMapper, null);
this.supervisorManager = new SupervisorManager(
objectMapper,
new SQLMetadataSupervisorManager(
objectMapper,
testDerbyConnector,
() -> testDerbyConnector.getMetadataTablesConfig()
)
);
SegmentAllocationQueue segmentAllocationQueue = new SegmentAllocationQueue(
taskLockbox,
taskLockConfig,
@@ -230,7 +244,9 @@ public boolean isBatchAllocationReduceMetadataIO()
testDerbyConnector.createTaskTables();
testDerbyConnector.createAuditTable();
testDerbyConnector.createIndexingStatesTable();
testDerbyConnector.createSupervisorsTable();

supervisorManager.start();
segmentMetadataCache.start();
segmentMetadataCache.becomeLeader();
syncSegmentMetadataCache();
@@ -289,6 +305,7 @@ public void after()
taskActionToolbox = null;
segmentMetadataCache.stopBeingLeader();
segmentMetadataCache.stop();
supervisorManager.stop();
useSegmentMetadataCache = false;
}
}
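With the kit now wiring a real SupervisorManager over the Derby-backed SQLMetadataSupervisorManager (and creating the supervisors table and starting/stopping the manager in before/after), an action test can register a supervisor before exercising publish actions. A minimal sketch of that usage, mirroring the SegmentTransactionalInsertActionTest change earlier in this diff (the supervisor and datasource names are illustrative):

    // Illustrative test setup: register and start a no-op supervisor
    // through the kit's SupervisorManager before running the action.
    final NoopSupervisorSpec spec = new NoopSupervisorSpec("supervisor-id", List.of("datasource"));
    actionTestKit.getSupervisorManager().createOrUpdateAndStartSupervisor(spec);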
@@ -25,12 +25,14 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
@@ -796,6 +798,154 @@ public void testRegisterUpgradedPendingSegmentOnSupervisor()
verifyAll();
}

@Test
public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorIdIsNull()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> manager.isAnotherTaskGroupPublishingToPartitions(null, "task1", null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs("'supervisorId' cannot be null")
);
}

@Test
public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifSupervisorNotFound()
{
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> manager.isAnotherTaskGroupPublishingToPartitions("supervisor1", "task1", null)
),
DruidExceptionMatcher.notFound().expectMessageIs("Could not find supervisor[supervisor1]")
);
}

@Test
public void test_isAnotherTaskGroupPublishingToPartitions_returnsFalse_forNonStreamingSupervisor()
{
final String supervisorId = "supervisor1";
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(
Map.of(supervisorId, new TestSupervisorSpec(supervisorId, supervisor1))
);

supervisor1.start();
EasyMock.expect(supervisor1.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();

manager.start();

Assert.assertFalse(
manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", null)
);
}

@Test
public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMetadataIsNull()
{
final String supervisorId = "supervisor1";
final SeekableStreamSupervisor<Integer, String, ByteEntity> seekableStreamSupervisor =
EasyMock.createMock(SeekableStreamSupervisor.class);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(
Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor))
);

seekableStreamSupervisor.start();
EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
EasyMock.replay(seekableStreamSupervisor);

manager.start();

MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Start metadata[null] of type[null] is not valid streaming metadata"
)
);
}

@Test
public void test_isAnotherTaskGroupPublishingToPartitions_throwsException_ifMetadataIsInvalid()
{
final String supervisorId = "supervisor1";
final SeekableStreamSupervisor<Integer, String, ByteEntity> seekableStreamSupervisor =
EasyMock.createMock(SeekableStreamSupervisor.class);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(
Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor))
);

seekableStreamSupervisor.start();
EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();
replayAll();
EasyMock.replay(seekableStreamSupervisor);

manager.start();

MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, "task1", new ObjectMetadata("abc"))
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"Start metadata[ObjectMetadata{theObject=abc}] of"
+ " type[class org.apache.druid.indexing.overlord.ObjectMetadata]"
+ " is not valid streaming metadata"
)
);
}

@Test
public void test_isAnotherTaskGroupPublishingToPartitions()
{
final String supervisorId = "supervisor1";
final SeekableStreamSupervisor<Integer, String, ByteEntity> seekableStreamSupervisor =
EasyMock.createMock(SeekableStreamSupervisor.class);
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(
Map.of(supervisorId, new TestSupervisorSpec(supervisorId, seekableStreamSupervisor))
);

seekableStreamSupervisor.start();
EasyMock.expect(seekableStreamSupervisor.createAutoscaler(EasyMock.anyObject())).andReturn(null).anyTimes();

// Expect a readyTaskId for which no other group is currently publishing
final String readyTaskId = "task1";
EasyMock.expect(
seekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions(
EasyMock.eq(readyTaskId),
EasyMock.anyObject()
)
).andReturn(false).atLeastOnce();

// Expect a conflictingTaskId for which another group is currently publishing
final String conflictingTaskId = "task2";
EasyMock.expect(
seekableStreamSupervisor.isAnotherTaskGroupPublishingToPartitions(
EasyMock.eq(conflictingTaskId),
EasyMock.anyObject()
)
).andReturn(true).atLeastOnce();

replayAll();
EasyMock.replay(seekableStreamSupervisor);
manager.start();

final DataSourceMetadata startMetadata = TestSeekableStreamDataSourceMetadata.start(
"topic",
Map.of("0", "10")
);
Assert.assertTrue(
manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, conflictingTaskId, startMetadata)
);
Assert.assertFalse(
manager.isAnotherTaskGroupPublishingToPartitions(supervisorId, readyTaskId, startMetadata)
);
}

private static class TestSupervisorSpec implements SupervisorSpec
{
private final String id;
@@ -23,8 +23,24 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.DataSourceMetadata;

import java.util.Map;

public class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata<String, String>
{
public static TestSeekableStreamDataSourceMetadata start(String topic, Map<String, String> partitionOffsets)
{
return new TestSeekableStreamDataSourceMetadata(
new SeekableStreamStartSequenceNumbers<>(topic, partitionOffsets, null)
);
}

public static TestSeekableStreamDataSourceMetadata end(String topic, Map<String, String> partitionOffsets)
{
return new TestSeekableStreamDataSourceMetadata(
new SeekableStreamEndSequenceNumbers<>(topic, partitionOffsets)
);
}

@JsonCreator
public TestSeekableStreamDataSourceMetadata(
@JsonProperty("partitions") SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers)
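A short usage note on the new factory methods: they build test metadata straight from a topic and a partition-to-offset map, as the new SupervisorManager tests above do for start metadata. A sketch, with illustrative offsets:

    // Illustrative usage of the new test factories.
    DataSourceMetadata start = TestSeekableStreamDataSourceMetadata.start("topic", Map.of("0", "10"));
    DataSourceMetadata end = TestSeekableStreamDataSourceMetadata.end("topic", Map.of("0", "25"));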