From 97a367db88980d54b0926b588ecf8e858dd8dfdf Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 28 Feb 2024 11:20:49 +0530 Subject: [PATCH 1/3] Upgrade pending segments transactionally --- .../SegmentTransactionalReplaceAction.java | 51 +++++++----- ...TestIndexerMetadataStorageCoordinator.java | 12 +-- .../IndexerMetadataStorageCoordinator.java | 28 ++----- .../overlord/PendingSegmentUpgradeRecord.java | 78 +++++++++++++++++++ .../overlord/SegmentPublishResult.java | 25 +++++- .../IndexerSQLMetadataStorageCoordinator.java | 20 ++--- ...exerSQLMetadataStorageCoordinatorTest.java | 2 +- 7 files changed, 151 insertions(+), 65 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index aaa62db90a7c..866d20252890 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -27,15 +27,15 @@ import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; +import org.apache.druid.indexing.overlord.PendingSegmentUpgradeRecord; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentUtils; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; -import java.util.Map; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -92,6 +92,17 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) // Find the active replace locks held only by this task final Set replaceLocksForTask = toolbox.getTaskLockbox().findReplaceLocksForTask(task); + final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); + final Optional activeSupervisorIdWithAppendLock = + supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); + final Set activeRealtimeSequencePrefixes; + if (!activeSupervisorIdWithAppendLock.isPresent()) { + activeRealtimeSequencePrefixes = ImmutableSet.of(); + } else { + activeRealtimeSequencePrefixes + = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get()); + } + final SegmentPublishResult publishResult; try { @@ -101,7 +112,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) CriticalAction.builder() .onValidLocks( () -> toolbox.getIndexerMetadataStorageCoordinator() - .commitReplaceSegments(segments, replaceLocksForTask) + .commitReplaceSegments(segments, replaceLocksForTask, activeRealtimeSequencePrefixes) ) .onInvalidLocks( () -> SegmentPublishResult.fail( @@ -123,7 +134,12 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) // failure to upgrade pending segments does not affect success of the commit if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) { try { - tryUpgradeOverlappingPendingSegments(task, toolbox); + tryUpgradeOverlappingPendingSegments( + task, + toolbox, + activeSupervisorIdWithAppendLock, + publishResult.getPendingSegmentUpgrades() + ); } catch (Exception e) { log.error(e, "Error while upgrading pending segments for task[%s]", task.getId()); @@ -136,32 +152,29 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) /** * Tries to upgrade any pending segments that overlap with the committed segments. */ - private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox) + private void tryUpgradeOverlappingPendingSegments( + Task task, + TaskActionToolbox toolbox, + Optional activeSupervisorIdWithAppendLock, + List upgradedPendingSegments + ) { - final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); - final Optional activeSupervisorIdWithAppendLock = - supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); if (!activeSupervisorIdWithAppendLock.isPresent()) { return; } - final Set activeRealtimeSequencePrefixes - = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get()); - Map upgradedPendingSegments = - toolbox.getIndexerMetadataStorageCoordinator() - .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes); log.info( "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]", upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments ); upgradedPendingSegments.forEach( - (oldId, newId) -> toolbox.getSupervisorManager() - .registerNewVersionOfPendingSegmentOnSupervisor( - activeSupervisorIdWithAppendLock.get(), - oldId, - newId - ) + (upgradeRecord) -> toolbox.getSupervisorManager() + .registerNewVersionOfPendingSegmentOnSupervisor( + activeSupervisorIdWithAppendLock.get(), + upgradeRecord.getOldId(), + upgradeRecord.getNewId() + ) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index f42a300de5f3..1420c9ac571a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -154,7 +154,8 @@ public Map allocatePendingSegments @Override public SegmentPublishResult commitReplaceSegments( Set replaceSegments, - Set locksHeldByReplaceTask + Set locksHeldByReplaceTask, + Set activeRealtimeSequencePrefixes ) { return SegmentPublishResult.ok(commitSegments(replaceSegments)); @@ -227,15 +228,6 @@ public SegmentIdWithShardSpec allocatePendingSegment( ); } - @Override - public Map upgradePendingSegmentsOverlappingWith( - Set replaceSegments, - Set activeBaseSequenceNames - ) - { - return Collections.emptyMap(); - } - @Override public int deletePendingSegmentsCreatedInInterval(String dataSource, Interval deleteInterval) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 31c975339007..15d822c01a06 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -325,32 +325,14 @@ SegmentPublishResult commitAppendSegmentsAndMetadata( * in {@link #commitAppendSegments} * * - * @param replaceSegments All segments created by a REPLACE task that - * must be committed in a single transaction. - * @param locksHeldByReplaceTask All active non-revoked REPLACE locks held by the task + * @param replaceSegments All segments created by a REPLACE task that + * must be committed in a single transaction. + * @param locksHeldByReplaceTask All active non-revoked REPLACE locks held by the task + * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task group */ SegmentPublishResult commitReplaceSegments( Set replaceSegments, - Set locksHeldByReplaceTask - ); - - /** - * Creates and inserts new IDs for the pending segments hat overlap with the given - * replace segments being committed. The newly created pending segment IDs: - *
    - *
  • Have the same interval and version as that of an overlapping segment - * committed by the REPLACE task.
  • - *
  • Cannot be committed but are only used to serve realtime queries against - * those versions.
  • - *
