Merged
38 commits
4327aad  Facilitate supervisor to update pending segment mapping in tasks (AmatyaAvadhanula, Sep 26, 2023)
f93fbd3  Add CommitRealtimeSegmentsAndMetadataAction (kfaraz, Oct 5, 2023)
ec9df24  Merge branch 'master' of github.com:apache/druid into upgrade_pending… (kfaraz, Oct 6, 2023)
1182c74  Add IndexerMetadataStorageCoordinator.commitAppendSegmentsAndMetadata (kfaraz, Oct 6, 2023)
9ddfd5e  Remove extra task action (kfaraz, Oct 7, 2023)
7398c18  Merge remote-tracking branch 'upstream/master' into updateTaskPending… (AmatyaAvadhanula, Oct 8, 2023)
527bb79  Handle upgraded segment announcing and unannouncing (AmatyaAvadhanula, Oct 8, 2023)
e19b2ca  Upgrade pending segments (kfaraz, Oct 9, 2023)
260d7bd  Use correct jdbi query object (kfaraz, Oct 9, 2023)
b7a7e6f  Remove extra index on pending_segments table (kfaraz, Oct 9, 2023)
46a1b1e  Fix SegmentTransactionalReplaceAction (kfaraz, Oct 9, 2023)
3df8128  Use the correct lock type (AmatyaAvadhanula, Oct 9, 2023)
c14786a  Merge changes to upgrade pending segments (AmatyaAvadhanula, Oct 9, 2023)
c131323  Remove unused import (AmatyaAvadhanula, Oct 9, 2023)
a4f605d  Use mutable set (AmatyaAvadhanula, Oct 9, 2023)
04ba92c  Fix tests and other misc changes (AmatyaAvadhanula, Oct 10, 2023)
7274912  Add realtime segment update and announcement to replace action (AmatyaAvadhanula, Oct 10, 2023)
0dbbc1e  Merge remote-tracking branch 'upstream/master' into updateTaskPending… (AmatyaAvadhanula, Oct 10, 2023)
409167b  Fix compilation (AmatyaAvadhanula, Oct 10, 2023)
37d7197  Return set of segments to be upgraded rather than the upgraded ones (AmatyaAvadhanula, Oct 11, 2023)
85fecb2  Do not wait indefinitely for handoff of overshadowed segments (AmatyaAvadhanula, Oct 11, 2023)
760ac0d  Fix fetching (AmatyaAvadhanula, Oct 11, 2023)
0735d9c  Add some comments (kfaraz, Oct 11, 2023)
46540c1  Merge branch 'updateTaskPendingSegmentMapping' of github.com:AmatyaAv… (kfaraz, Oct 11, 2023)
bb5b25f  Address review feedback (AmatyaAvadhanula, Oct 11, 2023)
73707a8  Simplify updateSegmentMapping method, remove findAllVersionsOfPending… (kfaraz, Oct 11, 2023)
aeaaadd  Update pending segment mapping API (kfaraz, Oct 12, 2023)
e893d1a  Allow intervals with append locks to be chosen by autocompaction with… (AmatyaAvadhanula, Oct 12, 2023)
538aea3  Merge remote-tracking branch 'origin/updateTaskPendingSegmentMapping'… (AmatyaAvadhanula, Oct 12, 2023)
eb1825b  Deprecate previous API and clean up (AmatyaAvadhanula, Oct 12, 2023)
1380b29  Fix compilation (AmatyaAvadhanula, Oct 12, 2023)
ad084e7  Fix tests (AmatyaAvadhanula, Oct 12, 2023)
8779371  Renaming (AmatyaAvadhanula, Oct 12, 2023)
7944425  Minor cleanup (kfaraz, Oct 12, 2023)
c018bef  Merge branch 'updateTaskPendingSegmentMapping' of github.com:AmatyaAv… (kfaraz, Oct 12, 2023)
09911e0  Fix path and remove unnecessary v2 (AmatyaAvadhanula, Oct 12, 2023)
eb96ebb  Address feedback (AmatyaAvadhanula, Oct 12, 2023)
037390a  Clean dangling V2s (AmatyaAvadhanula, Oct 12, 2023)
@@ -22,15 +22,20 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,18 +47,40 @@
public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult>
{
private final Set<DataSegment> segments;
@Nullable
private final DataSourceMetadata startMetadata;
@Nullable
private final DataSourceMetadata endMetadata;

public static SegmentTransactionalAppendAction create(Set<DataSegment> segments)
public static SegmentTransactionalAppendAction forSegments(Set<DataSegment> segments)
{
return new SegmentTransactionalAppendAction(segments);
return new SegmentTransactionalAppendAction(segments, null, null);
}

public static SegmentTransactionalAppendAction forSegmentsAndMetadata(
Set<DataSegment> segments,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
)
{
return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata);
}

@JsonCreator
private SegmentTransactionalAppendAction(
@JsonProperty("segments") Set<DataSegment> segments
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata,
@JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata
)
{
this.segments = segments;
this.startMetadata = startMetadata;
this.endMetadata = endMetadata;

if ((startMetadata == null && endMetadata != null)
|| (startMetadata != null && endMetadata == null)) {
throw InvalidInput.exception("startMetadata and endMetadata must either be both null or both non-null.");
}
}

@JsonProperty
@@ -62,6 +89,20 @@ public Set<DataSegment> getSegments()
return segments;
}

@JsonProperty
@Nullable
public DataSourceMetadata getStartMetadata()
{
return startMetadata;
}

@JsonProperty
@Nullable
public DataSourceMetadata getEndMetadata()
{
return endMetadata;
}

@Override
public TypeReference<SegmentPublishResult> getReturnTypeReference()
{
@@ -70,30 +111,48 @@ public TypeReference<SegmentPublishResult> getReturnTypeReference()
};
}

/**
* Performs some sanity checks and publishes the given segments.
*/
@Override
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
{
// Verify that all the locks are of expected type
final List<TaskLock> locks = toolbox.getTaskLockbox().findLocksForTask(task);
for (TaskLock lock : locks) {
if (lock.getType() != TaskLockType.APPEND) {
throw InvalidInput.exception(
"Cannot use action[%s] for task[%s] as it is holding a lock of type[%s] instead of [APPEND].",
"SegmentTransactionalAppendAction", task.getId(), lock.getType()
);
}
}

TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments);

final String datasource = task.getDataSource();
final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock
= TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments);

final CriticalAction.Action<SegmentPublishResult> publishAction;
if (startMetadata == null) {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
segmentToReplaceLock
);
} else {
publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata(
segments,
segmentToReplaceLock,
startMetadata,
endMetadata
);
}

final SegmentPublishResult retVal;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments(
segments,
segmentToReplaceLock
)
)
.onValidLocks(publishAction)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
"Invalid task locks. Maybe they are revoked by a higher priority task."
@@ -107,20 +166,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
for (DataSegment segment : retVal.getSegments()) {
IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}

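For reference, a minimal sketch of how a caller might choose between the two factory methods added to SegmentTransactionalAppendAction above. The helper name and the local variables are illustrative placeholders; only forSegments, forSegmentsAndMetadata, and the both-null-or-both-non-null constraint come from this change.

// Sketch only: picking a factory method for the new append action.
static TaskAction<SegmentPublishResult> buildAppendAction(
    Set<DataSegment> segmentsToPublish,
    @Nullable DataSourceMetadata startMetadata,
    @Nullable DataSourceMetadata endMetadata
)
{
  if (startMetadata == null && endMetadata == null) {
    // Plain append: no streaming commit metadata to record with the segments
    return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
  }
  // Both metadata objects must be non-null together; the private constructor
  // throws InvalidInput if only one of them is supplied.
  return SegmentTransactionalAppendAction.forSegmentsAndMetadata(
      segmentsToPublish, startMetadata, endMetadata
  );
}
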
@@ -33,13 +33,8 @@
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;

import javax.annotation.Nullable;
@@ -222,47 +217,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

// getSegments() should return an empty set if announceHistoricalSegments() failed
for (DataSegment segment : retVal.getSegments()) {
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
metricBuilder.setDimension(
DruidMetrics.PARTITIONING_TYPE,
segment.getShardSpec() == null ? null : segment.getShardSpec().getType()
);
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
// Emit the segment related metadata using the configured emitters.
// There is a possibility that some segments' metadata event might get missed if the
// server crashes after commiting segment but before emitting the event.
this.emitSegmentMetadata(segment, toolbox);
}

IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox);
return retVal;
}

private void emitSegmentMetadata(DataSegment segment, TaskActionToolbox toolbox)
{
SegmentMetadataEvent event = new SegmentMetadataEvent(
segment.getDataSource(),
DateTime.now(DateTimeZone.UTC),
segment.getInterval().getStart(),
segment.getInterval().getEnd(),
segment.getVersion(),
segment.getLastCompactionState() != null
);

toolbox.getEmitter().emit(event);
}

private void checkWithSegmentLock()
{
final Map<Interval, List<DataSegment>> oldSegmentsMap = groupSegmentsByIntervalAndSort(segmentsToBeOverwritten);
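Both actions above now delegate metric emission to IndexTaskUtils.emitSegmentPublishMetrics. Its body is not shown in this diff; a rough sketch of the shared helper, reconstructed from the inline code deleted above, might look like the following. The real implementation may differ in detail, for instance it may also emit the per-segment SegmentMetadataEvent that the insert action used to emit.

// Hedged sketch of the consolidated helper, based on the metric code deleted above.
public static void emitSegmentPublishMetrics(
    SegmentPublishResult publishResult,
    Task task,
    TaskActionToolbox toolbox
)
{
  final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
  IndexTaskUtils.setTaskDimensions(metricBuilder, task);

  if (publishResult.isSuccess()) {
    toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
    for (DataSegment segment : publishResult.getSegments()) {
      IndexTaskUtils.setSegmentDimensions(metricBuilder, segment);
      toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
    }
  } else {
    toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
  }
}
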
@@ -22,17 +22,20 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

@@ -42,6 +45,8 @@
*/
public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult>
{
private static final Logger log = new Logger(SegmentTransactionalReplaceAction.class);

/**
* Set of segments to be inserted into metadata storage
*/
@@ -88,9 +93,9 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
final Set<ReplaceTaskLock> replaceLocksForTask
= toolbox.getTaskLockbox().findReplaceLocksForTask(task);

final SegmentPublishResult retVal;
final SegmentPublishResult publishResult;
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
publishResult = toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
@@ -111,24 +116,45 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
throw new RuntimeException(e);
}

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1));
IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);

for (DataSegment segment : retVal.getSegments()) {
final String partitionType = segment.getShardSpec() == null ? null : segment.getShardSpec().getType();
metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType);
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize()));
// Upgrade any overlapping pending segments
// Do not perform upgrade in the same transaction as replace commit so that
// failure to upgrade pending segments does not affect success of the commit
if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
try {
tryUpgradeOverlappingPendingSegments(task, toolbox);
}
catch (Exception e) {
log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
}
} else {
toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1));
}

return retVal;
return publishResult;
}

/**
* Tries to upgrade any pending segments that overlap with the committed segments.
*/
private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorId = supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource());
if (!activeSupervisorId.isPresent()) {
return;
}

Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
toolbox.getIndexerMetadataStorageCoordinator().upgradePendingSegmentsOverlappingWith(segments);
log.info(
"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
);

upgradedPendingSegments.forEach(
(oldId, newId) -> toolbox.getSupervisorManager()
.registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(), oldId, newId)
);
}

@Override
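The notable design choice in SegmentTransactionalReplaceAction above is that the pending-segment upgrade runs outside the critical section that commits the replace, so a failed upgrade cannot roll back or fail the commit. A condensed sketch of that ordering, with hypothetical helper names standing in for the code shown above:

// Sketch only: failure isolation between the replace commit and the upgrade.
SegmentPublishResult publishResult = commitReplaceSegmentsInCriticalSection();  // hypothetical helper
if (publishResult.isSuccess()) {
  try {
    // Best-effort: map each overlapping pending segment to a new version and
    // register the mapping with the active supervisor, if there is one.
    upgradeOverlappingPendingSegments();                                         // hypothetical helper
  }
  catch (Exception e) {
    // Only logged; the already-committed replace stands.
    log.error(e, "Error while upgrading pending segments");
  }
}
return publishResult;
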
@@ -125,6 +125,8 @@ public static boolean isLockCoversSegments(
&& timeChunkLock.getDataSource().equals(segment.getDataSource())
&& (timeChunkLock.getVersion().compareTo(segment.getVersion()) >= 0
|| TaskLockType.APPEND.equals(timeChunkLock.getType()));
// APPEND locks always have the version DateTimes.EPOCH (1970-01-01)
// and cover the segments irrespective of the segment version
} else {
final SegmentLock segmentLock = (SegmentLock) lock;
return segmentLock.getInterval().contains(segment.getInterval())
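A small illustration of why isLockCoversSegments needs the APPEND special case noted in the comment above. The EPOCH string below is an assumption about how DateTimes.EPOCH renders; the segment version is an arbitrary example.

// Sketch only: an APPEND time-chunk lock carries version DateTimes.EPOCH, which
// sorts before any real segment version, so the plain version comparison alone
// would wrongly report that the lock does not cover the segment.
String appendLockVersion = "1970-01-01T00:00:00.000Z";  // assumed string form of DateTimes.EPOCH
String segmentVersion = "2023-10-12T08:15:00.000Z";     // example publish-time version

TaskLockType lockType = TaskLockType.APPEND;
boolean coveredByVersion = appendLockVersion.compareTo(segmentVersion) >= 0;  // false
boolean coveredByLockType = TaskLockType.APPEND.equals(lockType);             // true
boolean covers = coveredByVersion || coveredByLockType;                       // true for APPEND locks
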
@@ -401,21 +401,21 @@ private boolean tryLockWithDetermineResult(TaskActionClient client, LockGranular

/**
* Builds a TaskAction to publish segments based on the type of locks that this
* task acquires (determined by context property {@link Tasks#TASK_LOCK_TYPE}).
* task acquires.
*
* @see #determineLockType
*/
protected TaskAction<SegmentPublishResult> buildPublishAction(
Set<DataSegment> segmentsToBeOverwritten,
Set<DataSegment> segmentsToPublish
Set<DataSegment> segmentsToPublish,
TaskLockType lockType
)
{
TaskLockType lockType = TaskLockType.valueOf(
getContextValue(Tasks.TASK_LOCK_TYPE, Tasks.DEFAULT_TASK_LOCK_TYPE.name())
);
switch (lockType) {
case REPLACE:
return SegmentTransactionalReplaceAction.create(segmentsToPublish);
case APPEND:
return SegmentTransactionalAppendAction.create(segmentsToPublish);
return SegmentTransactionalAppendAction.forSegments(segmentsToPublish);
default:
return SegmentTransactionalInsertAction.overwriteAction(segmentsToBeOverwritten, segmentsToPublish);
}
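With the context-property lookup removed from buildPublishAction, callers now resolve and pass the lock type themselves. A hedged sketch of a call site after this change; determineLockType is only referenced via the javadoc above, and its argument plus the surrounding variables (toolbox, segment sets) are placeholders.

// Sketch only: the lock type is resolved once by the task and passed in
// explicitly instead of being re-read from the Tasks.TASK_LOCK_TYPE context.
final TaskLockType lockType = determineLockType(lockGranularity);   // placeholder argument
final TaskAction<SegmentPublishResult> publishAction = buildPublishAction(
    segmentsToBeOverwritten,   // used only by the default overwrite (INSERT) case
    segmentsToPublish,
    lockType
);
final SegmentPublishResult result = toolbox.getTaskActionClient().submit(publishAction);
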
@@ -27,7 +27,6 @@
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -696,7 +695,7 @@ private void publishSegments(
);
pendingHandoffs.add(Futures.transformAsync(
publishFuture,
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff,
driver::registerHandoff,
MoreExecutors.directExecutor()
));
}
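The final change drops the explicit AsyncFunction cast on the method reference, presumably because Futures.transformAsync accepts only an AsyncFunction, so the target type of driver::registerHandoff can be inferred without help. The two forms below are equivalent, assuming registerHandoff returns a ListenableFuture<SegmentsAndCommitMetadata> as the removed cast implies.

// Sketch only: the call with and without the cast removed in this change.
ListenableFuture<SegmentsAndCommitMetadata> withoutCast =
    Futures.transformAsync(publishFuture, driver::registerHandoff, MoreExecutors.directExecutor());

ListenableFuture<SegmentsAndCommitMetadata> withCast =
    Futures.transformAsync(
        publishFuture,
        (AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff,
        MoreExecutors.directExecutor()
    );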