Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
dd63724
Initial commit
AmatyaAvadhanula Mar 6, 2024
743696b
Merge remote-tracking branch 'upstream/master' into pending_segments_…
AmatyaAvadhanula Mar 11, 2024
5f77ea6
Allocate and commit PendingSegment instead of SegmentIdWithShardSpec
AmatyaAvadhanula Mar 12, 2024
21ca620
Merge remote-tracking branch 'upstream/master' into pending_segments_…
AmatyaAvadhanula Mar 15, 2024
d2aa575
Get tests working
AmatyaAvadhanula Mar 17, 2024
04c2593
Get streaming tests working
AmatyaAvadhanula Mar 17, 2024
0386c2a
various minor fixes
AmatyaAvadhanula Mar 18, 2024
cf7815b
Clean up unneeded tests
AmatyaAvadhanula Mar 18, 2024
e7e86b9
Resolve merge conflicts
AmatyaAvadhanula Apr 1, 2024
3805410
Merge remote-tracking branch 'upstream/master' into pending_segments_…
AmatyaAvadhanula Apr 8, 2024
2a32dc2
Address most review comments
AmatyaAvadhanula Apr 9, 2024
9906afc
Rename columns and add new interface
AmatyaAvadhanula Apr 9, 2024
dfd3cad
NoopTask can allocate segments
AmatyaAvadhanula Apr 9, 2024
21b8ee5
IndexTasks can allocate pending segments
AmatyaAvadhanula Apr 10, 2024
8bb0feb
Merge remote-tracking branch 'upstream/master' into pending_segments_…
AmatyaAvadhanula Apr 10, 2024
7ffe0ec
Do not throw UOE
AmatyaAvadhanula Apr 10, 2024
3e17b19
Handle test failures for legacy tasks
AmatyaAvadhanula Apr 10, 2024
422411b
Fix order of cleanup after task removal
AmatyaAvadhanula Apr 12, 2024
f91b1a3
Merge remote-tracking branch 'upstream/master' into pending_segments_…
AmatyaAvadhanula Apr 12, 2024
de6b40a
Fix merge conflicts
AmatyaAvadhanula Apr 12, 2024
4653e60
Fix metadata cleanup after task removal
AmatyaAvadhanula Apr 12, 2024
4581133
Merge remote-tracking branch 'upstream/master' into pending_segments_…
AmatyaAvadhanula Apr 15, 2024
b49b8d8
Address review and remove unneeded methods
AmatyaAvadhanula Apr 16, 2024
277596f
Merge remote-tracking branch 'upstream/master' into pending_segments_…
AmatyaAvadhanula Apr 16, 2024
a21adbc
Review comments
AmatyaAvadhanula Apr 16, 2024
a6f958d
Compactions can allocate segments with segment locking
AmatyaAvadhanula Apr 16, 2024
c31837a
Tests for coverage
kfaraz Apr 16, 2024
477abce
Fix intellij inspection
kfaraz Apr 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -71,7 +72,7 @@
import java.util.Set;

@JsonTypeName(MSQControllerTask.TYPE)
public class MSQControllerTask extends AbstractTask implements ClientTaskQuery
public class MSQControllerTask extends AbstractTask implements ClientTaskQuery, PendingSegmentAllocatingTask
{
public static final String TYPE = "query_controller";
public static final String DUMMY_DATASOURCE_FOR_SELECT = "__query_select";
Expand Down Expand Up @@ -157,6 +158,12 @@ public Set<ResourceAction> getInputSourceResources()
return ImmutableSet.of();
}

/**
 * Returns the allocator id used to track pending segments created by this task.
 * The controller task performs its own segment allocation, so the allocator id
 * is simply this task's own id.
 */
@Override
public String getTaskAllocatorId()
{
return getId();
}

@JsonProperty("spec")
public MSQSpec getQuerySpec()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.Worker;
Expand All @@ -45,7 +46,7 @@
import java.util.Set;

@JsonTypeName(MSQWorkerTask.TYPE)
public class MSQWorkerTask extends AbstractTask
public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class need not implement PendingSegmentAllocatingTask as it never actually does any allocation. The allocation is always done by the controller task.

this can be addressed in a follow up PR.

{
public static final String TYPE = "query_worker";

Expand Down Expand Up @@ -125,6 +126,12 @@ public Set<ResourceAction> getInputSourceResources()
return ImmutableSet.of();
}

/**
 * Returns the allocator id used to track pending segments for this worker.
 * Segment allocation is always performed by the controller task (workers never
 * allocate directly — see reviewer note), so pending segments are keyed by the
 * controller task's id rather than this worker's own id.
 */
