Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
e129773
Check for handoff of upgraded segments
AmatyaAvadhanula Mar 19, 2024
561f5bb
Add test
AmatyaAvadhanula Mar 20, 2024
2b1a54f
Merge remote-tracking branch 'upstream/master' into wait_for_handoff_…
AmatyaAvadhanula Mar 20, 2024
f9a148b
Fix test uncertainty
AmatyaAvadhanula Mar 20, 2024
1183bb8
Merge remote-tracking branch 'upstream/master' into wait_for_handoff_…
AmatyaAvadhanula Mar 27, 2024
20e1672
Merge remote-tracking branch 'upstream/master' into wait_for_handoff_…
AmatyaAvadhanula Apr 17, 2024
21bd593
Merge remote-tracking branch 'upstream/master' into wait_for_handoff_…
AmatyaAvadhanula Apr 22, 2024
a93c6e2
Drop sink only when all associated segments have been abandoned
AmatyaAvadhanula Apr 22, 2024
b18d79c
Merge remote-tracking branch 'upstream/master' into wait_for_handoff_…
AmatyaAvadhanula Apr 22, 2024
4ba335c
Fix test flakiness
AmatyaAvadhanula Apr 22, 2024
84cf8b5
Fix npe and other feedback
AmatyaAvadhanula Apr 22, 2024
0599f3f
Fix compilation
AmatyaAvadhanula Apr 22, 2024
1c4859a
Merge remote-tracking branch 'upstream/master' into wait_for_handoff_…
AmatyaAvadhanula Apr 22, 2024
09b19de
Merge remote-tracking branch 'upstream/master' into wait_for_handoff_…
AmatyaAvadhanula Apr 23, 2024
cca38db
Unannounce all versions at once
AmatyaAvadhanula Apr 24, 2024
0ab2326
Add test and address feedback
AmatyaAvadhanula Apr 24, 2024
7e8be63
Fix compilation
AmatyaAvadhanula Apr 24, 2024
7b212a2
Add test for coverage
AmatyaAvadhanula Apr 24, 2024
295ce55
Merge remote-tracking branch 'upstream/master' into wait_for_handoff_…
AmatyaAvadhanula Apr 24, 2024
5e084ea
Delete pending segments in batches of at most 100
AmatyaAvadhanula Apr 24, 2024
4dc1e0e
Resolve conflicts
AmatyaAvadhanula Apr 25, 2024
2dc3b10
Initialize sets only where needed
kfaraz Apr 25, 2024
c83b7d9
Address feedback
AmatyaAvadhanula Apr 25, 2024
aad03c9
More comments
AmatyaAvadhanula Apr 25, 2024
248d614
Merge remote-tracking branch 'upstream/master' into wait_for_handoff_…
AmatyaAvadhanula Apr 25, 2024
a676b29
Merge branch 'wait_for_handoff_upgraded_segments' of github.com:Amaty…
kfaraz Apr 25, 2024
c72ad00
Synchronize within method
AmatyaAvadhanula Apr 25, 2024
5d911a7
Simplify logic
AmatyaAvadhanula Apr 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,6 @@ public LagStats computeLagStats()
throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
}

@Override
public Set<String> getActiveRealtimeSequencePrefixes()
{
throw new UnsupportedOperationException();
}

@Override
public int getActiveTaskGroupsCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,6 @@ public void testMaterializedViewSupervisorSpecCreated()

Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveTaskGroupsCount());

Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveRealtimeSequencePrefixes());

Callable<Integer> noop = new Callable<Integer>() {
@Override
public Integer call()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.Worker;
Expand All @@ -46,7 +45,7 @@
import java.util.Set;

@JsonTypeName(MSQWorkerTask.TYPE)
public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask
public class MSQWorkerTask extends AbstractTask
Comment thread
kfaraz marked this conversation as resolved.
{
public static final String TYPE = "query_worker";

Expand Down Expand Up @@ -126,13 +125,6 @@ public Set<ResourceAction> getInputSourceResources()
return ImmutableSet.of();
}

@Override
public String getTaskAllocatorId()
{
return getControllerTaskId();
}


@Override
public boolean isReady(final TaskActionClient taskActionClient)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,4 @@ public void testGetInputSourceResources()
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
}

