diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index aaa62db90a7c..4a290112cce1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -27,15 +27,15 @@ import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; +import org.apache.druid.indexing.overlord.PendingSegmentUpgradeRecord; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentUtils; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; -import java.util.Map; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -92,6 +92,18 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) // Find the active replace locks held only by this task final Set replaceLocksForTask = toolbox.getTaskLockbox().findReplaceLocksForTask(task); + final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); + final Optional activeSupervisorIdWithAppendLock = supervisorManager == null + ? 
Optional.absent() + : supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); + final Set activeRealtimeSequencePrefixes; + if (!activeSupervisorIdWithAppendLock.isPresent()) { + activeRealtimeSequencePrefixes = ImmutableSet.of(); + } else { + activeRealtimeSequencePrefixes + = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get()); + } + final SegmentPublishResult publishResult; try { @@ -101,7 +113,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) CriticalAction.builder() .onValidLocks( () -> toolbox.getIndexerMetadataStorageCoordinator() - .commitReplaceSegments(segments, replaceLocksForTask) + .commitReplaceSegments(segments, replaceLocksForTask, activeRealtimeSequencePrefixes) ) .onInvalidLocks( () -> SegmentPublishResult.fail( @@ -123,7 +135,12 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) // failure to upgrade pending segments does not affect success of the commit if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) { try { - tryUpgradeOverlappingPendingSegments(task, toolbox); + tryUpgradeOverlappingPendingSegments( + task, + toolbox, + activeSupervisorIdWithAppendLock, + publishResult.getPendingSegmentUpgrades() + ); } catch (Exception e) { log.error(e, "Error while upgrading pending segments for task[%s]", task.getId()); @@ -136,32 +153,29 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) /** * Tries to upgrade any pending segments that overlap with the committed segments. 
*/ - private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox) + private void tryUpgradeOverlappingPendingSegments( + Task task, + TaskActionToolbox toolbox, + Optional activeSupervisorIdWithAppendLock, + List upgradedPendingSegments + ) { - final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); - final Optional activeSupervisorIdWithAppendLock = - supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); if (!activeSupervisorIdWithAppendLock.isPresent()) { return; } - final Set activeRealtimeSequencePrefixes - = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get()); - Map upgradedPendingSegments = - toolbox.getIndexerMetadataStorageCoordinator() - .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes); log.info( "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]", upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments ); upgradedPendingSegments.forEach( - (oldId, newId) -> toolbox.getSupervisorManager() - .registerNewVersionOfPendingSegmentOnSupervisor( - activeSupervisorIdWithAppendLock.get(), - oldId, - newId - ) + (upgradeRecord) -> toolbox.getSupervisorManager() + .registerNewVersionOfPendingSegmentOnSupervisor( + activeSupervisorIdWithAppendLock.get(), + upgradeRecord.getOldId(), + upgradeRecord.getNewId() + ) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index f42a300de5f3..1420c9ac571a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -154,7 +154,8 @@ public Map allocatePendingSegments @Override public SegmentPublishResult 
commitReplaceSegments( Set replaceSegments, - Set locksHeldByReplaceTask + Set locksHeldByReplaceTask, + Set activeRealtimeSequencePrefixes ) { return SegmentPublishResult.ok(commitSegments(replaceSegments)); @@ -227,15 +228,6 @@ public SegmentIdWithShardSpec allocatePendingSegment( ); } - @Override - public Map upgradePendingSegmentsOverlappingWith( - Set replaceSegments, - Set activeBaseSequenceNames - ) - { - return Collections.emptyMap(); - } - @Override public int deletePendingSegmentsCreatedInInterval(String dataSource, Interval deleteInterval) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 31c975339007..15d822c01a06 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -325,32 +325,14 @@ SegmentPublishResult commitAppendSegmentsAndMetadata( * in {@link #commitAppendSegments} * * - * @param replaceSegments All segments created by a REPLACE task that - * must be committed in a single transaction. - * @param locksHeldByReplaceTask All active non-revoked REPLACE locks held by the task + * @param replaceSegments All segments created by a REPLACE task that + * must be committed in a single transaction. + * @param locksHeldByReplaceTask All active non-revoked REPLACE locks held by the task + * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task group */ SegmentPublishResult commitReplaceSegments( Set replaceSegments, - Set locksHeldByReplaceTask - ); - - /** - * Creates and inserts new IDs for the pending segments hat overlap with the given - * replace segments being committed. The newly created pending segment IDs: - *
<ul> - *
<li>Have the same interval and version as that of an overlapping segment - * committed by the REPLACE task.</li>
- *
<li>Cannot be committed but are only used to serve realtime queries against - * those versions.</li>
- * </ul>
- * - * @param replaceSegments Segments being committed by a REPLACE task - * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task groups - * of the supervisor (if any) for this datasource - * @return Map from originally allocated pending segment to its new upgraded ID. - */ - Map upgradePendingSegmentsOverlappingWith( - Set replaceSegments, + Set locksHeldByReplaceTask, Set activeRealtimeSequencePrefixes ); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java b/server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java new file mode 100644 index 000000000000..9bdefc54e462 --- /dev/null +++ b/server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.overlord; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; + +import java.util.Objects; + +public class PendingSegmentUpgradeRecord +{ + private final SegmentIdWithShardSpec oldId; + private final SegmentIdWithShardSpec newId; + + @JsonCreator + public PendingSegmentUpgradeRecord( + @JsonProperty("oldId") SegmentIdWithShardSpec oldId, + @JsonProperty("newId") SegmentIdWithShardSpec newId + ) + { + this.oldId = oldId; + this.newId = newId; + } + + @JsonProperty + public SegmentIdWithShardSpec getOldId() + { + return oldId; + } + + @JsonProperty + public SegmentIdWithShardSpec getNewId() + { + return newId; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PendingSegmentUpgradeRecord that = (PendingSegmentUpgradeRecord) o; + return Objects.equals(oldId, that.oldId) && + Objects.equals(newId, that.newId); + } + + @Override + public int hashCode() + { + return Objects.hash(oldId, newId); + } + + @Override + public String toString() + { + return "PendingSegmentUpgradeRecord{" + + "oldId='" + oldId + '\'' + + ", newId='" + newId + '\'' + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index 620ff8831b09..a24428decf3d 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -22,11 +22,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import 
com.google.common.collect.ImmutableSet; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; +import java.util.List; import java.util.Objects; import java.util.Set; @@ -43,28 +45,38 @@ public class SegmentPublishResult { private final Set segments; + private final List pendingSegmentUpgrades; private final boolean success; @Nullable private final String errorMsg; + public static SegmentPublishResult ok(Set segments, List pendingSegmentUpgrades) + { + return new SegmentPublishResult(segments, pendingSegmentUpgrades, true, null); + } + public static SegmentPublishResult ok(Set segments) { - return new SegmentPublishResult(segments, true, null); + return new SegmentPublishResult(segments, ImmutableList.of(), true, null); } public static SegmentPublishResult fail(String errorMsg) { - return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg); + return new SegmentPublishResult(ImmutableSet.of(), ImmutableList.of(), false, errorMsg); } @JsonCreator private SegmentPublishResult( @JsonProperty("segments") Set segments, + @JsonProperty("pendingSegmentUpgrades") @Nullable List pendingSegmentUpgrades, @JsonProperty("success") boolean success, @JsonProperty("errorMsg") @Nullable String errorMsg ) { this.segments = Preconditions.checkNotNull(segments, "segments"); + this.pendingSegmentUpgrades = pendingSegmentUpgrades == null + ? 
ImmutableList.of() + : ImmutableList.copyOf(pendingSegmentUpgrades); this.success = success; this.errorMsg = errorMsg; @@ -79,6 +91,12 @@ public Set getSegments() return segments; } + @JsonProperty + public List getPendingSegmentUpgrades() + { + return pendingSegmentUpgrades; + } + @JsonProperty public boolean isSuccess() { @@ -104,13 +122,14 @@ public boolean equals(Object o) SegmentPublishResult that = (SegmentPublishResult) o; return success == that.success && Objects.equals(segments, that.segments) && + Objects.equals(pendingSegmentUpgrades, that.pendingSegmentUpgrades) && Objects.equals(errorMsg, that.errorMsg); } @Override public int hashCode() { - return Objects.hash(segments, success, errorMsg); + return Objects.hash(segments, pendingSegmentUpgrades, success, errorMsg); } @Override diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 3d8939c3e52e..a20fb56273b0 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -35,6 +35,7 @@ import org.apache.commons.lang.StringEscapeUtils; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.PendingSegmentUpgradeRecord; import org.apache.druid.indexing.overlord.SegmentCreateRequest; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; @@ -501,7 +502,8 @@ public SegmentPublishResult inTransaction( @Override public SegmentPublishResult commitReplaceSegments( final Set replaceSegments, - final Set locksHeldByReplaceTask + final Set locksHeldByReplaceTask, + final Set activeRealtimeSequencePrefixes ) { verifySegmentsToCommit(replaceSegments); @@ 
-514,7 +516,8 @@ public SegmentPublishResult commitReplaceSegments( createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask) ); return SegmentPublishResult.ok( - insertSegments(handle, segmentsToInsert) + insertSegments(handle, segmentsToInsert), + upgradePendingSegmentsOverlappingWith(replaceSegments, activeRealtimeSequencePrefixes) ); }, 3, @@ -705,14 +708,13 @@ public SegmentIdWithShardSpec allocatePendingSegment( ); } - @Override - public Map upgradePendingSegmentsOverlappingWith( + private List upgradePendingSegmentsOverlappingWith( Set replaceSegments, Set activeRealtimeSequencePrefixes ) { if (replaceSegments.isEmpty()) { - return Collections.emptyMap(); + return Collections.emptyList(); } // Any replace interval has exactly one version of segments @@ -741,9 +743,9 @@ public Map upgradePendingSegment * those versions. * * - * @return Map from original pending segment to the new upgraded ID. + * @return List of PendingSegmentUpgradeRecords which map the original pending segment to the new upgraded ID. 
*/ - private Map upgradePendingSegments( + private List upgradePendingSegments( Handle handle, String datasource, Map replaceIntervalToMaxId, @@ -751,7 +753,7 @@ private Map upgradePendingSegmen ) throws IOException { final Map newPendingSegmentVersions = new HashMap<>(); - final Map pendingSegmentToNewId = new HashMap<>(); + final List pendingSegmentToNewId = new ArrayList<>(); for (Map.Entry entry : replaceIntervalToMaxId.entrySet()) { final Interval replaceInterval = entry.getKey(); @@ -788,7 +790,7 @@ private Map upgradePendingSegmen ), newId ); - pendingSegmentToNewId.put(pendingSegmentId, newId); + pendingSegmentToNewId.add(new PendingSegmentUpgradeRecord(pendingSegmentId, newId)); } } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index da7bf4226921..ef3e44ad1ef6 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -689,7 +689,7 @@ public void testCommitReplaceSegments() replacingSegments.add(segment); } - coordinator.commitReplaceSegments(replacingSegments, ImmutableSet.of(replaceLock)); + coordinator.commitReplaceSegments(replacingSegments, ImmutableSet.of(replaceLock), ImmutableSet.of()); Assert.assertEquals( 2L * segmentsAppendedWithReplaceLock.size() + replacingSegments.size(),