@Override
public String getTaskAllocatorId()
{
return getControllerTaskId();
}


@Override
public boolean isReady(final TaskActionClient taskActionClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

public class MSQControllerTaskTest
{
MSQSpec MSQ_SPEC = MSQSpec
private final MSQSpec MSQ_SPEC = MSQSpec
.builder()
.destination(new DataSourceMSQDestination(
"target",
Expand All @@ -59,15 +59,33 @@ public class MSQControllerTaskTest
@Test
public void testGetInputSourceResources()
{
MSQControllerTask msqWorkerTask = new MSQControllerTask(
MSQControllerTask controllerTask = new MSQControllerTask(
null,
MSQ_SPEC,
null,
null,
null,
null,
null,
null);
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
null
);
Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty());
}

@Test
public void testGetTaskAllocatorId()
{
    // The controller task allocates its own pending segments, so the
    // allocator id must be the task's own id.
    final String expectedAllocatorId = "taskId";
    final MSQControllerTask task = new MSQControllerTask(
        expectedAllocatorId,
        MSQ_SPEC,
        null,
        null,
        null,
        null,
        null,
        null
    );
    Assert.assertEquals(expectedAllocatorId, task.getTaskAllocatorId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public class MSQWorkerTaskTest
@Test
public void testEquals()
{
Assert.assertNotEquals(msqWorkerTask, 0);
Assert.assertEquals(msqWorkerTask, msqWorkerTask);
Assert.assertEquals(
msqWorkerTask,
Expand Down Expand Up @@ -110,4 +109,11 @@ public void testGetInputSourceResources()
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
}

@Test
public void testGetTaskAllocatorId()
{
    // Workers allocate on behalf of their controller, so the allocator id
    // must match the controller task's id.
    final MSQWorkerTask workerTask =
        new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
    Assert.assertEquals(controllerTaskId, workerTask.getTaskAllocatorId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Preconditions;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.LockRequestForNewSegment;
Expand Down Expand Up @@ -210,6 +212,12 @@ public SegmentIdWithShardSpec perform(
final TaskActionToolbox toolbox
)
{
if (!(task instanceof PendingSegmentAllocatingTask)) {
throw DruidException.defensive(
"Task[%s] of type[%s] cannot allocate segments as it does not implement PendingSegmentAllocatingTask.",
task.getId(), task.getType()
);
}
int attempt = 0;
while (true) {
attempt++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
Expand All @@ -41,8 +43,20 @@
import java.util.stream.Collectors;

/**
*
* Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
*
* <pre>
* Pseudo code (for a single interval):
* For an append lock held over an interval:
* transaction {
* commit input segments contained within interval
* if there is an active replace lock over the interval:
* add an entry for the inputSegment corresponding to the replace lock's task in the upgradeSegments table
* fetch pending segments with parent contained within the input segments, and commit them
* }
Comment on lines +51 to +58
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't seem appropriate to have the implementation described as pseudo-code. Someone might as well read the code. It is better to briefly describe the key points of the implementation in a list fashion. (This is not a blocker for the PR).

* </pre>
*/
public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
{
Expand Down Expand Up @@ -114,6 +128,13 @@ public TypeReference<SegmentPublishResult> getReturnTypeReference()
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
if (!(task instanceof PendingSegmentAllocatingTask)) {
throw DruidException.defensive(
"Task[%s] of type[%s] cannot append segments as it does not implement PendingSegmentAllocatingTask.",
task.getId(),
task.getType()
);
}
// Verify that all the locks are of expected type
final List<TaskLock> locks = toolbox.getTaskLockbox().findLocksForTask(task);
for (TaskLock lock : locks) {
Expand All @@ -132,17 +153,20 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
= TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);

final CriticalAction.Action<SegmentPublishResult> publishAction;
final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId();
if (startMetadata == null) {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this branch at all? Can't we just simply call commitAppendSegmentsAndMetadata() with start and end metadata and remove commitAppendSegments(), if it's not anywhere else, which appears to be the case?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At some point, the plan was to have one action for just committing segments and another action for committing segments and metadata both. So we decided to keep a method that would just commit segments.

But we eventually decided against having the two actions as it didn't really serve a lot of purpose. So now we could simplify the IndexerMetadataStorageCoordinator interface too.

segmentToReplaceLock
segmentToReplaceLock,
taskAllocatorId
);
} else {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
segments,
segmentToReplaceLock,
startMetadata,
endMetadata
endMetadata,
taskAllocatorId
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,35 @@
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by
* your task for the segment intervals.
*
* <pre>
* Pseudo code (for a single interval):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment regarding pseudo-code.

* For a replace lock held over an interval:
* transaction {
* commit input segments contained within interval
* upgrade ids in the upgradeSegments table corresponding to this task to the replace lock's version and commit them
* fetch payload, task_allocator_id for pending segments
* upgrade each such pending segment to the replace lock's version with the corresponding root segment
* }
* For every pending segment with version == replace lock version:
* Fetch payload, group_id of the pending segment and relay them to the supervisor
* The supervisor relays the payloads to all the tasks with the corresponding group_id to serve realtime queries
* </pre>
*/
public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>
{
Expand Down Expand Up @@ -123,7 +140,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
// failure to upgrade pending segments does not affect success of the commit
if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
try {
tryUpgradeOverlappingPendingSegments(task, toolbox);
registerUpgradedPendingSegmentsOnSupervisor(task, toolbox);
}
catch (Exception e) {
log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
Expand All @@ -134,34 +151,55 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
}

/**
* Tries to upgrade any pending segments that overlap with the committed segments.
* Registers upgraded pending segments on the active supervisor, if any
*/
private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
Comment thread
AmatyaAvadhanula marked this conversation as resolved.
final Optional<String> activeSupervisorIdWithAppendLock =
supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());

if (!activeSupervisorIdWithAppendLock.isPresent()) {
return;
}

final Set<String> activeRealtimeSequencePrefixes
= supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
toolbox.getIndexerMetadataStorageCoordinator()
.upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
log.info(
"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
);
final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments here would be helpful.

.getTaskLockbox()
.getAllReplaceLocksForDatasource(task.getDataSource())
.stream()
.filter(lock -> task.getId().equals(lock.getSupervisorTaskId()))
.collect(Collectors.toSet());


upgradedPendingSegments.forEach(
(oldId, newId) -> toolbox.getSupervisorManager()
.registerNewVersionOfPendingSegmentOnSupervisor(
activeSupervisorIdWithAppendLock.get(),
oldId,
newId
)
Set<PendingSegmentRecord> pendingSegments = new HashSet<>();
for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
pendingSegments.addAll(
toolbox.getIndexerMetadataStorageCoordinator()
.getPendingSegments(task.getDataSource(), replaceLock.getInterval())
);
}
Map<String, SegmentIdWithShardSpec> idToPendingSegment = new HashMap<>();
pendingSegments.forEach(pendingSegment -> idToPendingSegment.put(
pendingSegment.getId().asSegmentId().toString(),
pendingSegment.getId()
));
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
pendingSegments.forEach(pendingSegment -> {
if (pendingSegment.getUpgradedFromSegmentId() != null
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we only look at pending segments that were upgraded by this task rather than all upgraded pending segments?

&& !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the upgradedFromSegmentId ever be equal to the id itself?
A normal/root pending segment (i.e. one created by allocation and not upgrade) would have upgraded_from_segment_id as null, right?

segmentToParent.put(
pendingSegment.getId(),
idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId())
);
}
});

segmentToParent.forEach(
(newId, oldId) -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
activeSupervisorIdWithAppendLock.get(),
oldId,
newId
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@
import java.util.concurrent.TimeoutException;

@Deprecated
public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements ChatHandler
public class AppenderatorDriverRealtimeIndexTask extends AbstractTask
Comment thread
kfaraz marked this conversation as resolved.
implements ChatHandler, PendingSegmentAllocatingTask
{
private static final String CTX_KEY_LOOKUP_TIER = "lookupTier";

Expand Down Expand Up @@ -259,6 +260,12 @@ public boolean isReady(TaskActionClient taskActionClient)
return true;
}

/**
 * Returns the allocator id used to track pending segments created by this task.
 * Uses the group id (rather than the task id) so that all tasks in the same
 * group share a single allocator scope — TODO confirm against TaskLockbox cleanup.
 */
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}

@Override
public TaskStatus runTask(final TaskToolbox toolbox)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
* serialization fields of this class must correspond to those of {@link
* ClientCompactionTaskQuery}.
*/
public class CompactionTask extends AbstractBatchIndexTask
public class CompactionTask extends AbstractBatchIndexTask implements PendingSegmentAllocatingTask
{
private static final Logger log = new Logger(CompactionTask.class);
private static final Clock UTC_CLOCK = Clock.systemUTC();
Expand Down Expand Up @@ -400,6 +400,12 @@ public String getType()
return TYPE;
}

/**
 * Returns the allocator id used to track pending segments created by this task.
 * Compaction tasks use the group id so that pending-segment bookkeeping is
 * shared across the task group — TODO confirm against metadata cleanup logic.
 */
@Override
public String getTaskAllocatorId()
{
return getGroupId();
}

@Nonnull
@JsonIgnore
@Override
Expand Down
Loading