Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)

IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);

// Upgrade any overlapping pending segments
// Do not perform upgrade in the same transaction as replace commit so that
// failure to upgrade pending segments does not affect success of the commit
if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
try {
tryUpgradeOverlappingPendingSegments(task, toolbox);
}
catch (Exception e) {
log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
}
}

return publishResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public static class PruneSpecsHolder
private final CompactionState lastCompactionState;
private final long size;

@Nullable
private SegmentDescriptor prevSegmentDescriptor;

@VisibleForTesting
public DataSegment(
SegmentId segmentId,
Expand Down Expand Up @@ -191,6 +194,37 @@ public DataSegment(
);
}

public DataSegment(
String dataSource,
Interval interval,
String version,
// use `Map` *NOT* `LoadSpec` because we want to do lazy materialization to prevent dependency pollution
Map<String, Object> loadSpec,
@Nullable List<String> dimensions,
@Nullable List<String> metrics,
@Nullable ShardSpec shardSpec,
@Nullable CompactionState lastCompactionState,
Integer binaryVersion,
long size,
PruneSpecsHolder pruneSpecsHolder
)
{
this(
dataSource,
interval,
version,
loadSpec,
dimensions,
metrics,
shardSpec,
lastCompactionState,
binaryVersion,
size,
null,
pruneSpecsHolder
);
}

@JsonCreator
public DataSegment(
@JsonProperty("dataSource") String dataSource,
Expand All @@ -210,6 +244,7 @@ public DataSegment(
@JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
@JsonProperty("size") long size,
@JsonProperty("prevSegmentDescriptor") @Nullable SegmentDescriptor prevSegmentDescriptor,
@JacksonInject PruneSpecsHolder pruneSpecsHolder
)
{
Expand All @@ -227,6 +262,7 @@ public DataSegment(
this.binaryVersion = binaryVersion;
Preconditions.checkArgument(size >= 0);
this.size = size;
this.prevSegmentDescriptor = prevSegmentDescriptor;
}

@Nullable
Expand Down Expand Up @@ -347,6 +383,12 @@ public SegmentId getId()
return id;
}

@JsonProperty
public SegmentDescriptor getPrevSegmentDescriptor()
{
return this.prevSegmentDescriptor;
}

public boolean isTombstone()
{
return getShardSpec().getType().equals(ShardSpec.Type.TOMBSTONE);
Expand Down Expand Up @@ -441,6 +483,11 @@ public DataSegment withLastCompactionState(CompactionState compactionState)
return builder(this).lastCompactionState(compactionState).build();
}

public void setPrevSegmentDescriptor(SegmentDescriptor descriptor)
{
this.prevSegmentDescriptor = descriptor;
}

@Override
public int compareTo(DataSegment dataSegment)
{
Expand Down Expand Up @@ -474,6 +521,7 @@ public String toString()
", shardSpec=" + shardSpec +
", lastCompactionState=" + lastCompactionState +
", size=" + size +
", prevSegmentDescriptor=" + prevSegmentDescriptor +
'}';
}

Expand Down Expand Up @@ -565,12 +613,6 @@ public Builder shardSpec(ShardSpec shardSpec)
return this;
}

public Builder lastCompactionState(CompactionState compactionState)
{
this.lastCompactionState = compactionState;
return this;
}

public Builder binaryVersion(Integer binaryVersion)
{
this.binaryVersion = binaryVersion;
Expand All @@ -583,6 +625,12 @@ public Builder size(long size)
return this;
}

public Builder lastCompactionState(CompactionState compactionState)
{
this.lastCompactionState = compactionState;
return this;
}

public DataSegment build()
{
// Check stuff that goes into the id, at least.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
Expand Down Expand Up @@ -94,6 +95,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -452,6 +454,7 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
filterFieldsForPruning = null;
}

final Set<SegmentDescriptor> prevSegmentIdDescriptors = new HashSet<>();
// Filter unneeded chunks based on partition dimension
for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
final Set<PartitionChunk<ServerSelector>> filteredChunks;
Expand All @@ -468,6 +471,7 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
}
for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
ServerSelector server = chunk.getObject();
prevSegmentIdDescriptors.add(server.getSegment().getPrevSegmentDescriptor());
final SegmentDescriptor segment = new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
Expand All @@ -476,6 +480,30 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
segments.add(new SegmentServerSelector(server, segment));
}
}
if (specificSegments) {
return segments;
}

List<TimelineObjectHolder<String, ServerSelector>> overshadowedHolders =
ImmutableList.copyOf(((VersionedIntervalTimeline) timeline).findFullyOvershadowed());
List<TimelineObjectHolder<String, ServerSelector>> filteredHolders =
toolChest.filterSegments(query, overshadowedHolders);
for (TimelineObjectHolder<String, ServerSelector> holder : filteredHolders) {
for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
ServerSelector server = chunk.getObject();
if (server.hasRealtime()) {
final SegmentDescriptor segment = new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
chunk.getChunkNumber()
);
if (prevSegmentIdDescriptors.contains(segment)) {
continue;
}
segments.add(new SegmentServerSelector(server, segment));
}
}
}
return segments;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ public boolean isEmpty()
}
}

public boolean hasRealtime()
{
synchronized (this) {
return !realtimeServers.isEmpty();
}
}

public List<DruidServerMetadata> getCandidates(final int numCandidates)
{
List<DruidServerMetadata> candidates;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1586,13 +1586,13 @@ private Set<DataSegment> createNewIdsForAppendSegmentsWithVersion(

// Update the set so that subsequent segment IDs use a higher partition number
allAllocatedIds.add(newId);
newSegmentIds.add(
DataSegment.builder(segment)
.interval(newId.getInterval())
.version(newId.getVersion())
.shardSpec(newId.getShardSpec())
.build()
);
final DataSegment upgradedSegment = DataSegment.builder(segment)
.interval(newId.getInterval())
.version(newId.getVersion())
.shardSpec(newId.getShardSpec())
.build();
upgradedSegment.setPrevSegmentDescriptor(segment.toDescriptor());
newSegmentIds.add(upgradedSegment);
}

return newSegmentIds;
Expand Down Expand Up @@ -2093,13 +2093,13 @@ private Set<DataSegment> createNewIdsOfAppendSegmentsAfterReplace(

// Create upgraded segment with the correct interval, version and shard spec
String lockVersion = upgradeSegmentToLockVersion.get(oldSegment.getId().toString());
upgradedSegments.add(
DataSegment.builder(oldSegment)
.interval(newInterval)
.version(lockVersion)
.shardSpec(shardSpec)
.build()
);
final DataSegment upgradedSegment = DataSegment.builder(oldSegment)
.interval(newInterval)
.version(lockVersion)
.shardSpec(shardSpec)
.build();
upgradedSegment.setPrevSegmentDescriptor(oldSegment.toDescriptor());
upgradedSegments.add(upgradedSegment);
}

return upgradedSegments;
Expand Down