diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
index aaa62db90a7c..ef94a7f3d028 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java
@@ -118,18 +118,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
 
     IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox);
 
-    // Upgrade any overlapping pending segments
-    // Do not perform upgrade in the same transaction as replace commit so that
-    // failure to upgrade pending segments does not affect success of the commit
-    if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) {
-      try {
-        tryUpgradeOverlappingPendingSegments(task, toolbox);
-      }
-      catch (Exception e) {
-        log.error(e, "Error while upgrading pending segments for task[%s]", task.getId());
-      }
-    }
-
     return publishResult;
   }
 
diff --git a/processing/src/main/java/org/apache/druid/timeline/DataSegment.java b/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
index c0560a839859..85c562a55655 100644
--- a/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
+++ b/processing/src/main/java/org/apache/druid/timeline/DataSegment.java
@@ -111,6 +111,9 @@ public static class PruneSpecsHolder
   private final CompactionState lastCompactionState;
   private final long size;
 
+  @Nullable
+  private SegmentDescriptor prevSegmentDescriptor;
+
   @VisibleForTesting
   public DataSegment(
       SegmentId segmentId,
@@ -191,6 +194,37 @@ public DataSegment(
     );
   }
 
+  public DataSegment(
+      String dataSource,
+      Interval interval,
+      String version,
+      // use `Map` *NOT* `LoadSpec` because we want to do lazy materialization to prevent dependency pollution
+      Map<String, Object> loadSpec,
+      @Nullable List<String> dimensions,
+      @Nullable List<String> metrics,
+      @Nullable ShardSpec shardSpec,
+      @Nullable CompactionState lastCompactionState,
+      Integer binaryVersion,
+      long size,
+      PruneSpecsHolder pruneSpecsHolder
+  )
+  {
+    this(
+        dataSource,
+        interval,
+        version,
+        loadSpec,
+        dimensions,
+        metrics,
+        shardSpec,
+        lastCompactionState,
+        binaryVersion,
+        size,
+        null,
+        pruneSpecsHolder
+    );
+  }
+
   @JsonCreator
   public DataSegment(
       @JsonProperty("dataSource") String dataSource,
@@ -210,6 +244,7 @@ public DataSegment(
       @JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
       @JsonProperty("binaryVersion") Integer binaryVersion,
       @JsonProperty("size") long size,
+      @JsonProperty("prevSegmentDescriptor") @Nullable SegmentDescriptor prevSegmentDescriptor,
       @JacksonInject PruneSpecsHolder pruneSpecsHolder
   )
   {
@@ -227,6 +262,7 @@ public DataSegment(
     this.binaryVersion = binaryVersion;
     Preconditions.checkArgument(size >= 0);
     this.size = size;
+    this.prevSegmentDescriptor = prevSegmentDescriptor;
   }
 
   @Nullable
@@ -347,6 +383,12 @@ public SegmentId getId()
     return id;
   }
 
+  @JsonProperty
+  public SegmentDescriptor getPrevSegmentDescriptor()
+  {
+    return this.prevSegmentDescriptor;
+  }
+
   public boolean isTombstone()
   {
     return getShardSpec().getType().equals(ShardSpec.Type.TOMBSTONE);
@@ -441,6 +483,11 @@ public DataSegment withLastCompactionState(CompactionState compactionState)
     return builder(this).lastCompactionState(compactionState).build();
   }
 
+  public void setPrevSegmentDescriptor(SegmentDescriptor descriptor)
+  {
+    this.prevSegmentDescriptor = descriptor;
+  }
+
   @Override
   public int compareTo(DataSegment dataSegment)
   {
@@ -474,6 +521,7 @@ public String toString()
            ", shardSpec=" + shardSpec +
            ", lastCompactionState=" + lastCompactionState +
            ", size=" + size +
+           ", prevSegmentDescriptor=" + prevSegmentDescriptor +
            '}';
   }
 
@@ -565,12 +613,6 @@ public Builder shardSpec(ShardSpec shardSpec)
       return this;
     }
 
-    public Builder lastCompactionState(CompactionState compactionState)
-    {
-      this.lastCompactionState = compactionState;
-      return this;
-    }
-
     public Builder binaryVersion(Integer binaryVersion)
     {
       this.binaryVersion = binaryVersion;
@@ -583,6 +625,12 @@ public Builder size(long size)
       return this;
     }
 
+    public Builder lastCompactionState(CompactionState compactionState)
+    {
+      this.lastCompactionState = compactionState;
+      return this;
+    }
+
     public DataSegment build()
     {
       // Check stuff that goes into the id, at least.
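The key compatibility question in the DataSegment change above is whether JSON written before prevSegmentDescriptor existed still deserializes, since the new property is threaded through the @JsonCreator constructor. Below is a minimal, self-contained sketch of that pattern using stand-in classes, not Druid's actual DataSegment or SegmentDescriptor (only jackson-databind is required): a nullable property added to both the creator and a getter round-trips when present and quietly comes back null when absent.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PrevDescriptorRoundTrip
{
  // Stand-in for DataSegment, reduced to two properties; "prev" plays the
  // role of the new prevSegmentDescriptor field and may be null.
  static class Segment
  {
    private final String version;
    private final String prev;

    @JsonCreator
    Segment(
        @JsonProperty("version") String version,
        @JsonProperty("prev") String prev
    )
    {
      this.version = version;
      this.prev = prev;
    }

    @JsonProperty
    public String getVersion()
    {
      return version;
    }

    @JsonProperty
    public String getPrev()
    {
      return prev;
    }
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();

    // JSON written before the property existed: Jackson passes null for the
    // missing creator argument, so old metadata still loads.
    Segment old = mapper.readValue("{\"version\":\"v1\"}", Segment.class);
    System.out.println(old.getPrev()); // null

    // JSON carrying the new property round-trips it.
    Segment upgraded =
        mapper.readValue("{\"version\":\"v2\",\"prev\":\"old-descriptor\"}", Segment.class);
    System.out.println(upgraded.getPrev()); // old-descriptor
  }
}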
diff --git a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
index 7bcb4c2ce038..76f50d187e9f 100644
--- a/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/org/apache/druid/client/CachingClusteredClient.java
@@ -24,6 +24,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
@@ -94,6 +95,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -452,6 +454,7 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
         filterFieldsForPruning = null;
       }
 
+      final Set<SegmentDescriptor> prevSegmentIdDescriptors = new HashSet<>();
       // Filter unneeded chunks based on partition dimension
       for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
         final Set<PartitionChunk<ServerSelector>> filteredChunks;
@@ -468,6 +471,7 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
         }
         for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
           ServerSelector server = chunk.getObject();
+          prevSegmentIdDescriptors.add(server.getSegment().getPrevSegmentDescriptor());
           final SegmentDescriptor segment = new SegmentDescriptor(
               holder.getInterval(),
               holder.getVersion(),
@@ -476,6 +480,30 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
           segments.add(new SegmentServerSelector(server, segment));
         }
       }
+      if (specificSegments) {
+        return segments;
+      }
+
+      List<TimelineObjectHolder<String, ServerSelector>> overshadowedHolders =
+          ImmutableList.copyOf(((VersionedIntervalTimeline<String, ServerSelector>) timeline).findFullyOvershadowed());
+      List<TimelineObjectHolder<String, ServerSelector>> filteredHolders =
+          toolChest.filterSegments(query, overshadowedHolders);
+      for (TimelineObjectHolder<String, ServerSelector> holder : filteredHolders) {
+        for (PartitionChunk<ServerSelector> chunk : holder.getObject()) {
+          ServerSelector server = chunk.getObject();
+          if (server.hasRealtime()) {
+            final SegmentDescriptor segment = new SegmentDescriptor(
+                holder.getInterval(),
+                holder.getVersion(),
+                chunk.getChunkNumber()
+            );
+            if (prevSegmentIdDescriptors.contains(segment)) {
+              continue;
+            }
+            segments.add(new SegmentServerSelector(server, segment));
+          }
+        }
+      }
       return segments;
     }
 
diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
index 30b259b66194..07b6a0c238f6 100644
--- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
+++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java
@@ -119,6 +119,13 @@ public boolean isEmpty()
     }
   }
 
+  public boolean hasRealtime()
+  {
+    synchronized (this) {
+      return !realtimeServers.isEmpty();
+    }
+  }
+
   public List<QueryableDruidServer> getCandidates(final int numCandidates)
   {
     List<QueryableDruidServer> candidates;
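The broker-side change in CachingClusteredClient amounts to a set difference: first collect the prevSegmentDescriptor values advertised by the visible (upgraded) segments, then walk the fully overshadowed timeline entries and query only those realtime-served chunks whose data no visible segment already accounts for. A self-contained sketch of that rule, assuming Java 16+ records as stand-ins for Druid's SegmentDescriptor and server-selector types:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OvershadowedRealtimeFilter
{
  // Stand-in for SegmentDescriptor; records give value-based equals/hashCode.
  record Descriptor(String interval, String version, int partition) {}

  // A chunk of the timeline: its own descriptor, whether any server holding
  // it is a realtime task, and (for upgraded segments) the descriptor of the
  // segment it was upgraded from, else null.
  record Chunk(Descriptor descriptor, boolean realtime, Descriptor prev) {}

  static Set<Descriptor> selectExtraRealtime(List<Chunk> visible, List<Chunk> overshadowed)
  {
    // Descriptors that visible (upgraded) segments claim to replace.
    final Set<Descriptor> covered = new HashSet<>();
    for (Chunk chunk : visible) {
      if (chunk.prev() != null) {
        covered.add(chunk.prev());
      }
    }

    // Overshadowed chunks are queried only if realtime-served and not
    // already represented by an upgraded segment.
    final Set<Descriptor> extra = new HashSet<>();
    for (Chunk chunk : overshadowed) {
      if (chunk.realtime() && !covered.contains(chunk.descriptor())) {
        extra.add(chunk.descriptor());
      }
    }
    return extra;
  }

  public static void main(String[] args)
  {
    Descriptor oldSeg = new Descriptor("2024-01-01/P1D", "v1", 0);
    Descriptor upgraded = new Descriptor("2024-01-01/P1M", "v2", 0);

    // The upgraded segment points back at the chunk it replaced, so the
    // overshadowed realtime chunk is skipped rather than queried twice.
    System.out.println(selectExtraRealtime(
        List.of(new Chunk(upgraded, false, oldSeg)),
        List.of(new Chunk(oldSeg, true, null))
    )); // []
  }
}

Note how this mirrors the hasRealtime() guard added to ServerSelector: overshadowed chunks served only by historicals are never re-added.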
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 0ef488aed405..a4c5fe4d3d34 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1586,13 +1586,13 @@ private Set<DataSegment> createNewIdsForAppendSegmentsWithVersion(
       // Update the set so that subsequent segment IDs use a higher partition number
       allAllocatedIds.add(newId);
 
-      newSegmentIds.add(
-          DataSegment.builder(segment)
-                     .interval(newId.getInterval())
-                     .version(newId.getVersion())
-                     .shardSpec(newId.getShardSpec())
-                     .build()
-      );
+      final DataSegment upgradedSegment = DataSegment.builder(segment)
+                                                     .interval(newId.getInterval())
+                                                     .version(newId.getVersion())
+                                                     .shardSpec(newId.getShardSpec())
+                                                     .build();
+      upgradedSegment.setPrevSegmentDescriptor(segment.toDescriptor());
+      newSegmentIds.add(upgradedSegment);
     }
 
     return newSegmentIds;
@@ -2093,13 +2093,13 @@ private Set<DataSegment> createNewIdsOfAppendSegmentsAfterReplace(
 
       // Create upgraded segment with the correct interval, version and shard spec
       String lockVersion = upgradeSegmentToLockVersion.get(oldSegment.getId().toString());
-      upgradedSegments.add(
-          DataSegment.builder(oldSegment)
-                     .interval(newInterval)
-                     .version(lockVersion)
-                     .shardSpec(shardSpec)
-                     .build()
-      );
+      final DataSegment upgradedSegment = DataSegment.builder(oldSegment)
+                                                     .interval(newInterval)
+                                                     .version(lockVersion)
+                                                     .shardSpec(shardSpec)
+                                                     .build();
+      upgradedSegment.setPrevSegmentDescriptor(oldSegment.toDescriptor());
+      upgradedSegments.add(upgradedSegment);
     }
 
     return upgradedSegments;
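Taken together, the coordinator-side pattern is: copy the committed segment's metadata under the new interval, version, and shard spec, then stamp the original's descriptor onto the copy so the broker can later pair the two. Condensed from the two hunks above (ID allocation and lock handling omitted; variable names as in the diff):

final DataSegment upgraded = DataSegment.builder(oldSegment)
                                        .interval(newInterval)
                                        .version(lockVersion)
                                        .shardSpec(shardSpec)
                                        .build();
// Record which segment the data came from. The diff leaves equals/compareTo
// untouched, so this does not affect the segment's identity; the value is
// surfaced in JSON via getPrevSegmentDescriptor().
upgraded.setPrevSegmentDescriptor(oldSegment.toDescriptor());

One design note: setPrevSegmentDescriptor mutates an otherwise immutable DataSegment after build(). A Builder.prevSegmentDescriptor(...) method (hypothetical, not part of this diff) would preserve immutability, at the cost of touching the builder's copy constructor and its call sites.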