@Test
public void testGetTaskAllocatorId()
{
MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount);
Assert.assertEquals(controllerTaskId, msqWorkerTask.getTaskAllocatorId());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,10 @@
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -155,7 +152,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
// failure to upgrade pending segments does not affect success of the commit
if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
try {
registerUpgradedPendingSegmentsOnSupervisor(task, toolbox);
registerUpgradedPendingSegmentsOnSupervisor(task, toolbox, publishResult.getUpgradedPendingSegments());
}
catch (Exception e) {
log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
Expand All @@ -168,7 +165,11 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
/**
* Registers upgraded pending segments on the active supervisor, if any
*/
private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox)
private void registerUpgradedPendingSegmentsOnSupervisor(
Task task,
TaskActionToolbox toolbox,
List<PendingSegmentRecord> upgradedPendingSegments
)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorIdWithAppendLock =
Expand All @@ -178,42 +179,10 @@ private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionTo
return;
}

final Set<ReplaceTaskLock> replaceLocksForTask = toolbox
.getTaskLockbox()
.getAllReplaceLocksForDatasource(task.getDataSource())
.stream()
.filter(lock -> task.getId().equals(lock.getSupervisorTaskId()))
.collect(Collectors.toSet());


Set<PendingSegmentRecord> pendingSegments = new HashSet<>();
for (ReplaceTaskLock replaceLock : replaceLocksForTask) {
pendingSegments.addAll(
toolbox.getIndexerMetadataStorageCoordinator()
.getPendingSegments(task.getDataSource(), replaceLock.getInterval())
);
}
Map<String, SegmentIdWithShardSpec> idToPendingSegment = new HashMap<>();
pendingSegments.forEach(pendingSegment -> idToPendingSegment.put(
pendingSegment.getId().asSegmentId().toString(),
pendingSegment.getId()
));
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> segmentToParent = new HashMap<>();
pendingSegments.forEach(pendingSegment -> {
if (pendingSegment.getUpgradedFromSegmentId() != null
&& !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) {
segmentToParent.put(
pendingSegment.getId(),
idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId())
);
}
});

