Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.CriticalAction;
import org.apache.druid.indexing.overlord.PendingSegmentUpgradeRecord;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.ReplaceTaskLock;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;

import java.util.Map;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -92,6 +92,18 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
// Find the active replace locks held only by this task
final Set<ReplaceTaskLock> replaceLocksForTask
= toolbox.getTaskLockbox().findReplaceLocksForTask(task);
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorIdWithAppendLock = supervisorManager == null
? Optional.absent()
: supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
final Set<String> activeRealtimeSequencePrefixes;
if (!activeSupervisorIdWithAppendLock.isPresent()) {
activeRealtimeSequencePrefixes = ImmutableSet.of();
} else {
activeRealtimeSequencePrefixes
= supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());

Check warning

Code scanning / CodeQL

Dereferenced variable may be null

Variable [supervisorManager](1) may be null at this access as suggested by [this](2) null guard.
}


final SegmentPublishResult publishResult;
try {
Expand All @@ -101,7 +113,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator()
.commitReplaceSegments(segments, replaceLocksForTask)
.commitReplaceSegments(segments, replaceLocksForTask, activeRealtimeSequencePrefixes)
)
.onInvalidLocks(
() -> SegmentPublishResult.fail(
Expand All @@ -123,7 +135,12 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
// failure to upgrade pending segments does not affect success of the commit
if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
try {
tryUpgradeOverlappingPendingSegments(task, toolbox);
tryUpgradeOverlappingPendingSegments(
task,
toolbox,
activeSupervisorIdWithAppendLock,
publishResult.getPendingSegmentUpgrades()
);
}
catch (Exception e) {
log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
Expand All @@ -136,32 +153,29 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
/**
* Tries to upgrade any pending segments that overlap with the committed segments.
*/
private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
private void tryUpgradeOverlappingPendingSegments(
Task task,
TaskActionToolbox toolbox,
Optional<String> activeSupervisorIdWithAppendLock,
List<PendingSegmentUpgradeRecord> upgradedPendingSegments
)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorIdWithAppendLock =
supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
if (!activeSupervisorIdWithAppendLock.isPresent()) {
return;
}

final Set<String> activeRealtimeSequencePrefixes
= supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get());
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradedPendingSegments =
toolbox.getIndexerMetadataStorageCoordinator()
.upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes);
log.info(
"Upgraded [%d] pending segments for REPLACE task[%s]: [%s]",
upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments
);

upgradedPendingSegments.forEach(
(oldId, newId) -> toolbox.getSupervisorManager()
.registerNewVersionOfPendingSegmentOnSupervisor(
activeSupervisorIdWithAppendLock.get(),
oldId,
newId
)
(upgradeRecord) -> toolbox.getSupervisorManager()
.registerNewVersionOfPendingSegmentOnSupervisor(
activeSupervisorIdWithAppendLock.get(),
upgradeRecord.getOldId(),
upgradeRecord.getNewId()
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ public Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments
@Override
public SegmentPublishResult commitReplaceSegments(
Set<DataSegment> replaceSegments,
Set<ReplaceTaskLock> locksHeldByReplaceTask
Set<ReplaceTaskLock> locksHeldByReplaceTask,
Set<String> activeRealtimeSequencePrefixes
)
{
return SegmentPublishResult.ok(commitSegments(replaceSegments));
Expand Down Expand Up @@ -227,15 +228,6 @@ public SegmentIdWithShardSpec allocatePendingSegment(
);
}

@Override
public Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments,
Set<String> activeBaseSequenceNames
)
{
return Collections.emptyMap();
}

@Override
public int deletePendingSegmentsCreatedInInterval(String dataSource, Interval deleteInterval)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,32 +325,14 @@ SegmentPublishResult commitAppendSegmentsAndMetadata(
* in {@link #commitAppendSegments}</li>
* </ul>
*
* @param replaceSegments All segments created by a REPLACE task that
* must be committed in a single transaction.
* @param locksHeldByReplaceTask All active non-revoked REPLACE locks held by the task
* @param replaceSegments All segments created by a REPLACE task that
* must be committed in a single transaction.
* @param locksHeldByReplaceTask All active non-revoked REPLACE locks held by the task
* @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task group
*/
SegmentPublishResult commitReplaceSegments(
Set<DataSegment> replaceSegments,
Set<ReplaceTaskLock> locksHeldByReplaceTask
);

/**
* Creates and inserts new IDs for the pending segments hat overlap with the given
* replace segments being committed. The newly created pending segment IDs:
* <ul>
* <li>Have the same interval and version as that of an overlapping segment
* committed by the REPLACE task.</li>
* <li>Cannot be committed but are only used to serve realtime queries against
* those versions.</li>
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
* @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task groups
* of the supervisor (if any) for this datasource
* @return Map from originally allocated pending segment to its new upgraded ID.
*/
Map<SegmentIdWithShardSpec, SegmentIdWithShardSpec> upgradePendingSegmentsOverlappingWith(
Set<DataSegment> replaceSegments,
Set<ReplaceTaskLock> locksHeldByReplaceTask,
Set<String> activeRealtimeSequencePrefixes
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;

import java.util.Objects;

public class PendingSegmentUpgradeRecord
{
private final SegmentIdWithShardSpec oldId;
private final SegmentIdWithShardSpec newId;

@JsonCreator
public PendingSegmentUpgradeRecord(
@JsonProperty("oldId") SegmentIdWithShardSpec oldId,
@JsonProperty("newId") SegmentIdWithShardSpec newId
)
{
this.oldId = oldId;
this.newId = newId;
}

@JsonProperty
public SegmentIdWithShardSpec getOldId()
{
return oldId;
}

@JsonProperty
public SegmentIdWithShardSpec getNewId()
{
return newId;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PendingSegmentUpgradeRecord that = (PendingSegmentUpgradeRecord) o;
return Objects.equals(oldId, that.oldId) &&
Objects.equals(newId, that.newId);
}

@Override
public int hashCode()
{
return Objects.hash(oldId, newId);
}

@Override
public String toString()
{
return "PendingSegmentUpgradeRecord{" +
"oldId='" + oldId + '\'' +
", newId='" + newId + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;

import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.Set;

Expand All @@ -43,28 +45,38 @@
public class SegmentPublishResult
{
private final Set<DataSegment> segments;
private final List<PendingSegmentUpgradeRecord> pendingSegmentUpgrades;
private final boolean success;
@Nullable
private final String errorMsg;

public static SegmentPublishResult ok(Set<DataSegment> segments, List<PendingSegmentUpgradeRecord> pendingSegmentUpgrades)
{
return new SegmentPublishResult(segments, pendingSegmentUpgrades, true, null);
}

public static SegmentPublishResult ok(Set<DataSegment> segments)
{
return new SegmentPublishResult(segments, true, null);
return new SegmentPublishResult(segments, ImmutableList.of(), true, null);
}

public static SegmentPublishResult fail(String errorMsg)
{
return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg);
return new SegmentPublishResult(ImmutableSet.of(), ImmutableList.of(), false, errorMsg);
}

@JsonCreator
private SegmentPublishResult(
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("pendingSegmentUpgrades") @Nullable List<PendingSegmentUpgradeRecord> pendingSegmentUpgrades,
@JsonProperty("success") boolean success,
@JsonProperty("errorMsg") @Nullable String errorMsg
)
{
this.segments = Preconditions.checkNotNull(segments, "segments");
this.pendingSegmentUpgrades = pendingSegmentUpgrades == null
? ImmutableList.of()
: ImmutableList.copyOf(pendingSegmentUpgrades);
this.success = success;
this.errorMsg = errorMsg;

Expand All @@ -79,6 +91,12 @@ public Set<DataSegment> getSegments()
return segments;
}

@JsonProperty
public List<PendingSegmentUpgradeRecord> getPendingSegmentUpgrades()
{
return pendingSegmentUpgrades;
}

@JsonProperty
public boolean isSuccess()
{
Expand All @@ -104,13 +122,14 @@ public boolean equals(Object o)
SegmentPublishResult that = (SegmentPublishResult) o;
return success == that.success &&
Objects.equals(segments, that.segments) &&
Objects.equals(pendingSegmentUpgrades, that.pendingSegmentUpgrades) &&
Objects.equals(errorMsg, that.errorMsg);
}

@Override
public int hashCode()
{
return Objects.hash(segments, success, errorMsg);
return Objects.hash(segments, pendingSegmentUpgrades, success, errorMsg);
}

@Override
Expand Down
Loading