- * - * @param replaceSegments Segments being committed by a REPLACE task - * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task groups - * of the supervisor (if any) for this datasource - * @return Map from originally allocated pending segment to its new upgraded ID. - */ - Map upgradePendingSegmentsOverlappingWith( - Set replaceSegments, + Set locksHeldByReplaceTask, Set activeRealtimeSequencePrefixes ); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java b/server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java new file mode 100644 index 000000000000..46db8e41e8dc --- /dev/null +++ b/server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; + +import java.util.Objects; + +public class PendingSegmentUpgradeRecord +{ + private final SegmentIdWithShardSpec oldId; + private final SegmentIdWithShardSpec newId; + + public PendingSegmentUpgradeRecord( + SegmentIdWithShardSpec oldId, + SegmentIdWithShardSpec newId + ) + { + this.oldId = oldId; + this.newId = newId; + } + + public SegmentIdWithShardSpec getOldId() + { + return oldId; + } + + public SegmentIdWithShardSpec getNewId() + { + return newId; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PendingSegmentUpgradeRecord that = (PendingSegmentUpgradeRecord) o; + return Objects.equals(oldId, that.oldId) && + Objects.equals(newId, that.newId); + } + + @Override + public int hashCode() + { + return Objects.hash(oldId, newId); + } + + @Override + public String toString() + { + return "PendingSegmentUpgradeRecord{" + + "oldId='" + oldId + '\'' + + ", newId='" + newId + '\'' + + '}'; + } +} diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index 620ff8831b09..a24428decf3d 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -22,11 +22,13 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; +import java.util.List; import java.util.Objects; import java.util.Set; @@ -43,28 +45,38 @@ public class SegmentPublishResult { private final Set segments; + private final List pendingSegmentUpgrades; private final boolean success; @Nullable private final String errorMsg; + public static SegmentPublishResult ok(Set segments, List pendingSegmentUpgrades) + { + return new SegmentPublishResult(segments, pendingSegmentUpgrades, true, null); + } + public static SegmentPublishResult ok(Set segments) { - return new SegmentPublishResult(segments, true, null); + return new SegmentPublishResult(segments, ImmutableList.of(), true, null); } public static SegmentPublishResult fail(String errorMsg) { - return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg); + return new SegmentPublishResult(ImmutableSet.of(), ImmutableList.of(), false, errorMsg); } @JsonCreator private SegmentPublishResult( @JsonProperty("segments") Set segments, + @JsonProperty("pendingSegmentUpgrades") @Nullable List pendingSegmentUpgrades, @JsonProperty("success") boolean success, @JsonProperty("errorMsg") @Nullable String errorMsg ) { this.segments = Preconditions.checkNotNull(segments, "segments"); + this.pendingSegmentUpgrades = pendingSegmentUpgrades == null + ? ImmutableList.of() + : ImmutableList.copyOf(pendingSegmentUpgrades); this.success = success; this.errorMsg = errorMsg; @@ -79,6 +91,12 @@ public Set getSegments() return segments; } + @JsonProperty + public List getPendingSegmentUpgrades() + { + return pendingSegmentUpgrades; + } + @JsonProperty public boolean isSuccess() { @@ -104,13 +122,14 @@ public boolean equals(Object o) SegmentPublishResult that = (SegmentPublishResult) o; return success == that.success && Objects.equals(segments, that.segments) && + Objects.equals(pendingSegmentUpgrades, that.pendingSegmentUpgrades) && Objects.equals(errorMsg, that.errorMsg); } @Override public int hashCode() { - return Objects.hash(segments, success, errorMsg); + return Objects.hash(segments, pendingSegmentUpgrades, success, errorMsg); } @Override diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 3d8939c3e52e..a20fb56273b0 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -35,6 +35,7 @@ import org.apache.commons.lang.StringEscapeUtils; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.PendingSegmentUpgradeRecord; import org.apache.druid.indexing.overlord.SegmentCreateRequest; import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.Segments; @@ -501,7 +502,8 @@ public SegmentPublishResult inTransaction( @Override public SegmentPublishResult commitReplaceSegments( final Set replaceSegments, - final Set locksHeldByReplaceTask + final Set locksHeldByReplaceTask, + final Set activeRealtimeSequencePrefixes ) { verifySegmentsToCommit(replaceSegments); @@ -514,7 +516,8 @@ public SegmentPublishResult commitReplaceSegments( createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask) ); return SegmentPublishResult.ok( - insertSegments(handle, segmentsToInsert) + insertSegments(handle, segmentsToInsert), + upgradePendingSegmentsOverlappingWith(replaceSegments, activeRealtimeSequencePrefixes) ); }, 3, @@ -705,14 +708,13 @@ public SegmentIdWithShardSpec allocatePendingSegment( ); } - @Override - public Map upgradePendingSegmentsOverlappingWith( + private List upgradePendingSegmentsOverlappingWith( Set replaceSegments, Set activeRealtimeSequencePrefixes ) { if (replaceSegments.isEmpty()) { - return Collections.emptyMap(); + return Collections.emptyList(); } // Any replace interval has exactly one version of segments @@ -741,9 +743,9 @@ public Map upgradePendingSegment * those versions. * * - * @return Map from original pending segment to the new upgraded ID. + * @return List of PendingSegmentUpgradeRecords which map the original pending segment to the new upgraded ID. */ - private Map upgradePendingSegments( + private List upgradePendingSegments( Handle handle, String datasource, Map replaceIntervalToMaxId, @@ -751,7 +753,7 @@ private Map upgradePendingSegmen ) throws IOException { final Map newPendingSegmentVersions = new HashMap<>(); - final Map pendingSegmentToNewId = new HashMap<>(); + final List pendingSegmentToNewId = new ArrayList<>(); for (Map.Entry entry : replaceIntervalToMaxId.entrySet()) { final Interval replaceInterval = entry.getKey(); @@ -788,7 +790,7 @@ private Map upgradePendingSegmen ), newId ); - pendingSegmentToNewId.put(pendingSegmentId, newId); + pendingSegmentToNewId.add(new PendingSegmentUpgradeRecord(pendingSegmentId, newId)); } } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index da7bf4226921..ef3e44ad1ef6 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -689,7 +689,7 @@ public void testCommitReplaceSegments() replacingSegments.add(segment); } - coordinator.commitReplaceSegments(replacingSegments, ImmutableSet.of(replaceLock)); + coordinator.commitReplaceSegments(replacingSegments, ImmutableSet.of(replaceLock), ImmutableSet.of()); Assert.assertEquals( 2L * segmentsAppendedWithReplaceLock.size() + replacingSegments.size(), From bc3ac87337d9799ec56ab5a3f6d278c84cd7f951 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 28 Feb 2024 15:57:20 +0530 Subject: [PATCH 2/3] Fix NPE --- .../common/actions/SegmentTransactionalReplaceAction.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 866d20252890..4a290112cce1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -93,8 +93,9 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) final Set replaceLocksForTask = toolbox.getTaskLockbox().findReplaceLocksForTask(task); final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); - final Optional activeSupervisorIdWithAppendLock = - supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); + final Optional activeSupervisorIdWithAppendLock = supervisorManager == null + ? Optional.absent() + : supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); final Set activeRealtimeSequencePrefixes; if (!activeSupervisorIdWithAppendLock.isPresent()) { activeRealtimeSequencePrefixes = ImmutableSet.of(); From 0f1c518c52870b69d5fb4943d0b9c6e4c4504279 Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 29 Feb 2024 12:00:22 +0530 Subject: [PATCH 3/3] Add json annotations --- .../indexing/overlord/PendingSegmentUpgradeRecord.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java b/server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java index 46db8e41e8dc..9bdefc54e462 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/PendingSegmentUpgradeRecord.java @@ -19,6 +19,8 @@ package org.apache.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import java.util.Objects; @@ -28,20 +30,23 @@ public class PendingSegmentUpgradeRecord private final SegmentIdWithShardSpec oldId; private final SegmentIdWithShardSpec newId; + @JsonCreator public PendingSegmentUpgradeRecord( - SegmentIdWithShardSpec oldId, - SegmentIdWithShardSpec newId + @JsonProperty("oldId") SegmentIdWithShardSpec oldId, + @JsonProperty("newId") SegmentIdWithShardSpec newId ) { this.oldId = oldId; this.newId = newId; } + @JsonProperty public SegmentIdWithShardSpec getOldId() { return oldId; } + @JsonProperty public SegmentIdWithShardSpec getNewId() { return newId;