segmentToParent.forEach(
(newId, oldId) -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor(
upgradedPendingSegments.forEach(
upgradedPendingSegment -> supervisorManager.registerUpgradedPendingSegmentOnSupervisor(
activeSupervisorIdWithAppendLock.get(),
oldId,
newId
upgradedPendingSegment
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.druid.indexing.common.task.BatchAppenderators;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask;
import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch;
import org.apache.druid.indexing.common.task.SegmentAllocators;
import org.apache.druid.indexing.common.task.TaskResource;
Expand Down Expand Up @@ -109,7 +108,7 @@
* generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of
* publishing on its own.
*/
public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler, PendingSegmentAllocatingTask
public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler
{
public static final String TYPE = "single_phase_sub_task";
public static final String OLD_TYPE_NAME = "index_sub";
Expand Down Expand Up @@ -240,12 +239,6 @@ public String getSubtaskSpecId()
return subtaskSpecId;
}

@Override
public String getTaskAllocatorId()
{
return getGroupId();
}

@Override
public TaskStatus runTask(final TaskToolbox toolbox) throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1242,7 +1242,7 @@ public void remove(final Task task)
idsInSameGroup.remove(task.getId());
if (idsInSameGroup.isEmpty()) {
final int pendingSegmentsDeleted
= metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId);
= metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId);
log.info(
"Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.",
pendingSegmentsDeleted, taskAllocatorId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -308,16 +308,19 @@ public boolean checkPointDataSourceMetadata(
* allows the supervisor to include the pending segment in queries fired against
* that segment version.
*/
public boolean registerNewVersionOfPendingSegmentOnSupervisor(
public boolean registerUpgradedPendingSegmentOnSupervisor(
String supervisorId,
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newSegmentVersion
PendingSegmentRecord upgradedPendingSegment
)
{
try {
Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null");
Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment cannot be null");
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null");
Preconditions.checkNotNull(upgradedPendingSegment, "upgraded pending segment cannot be null");
Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "taskAllocatorId cannot be null");
Preconditions.checkNotNull(
upgradedPendingSegment.getUpgradedFromSegmentId(),
"upgradedFromSegmentId cannot be null"
);

Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(supervisorId);
Preconditions.checkNotNull(supervisor, "supervisor could not be found");
Expand All @@ -326,12 +329,12 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor(
}

SeekableStreamSupervisor<?, ?, ?> seekableStreamSupervisor = (SeekableStreamSupervisor<?, ?, ?>) supervisor.lhs;
seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion);
seekableStreamSupervisor.registerNewVersionOfPendingSegment(upgradedPendingSegment);
return true;
}
catch (Exception e) {
log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed",
basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId);
log.error(e, "Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s].",
upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId);
}
return false;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.joda.time.DateTime;

import java.util.List;
Expand Down Expand Up @@ -158,15 +158,14 @@ ListenableFuture<Boolean> setEndOffsetsAsync(
* Update the task state to redirect queries for later versions to the root pending segment.
* The task also announces that it is serving the segments belonging to the subsequent versions.
* The update is processed only if the task is serving the original pending segment.
* @param taskId - task id
* @param basePendingSegment - the pending segment that was originally allocated
* @param newVersionOfSegment - the ids belonging to the versions to which the root segment needs to be updated
*
* @param taskId - task id
* @param pendingSegmentRecord - the ids belonging to the versions to which the root segment needs to be updated
* @return true if the update succeeds
*/
ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
String taskId,
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newVersionOfSegment
PendingSegmentRecord pendingSegmentRecord
);

Class<PartitionIdType> getPartitionType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
Expand All @@ -57,7 +58,6 @@
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
Expand Down Expand Up @@ -197,13 +197,12 @@ public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getEndOffsetsA
@Override
public ListenableFuture<Boolean> registerNewVersionOfPendingSegmentAsync(
String taskId,
SegmentIdWithShardSpec basePendingSegment,
SegmentIdWithShardSpec newVersionOfSegment
PendingSegmentRecord pendingSegmentRecord
)
{
final RequestBuilder requestBuilder
= new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion")
.jsonContent(jsonMapper, new PendingSegmentVersions(basePendingSegment, newVersionOfSegment));
.jsonContent(jsonMapper, pendingSegmentRecord);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to change this API? The task side doesn't seem to need any info other than the base segment id and the upgraded segment id.

Postponing this refactor until later might simplify this PR a bit.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer have the original pending segment's SegmentIdWithShardSpec to continue using this API.


return makeRequest(taskId, requestBuilder)
.handler(IgnoreHttpResponseHandler.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
Expand Down Expand Up @@ -1575,18 +1576,15 @@ public Response setEndOffsetsHTTP(
@Path("/pendingSegmentVersion")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response registerNewVersionOfPendingSegment(
PendingSegmentVersions pendingSegmentVersions,
public Response registerUpgradedPendingSegment(
PendingSegmentRecord upgradedPendingSegment,
// this field is only for internal purposes, shouldn't be usually set by users
@Context final HttpServletRequest req
)
{
authorizationCheck(req, Action.WRITE);
try {
((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment(
pendingSegmentVersions.getBaseSegment(),
pendingSegmentVersions.getNewVersion()
);
((StreamAppenderator) appenderator).registerUpgradedPendingSegment(upgradedPendingSegment);
return Response.ok().build();
}
catch (DruidException e) {
Expand All @@ -1598,8 +1596,8 @@ public Response registerNewVersionOfPendingSegment(
catch (Exception e) {
log.error(
e,
"Could not register new version[%s] of pending segment[%s]",
pendingSegmentVersions.getNewVersion(), pendingSegmentVersions.getBaseSegment()
"Could not register pending segment[%s] upgraded from[%s]",
upgradedPendingSegment.getId().asSegmentId(), upgradedPendingSegment.getUpgradedFromSegmentId()
);
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
Expand Down
Loading