From e129773ea470fce04b75c0bdcb269c5b517967ef Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 19 Mar 2024 11:56:20 +0530 Subject: [PATCH 01/17] Check for handoff of upgraded segments --- .../appenderator/BaseAppenderatorDriver.java | 12 +++++-- .../SegmentsAndCommitMetadata.java | 34 +++++++++++++++++-- .../StreamAppenderatorDriver.java | 14 +++++--- 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index e27d3ff68b69..6a81103ab854 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -619,6 +619,7 @@ ListenableFuture publishInBackground( try { RetryUtils.retry( () -> { + SegmentsAndCommitMetadata retVal = segmentsAndCommitMetadata; try { final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); final SegmentPublishResult publishResult = publisher.publishSegments( @@ -627,7 +628,7 @@ ListenableFuture publishInBackground( outputSegmentsAnnotateFunction, callerMetadata ); - + Set upgradedSegments; if (publishResult.isSuccess()) { log.info( "Published [%s] segments with commit metadata [%s]", @@ -635,6 +636,13 @@ ListenableFuture publishInBackground( callerMetadata ); log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments"); + upgradedSegments = new HashSet<>(publishResult.getSegments()); + segmentsAndCommitMetadata.getSegments().forEach(upgradedSegments::remove); + if (!upgradedSegments.isEmpty()) { + log.info("Published [%d] upgraded segments.", upgradedSegments.size()); + log.infoSegments(upgradedSegments, "Upgraded segments"); + retVal = retVal.withUpgradedSegments(upgradedSegments); + } } else { // Publishing didn't affirmatively succeed. 
However, segments with our identifiers may still be active // now after all, for two possible reasons: @@ -697,7 +705,7 @@ ListenableFuture publishInBackground( Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - return segmentsAndCommitMetadata; + return retVal; }, e -> (e.getMessage() != null && e.getMessage().contains("Failed to update the metadata Store. The new start metadata is ahead of last commited end state.")), RetryUtils.DEFAULT_MAX_TRIES diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java index 3d30e017ca5c..6853060a2e55 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.appenderator; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -27,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; public class SegmentsAndCommitMetadata { @@ -34,14 +36,34 @@ public class SegmentsAndCommitMetadata private final Object commitMetadata; private final ImmutableList segments; + private final ImmutableSet upgradedSegments; public SegmentsAndCommitMetadata( List segments, - @Nullable Object commitMetadata + Object commitMetadata + ) + { + this(segments, commitMetadata, null); + } + + private SegmentsAndCommitMetadata( + List segments, + @Nullable Object commitMetadata, + @Nullable Set upgradedSegments ) { this.segments = ImmutableList.copyOf(segments); this.commitMetadata = commitMetadata; + this.upgradedSegments = upgradedSegments == null ? 
null : ImmutableSet.copyOf(upgradedSegments); + } + + public SegmentsAndCommitMetadata withUpgradedSegments(Set upgradedSegments) + { + return new SegmentsAndCommitMetadata( + this.segments, + this.commitMetadata, + upgradedSegments + ); } @Nullable @@ -55,6 +77,12 @@ public List getSegments() return segments; } + @Nullable + public Set getUpgradedSegments() + { + return upgradedSegments; + } + @Override public boolean equals(Object o) { @@ -66,13 +94,14 @@ public boolean equals(Object o) } SegmentsAndCommitMetadata that = (SegmentsAndCommitMetadata) o; return Objects.equals(commitMetadata, that.commitMetadata) && + Objects.equals(upgradedSegments, that.upgradedSegments) && Objects.equals(segments, that.segments); } @Override public int hashCode() { - return Objects.hash(commitMetadata, segments); + return Objects.hash(commitMetadata, segments, upgradedSegments); } @Override @@ -81,6 +110,7 @@ public String toString() return getClass().getSimpleName() + "{" + "commitMetadata=" + commitMetadata + ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + + ", upgradedSegments=" + SegmentUtils.commaSeparatedIdentifiers(upgradedSegments) + '}'; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index b2b41bf44f76..20290d1f1160 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -51,10 +51,12 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; 
@@ -322,10 +324,14 @@ public ListenableFuture registerHandoff(SegmentsAndCo return Futures.immediateFuture(null); } else { - final List waitingSegmentIdList = segmentsAndCommitMetadata.getSegments().stream() - .map( - SegmentIdWithShardSpec::fromDataSegment) - .collect(Collectors.toList()); + final Set segmentsToBeHandedOff = new HashSet<>(segmentsAndCommitMetadata.getSegments()); + if (segmentsAndCommitMetadata.getUpgradedSegments() != null) { + segmentsToBeHandedOff.addAll(segmentsAndCommitMetadata.getUpgradedSegments()); + } + final List waitingSegmentIdList = + segmentsToBeHandedOff.stream() + .map(SegmentIdWithShardSpec::fromDataSegment) + .collect(Collectors.toList()); final Object metadata = Preconditions.checkNotNull(segmentsAndCommitMetadata.getCommitMetadata(), "commitMetadata"); if (waitingSegmentIdList.isEmpty()) { From 561f5bb16ca5d525d15f66ccdf39511831c60498 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 20 Mar 2024 10:14:59 +0530 Subject: [PATCH 02/17] Add test --- .../appenderator/BaseAppenderatorDriver.java | 10 +-- .../StreamAppenderatorDriver.java | 16 ++-- .../StreamAppenderatorDriverTest.java | 75 +++++++++++++++++++ 3 files changed, 90 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 6a81103ab854..627ed75b3678 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -614,12 +614,12 @@ ListenableFuture publishInBackground( final Object callerMetadata = metadata == null ? 
null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata(); + final Set upgradedSegments = new HashSet<>(); return executor.submit( () -> { try { RetryUtils.retry( () -> { - SegmentsAndCommitMetadata retVal = segmentsAndCommitMetadata; try { final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); final SegmentPublishResult publishResult = publisher.publishSegments( @@ -628,7 +628,6 @@ ListenableFuture publishInBackground( outputSegmentsAnnotateFunction, callerMetadata ); - Set upgradedSegments; if (publishResult.isSuccess()) { log.info( "Published [%s] segments with commit metadata [%s]", @@ -636,12 +635,11 @@ ListenableFuture publishInBackground( callerMetadata ); log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments"); - upgradedSegments = new HashSet<>(publishResult.getSegments()); + upgradedSegments.addAll(publishResult.getSegments()); segmentsAndCommitMetadata.getSegments().forEach(upgradedSegments::remove); if (!upgradedSegments.isEmpty()) { log.info("Published [%d] upgraded segments.", upgradedSegments.size()); log.infoSegments(upgradedSegments, "Upgraded segments"); - retVal = retVal.withUpgradedSegments(upgradedSegments); } } else { // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active @@ -705,7 +703,7 @@ ListenableFuture publishInBackground( Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - return retVal; + return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments); }, e -> (e.getMessage() != null && e.getMessage().contains("Failed to update the metadata Store. 
The new start metadata is ahead of last commited end state.")), RetryUtils.DEFAULT_MAX_TRIES @@ -719,7 +717,7 @@ ListenableFuture publishInBackground( Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - return segmentsAndCommitMetadata; + return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments); } ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 20290d1f1160..c7e251b79e98 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -371,19 +371,25 @@ public void onSuccess(Object result) { if (numRemainingHandoffSegments.decrementAndGet() == 0) { List segments = segmentsAndCommitMetadata.getSegments(); + Set upgradedSegments = segmentsAndCommitMetadata.getUpgradedSegments(); log.info("Successfully handed off [%d] segments.", segments.size()); + if (upgradedSegments != null) { + log.info("Successfully handed off [%d] upgraded segments.", upgradedSegments.size()); + } final long handoffTotalTime = System.currentTimeMillis() - handoffStartTime; metrics.reportMaxSegmentHandoffTime(handoffTotalTime); if (handoffTotalTime > HANDOFF_TIME_THRESHOLD) { log.warn("Slow segment handoff! 
Time taken for [%d] segments is %d ms", segments.size(), handoffTotalTime); } - resultFuture.set( - new SegmentsAndCommitMetadata( - segments, - ((AppenderatorDriverMetadata) metadata).getCallerMetadata() - ) + SegmentsAndCommitMetadata retVal = new SegmentsAndCommitMetadata( + segments, + ((AppenderatorDriverMetadata) metadata).getCallerMetadata() ); + if (upgradedSegments != null) { + retVal = retVal.withUpgradedSegments(upgradedSegments); + } + resultFuture.set(retVal); } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index 8f2b77c4fc0f..e82970c1498f 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -43,6 +43,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; @@ -58,6 +59,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -72,6 +74,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport { private static final String DATA_SOURCE = "foo"; private static final String VERSION = "abc123"; + private static final String UPGRADED_VERSION = "xyz456"; private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); private static final int MAX_ROWS_IN_MEMORY = 100; private static final int MAX_ROWS_PER_SEGMENT = 3; @@ -246,6 +249,44 @@ public void testHandoffTimeout() throws 
Exception driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); } + @Test + public void testHandoffUpgradedSegments() + throws IOException, InterruptedException, TimeoutException, ExecutionException + { + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + + Assert.assertNull(driver.startJob(null)); + + for (int i = 0; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk()); + } + + driver.persist(committerSupplier.get()); + + // There is no remaining rows in the driver, and thus the result must be empty + final SegmentsAndCommitMetadata segmentsAndCommitMetadata = driver.publishAndRegisterHandoff( + makeUpgradingPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + + Assert.assertNotNull(segmentsAndCommitMetadata.getUpgradedSegments()); + Assert.assertEquals( + segmentsAndCommitMetadata.getSegments().size(), + segmentsAndCommitMetadata.getUpgradedSegments().size() + ); + + Set expectedHandedOffSegments = new HashSet<>(); + for (DataSegment segment : segmentsAndCommitMetadata.getSegments()) { + expectedHandedOffSegments.add(segment.toDescriptor()); + } + for (DataSegment segment : segmentsAndCommitMetadata.getUpgradedSegments()) { + expectedHandedOffSegments.add(segment.toDescriptor()); + } + Assert.assertEquals(expectedHandedOffSegments, segmentHandoffNotifierFactory.getHandedOffSegmentDescriptors()); + } + @Test public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException { @@ -379,6 +420,29 @@ static TransactionalSegmentPublisher makeOkPublisher() SegmentPublishResult.ok(Collections.emptySet()); } + private TransactionalSegmentPublisher makeUpgradingPublisher() + { + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> { + Set allSegments = new 
HashSet<>(segmentsToPublish); + int id = 0; + for (DataSegment segment : segmentsToPublish) { + DataSegment upgradedSegment = new DataSegment( + SegmentId.of(DATA_SOURCE, Intervals.ETERNITY, UPGRADED_VERSION, id), + segment.getLoadSpec(), + segment.getDimensions(), + segment.getMetrics(), + new NumberedShardSpec(id, 0), + null, + segment.getBinaryVersion(), + segment.getSize() + ); + id++; + allSegments.add(upgradedSegment); + } + return SegmentPublishResult.ok(allSegments); + }; + } + static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException) { return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> { @@ -459,6 +523,7 @@ static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifier { private boolean handoffEnabled = true; private long handoffDelay; + private final Set handedOffSegmentDescriptors = new HashSet<>(); public void disableHandoff() { @@ -470,6 +535,13 @@ public void setHandoffDelay(long delay) handoffDelay = delay; } + public Set getHandedOffSegmentDescriptors() + { + synchronized (handedOffSegmentDescriptors) { + return ImmutableSet.copyOf(handedOffSegmentDescriptors); + } + } + @Override public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) { @@ -494,6 +566,9 @@ public boolean registerSegmentHandoffCallback( } exec.execute(handOffRunnable); + synchronized (handedOffSegmentDescriptors) { + handedOffSegmentDescriptors.add(descriptor); + } } return true; } From f9a148bbd29aac2e774747e9b05f8ccaba1269a4 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 20 Mar 2024 11:58:58 +0530 Subject: [PATCH 03/17] Fix test uncertainty --- .../appenderator/StreamAppenderatorDriverFailTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 
7e7ce334cc73..b53c6f67e38d 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -221,10 +221,8 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo Assert.assertNull(driver.startJob(null)); - for (int i = 0; i < ROWS.size(); i++) { - committerSupplier.setMetadata(i + 1); - Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk()); - } + committerSupplier.setMetadata(1); + Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committerSupplier, false, true).isOk()); final SegmentsAndCommitMetadata published = driver.publish( StreamAppenderatorDriverTest.makeOkPublisher(), From a93c6e2ac3a954d98a4899bd29c029580ba2a893 Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 22 Apr 2024 14:00:40 +0530 Subject: [PATCH 04/17] Drop sink only when all associated segments have been abandoned --- .../SegmentTransactionalReplaceAction.java | 51 ++------ .../supervisor/SupervisorManager.java | 14 +- .../PendingSegmentVersions.java | 56 -------- .../SeekableStreamIndexTaskClient.java | 11 +- ...eekableStreamIndexTaskClientAsyncImpl.java | 7 +- .../SeekableStreamIndexTaskRunner.java | 10 +- .../supervisor/SeekableStreamSupervisor.java | 21 +-- .../task/concurrent/ActionsTestTask.java | 6 +- ...ncurrentReplaceAndStreamingAppendTest.java | 61 ++------- ...TestIndexerMetadataStorageCoordinator.java | 4 +- .../IndexerMetadataStorageCoordinator.java | 4 +- .../overlord/SegmentPublishResult.java | 32 +++++ .../IndexerSQLMetadataStorageCoordinator.java | 58 +++++++-- .../druid/metadata/PendingSegmentRecord.java | 18 ++- .../metadata/SqlSegmentsMetadataQuery.java | 2 +- .../appenderator/StreamAppenderator.java | 120 +++++++++++------- 16 files changed, 227 insertions(+), 248 deletions(-) delete mode 100644 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 2f4a580e0464..9d12f57a1cc6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -33,12 +33,9 @@ import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentUtils; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -140,7 +137,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) // failure to upgrade pending segments does not affect success of the commit if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) { try { - registerUpgradedPendingSegmentsOnSupervisor(task, toolbox); + registerUpgradedPendingSegmentsOnSupervisor(task, toolbox, publishResult.getUpgradedPendingSegments()); } catch (Exception e) { log.error(e, "Error while upgrading pending segments for task[%s]", task.getId()); @@ -153,7 +150,11 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) /** * Registers upgraded pending segments on the active supervisor, if any */ - private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox) + private void registerUpgradedPendingSegmentsOnSupervisor( + Task task, + TaskActionToolbox toolbox, + List upgradedPendingSegments + ) 
{ final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); final Optional activeSupervisorIdWithAppendLock = @@ -163,42 +164,10 @@ private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionTo return; } - final Set replaceLocksForTask = toolbox - .getTaskLockbox() - .getAllReplaceLocksForDatasource(task.getDataSource()) - .stream() - .filter(lock -> task.getId().equals(lock.getSupervisorTaskId())) - .collect(Collectors.toSet()); - - - Set pendingSegments = new HashSet<>(); - for (ReplaceTaskLock replaceLock : replaceLocksForTask) { - pendingSegments.addAll( - toolbox.getIndexerMetadataStorageCoordinator() - .getPendingSegments(task.getDataSource(), replaceLock.getInterval()) - ); - } - Map idToPendingSegment = new HashMap<>(); - pendingSegments.forEach(pendingSegment -> idToPendingSegment.put( - pendingSegment.getId().asSegmentId().toString(), - pendingSegment.getId() - )); - Map segmentToParent = new HashMap<>(); - pendingSegments.forEach(pendingSegment -> { - if (pendingSegment.getUpgradedFromSegmentId() != null - && !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) { - segmentToParent.put( - pendingSegment.getId(), - idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId()) - ); - } - }); - - segmentToParent.forEach( - (newId, oldId) -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( + upgradedPendingSegments.forEach( + upgradedPendingSegment -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( activeSupervisorIdWithAppendLock.get(), - oldId, - newId + upgradedPendingSegment ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index dd57b560660c..c1eba5602409 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -33,9 +33,9 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import javax.annotation.Nullable; @@ -310,14 +310,14 @@ public boolean checkPointDataSourceMetadata( */ public boolean registerNewVersionOfPendingSegmentOnSupervisor( String supervisorId, - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion + PendingSegmentRecord upgradedPendingSegment ) { try { Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null"); - Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment cannot be null"); - Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null"); + Preconditions.checkNotNull(upgradedPendingSegment, "upgraded pending segment cannot be null"); + Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "replica group cannot be null"); + Preconditions.checkNotNull(upgradedPendingSegment.getUpgradedFromSegmentId(), "root id cannot be null"); Pair supervisor = supervisors.get(supervisorId); Preconditions.checkNotNull(supervisor, "supervisor could not be found"); @@ -326,12 +326,12 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor( } SeekableStreamSupervisor seekableStreamSupervisor = (SeekableStreamSupervisor) supervisor.lhs; - seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion); + 
seekableStreamSupervisor.registerNewVersionOfPendingSegment(upgradedPendingSegment); return true; } catch (Exception e) { log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed", - basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId); + upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId); } return false; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java deleted file mode 100644 index 146b0afc4b9d..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.seekablestream; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; - -/** - * Contains a new version of an existing base pending segment. 
Used by realtime - * tasks to serve queries against multiple versions of the same pending segment. - */ -public class PendingSegmentVersions -{ - private final SegmentIdWithShardSpec baseSegment; - private final SegmentIdWithShardSpec newVersion; - - @JsonCreator - public PendingSegmentVersions( - @JsonProperty("baseSegment") SegmentIdWithShardSpec baseSegment, - @JsonProperty("newVersion") SegmentIdWithShardSpec newVersion - ) - { - this.baseSegment = baseSegment; - this.newVersion = newVersion; - } - - @JsonProperty - public SegmentIdWithShardSpec getBaseSegment() - { - return baseSegment; - } - - @JsonProperty - public SegmentIdWithShardSpec getNewVersion() - { - return newVersion; - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java index 5e5924249608..7fd282e44ce2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java @@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import java.util.List; @@ -158,15 +158,14 @@ ListenableFuture setEndOffsetsAsync( * Update the task state to redirect queries for later versions to the root pending segment. * The task also announces that it is serving the segments belonging to the subsequent versions. * The update is processed only if the task is serving the original pending segment. 
- * @param taskId - task id - * @param basePendingSegment - the pending segment that was originally allocated - * @param newVersionOfSegment - the ids belonging to the versions to which the root segment needs to be updated + * + * @param taskId - task id + * @param pendingSegmentRecord - the ids belonging to the versions to which the root segment needs to be updated * @return true if the update succeeds */ ListenableFuture registerNewVersionOfPendingSegmentAsync( String taskId, - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newVersionOfSegment + PendingSegmentRecord pendingSegmentRecord ); Class getPartitionType(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java index 40d475909e68..5de1cb50a971 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java @@ -43,6 +43,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.IgnoreHttpResponseHandler; import org.apache.druid.rpc.RequestBuilder; @@ -57,7 +58,6 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.jboss.netty.handler.codec.http.HttpMethod; import 
org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; @@ -197,13 +197,12 @@ public ListenableFuture> getEndOffsetsA @Override public ListenableFuture registerNewVersionOfPendingSegmentAsync( String taskId, - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newVersionOfSegment + PendingSegmentRecord pendingSegmentRecord ) { final RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion") - .jsonContent(jsonMapper, new PendingSegmentVersions(basePendingSegment, newVersionOfSegment)); + .jsonContent(jsonMapper, pendingSegmentRecord); return makeRequest(taskId, requestBuilder) .handler(IgnoreHttpResponseHandler.INSTANCE) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 96e1dd401459..5813598169ca 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -78,6 +78,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMeters; @@ -1576,17 +1577,14 @@ public Response setEndOffsetsHTTP( @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public Response registerNewVersionOfPendingSegment( - PendingSegmentVersions pendingSegmentVersions, + PendingSegmentRecord upgradedPendingSegment, // this field is only for internal purposes, shouldn't be usually set by users 
@Context final HttpServletRequest req ) { authorizationCheck(req, Action.WRITE); try { - ((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment( - pendingSegmentVersions.getBaseSegment(), - pendingSegmentVersions.getNewVersion() - ); + ((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment(upgradedPendingSegment); return Response.ok().build(); } catch (DruidException e) { @@ -1599,7 +1597,7 @@ public Response registerNewVersionOfPendingSegment( log.error( e, "Could not register new version[%s] of pending segment[%s]", - pendingSegmentVersions.getNewVersion(), pendingSegmentVersions.getBaseSegment() + upgradedPendingSegment.getId().getVersion(), upgradedPendingSegment.getUpgradedFromSegmentId() ); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index f15a975694fd..d0495700eb2a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -89,12 +89,12 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import 
org.joda.time.DateTime; import javax.annotation.Nonnull; @@ -1119,19 +1119,22 @@ public Set getActiveRealtimeSequencePrefixes() } public void registerNewVersionOfPendingSegment( - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion + PendingSegmentRecord pendingSegmentRecord ) { for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { - for (String taskId : taskGroup.taskIds()) { - taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion); + if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) { + for (String taskId : taskGroup.taskIds()) { + taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord); + } } } for (List taskGroupList : pendingCompletionTaskGroups.values()) { for (TaskGroup taskGroup : taskGroupList) { - for (String taskId : taskGroup.taskIds()) { - taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion); + if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) { + for (String taskId : taskGroup.taskIds()) { + taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord); + } } } } @@ -3202,9 +3205,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException // If we received invalid endOffset values, we clear the known offset to refetch the last committed offset // from metadata. If any endOffset values are invalid, we treat the entire set as invalid as a safety measure. 
if (!endOffsetsAreInvalid) { - for (Entry entry : endOffsets.entrySet()) { - partitionOffsets.put(entry.getKey(), entry.getValue()); - } + partitionOffsets.putAll(endOffsets); } else { for (Entry entry : endOffsets.entrySet()) { partitionOffsets.put(entry.getKey(), getNotSetMarker()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java index 230cfa4668c9..38cdf838b419 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java @@ -52,7 +52,7 @@ public class ActionsTestTask extends CommandQueueTask { private final TaskActionClient client; private final AtomicInteger sequenceId = new AtomicInteger(0); - private final Map announcedSegmentsToParentSegments = new HashMap<>(); + private final Map announcedSegmentsToParentSegments = new HashMap<>(); public ActionsTestTask(String datasource, String groupId, TaskActionClientFactory factory) { @@ -82,7 +82,7 @@ public SegmentPublishResult commitReplaceSegments(DataSegment... 
segments) ); } - public Map getAnnouncedSegmentsToParentSegments() + public Map getAnnouncedSegmentsToParentSegments() { return announcedSegmentsToParentSegments; } @@ -114,7 +114,7 @@ public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Gr TaskLockType.APPEND ) ); - announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId()); + announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId().toString()); return pendingSegment; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java index 50c318683e8f..e4273d4b307e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java @@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -74,7 +75,6 @@ import org.junit.Before; import org.junit.Test; -import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -83,7 +83,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -122,10 +121,9 @@ public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase private 
final AtomicInteger groupId = new AtomicInteger(0); private final SupervisorManager supervisorManager = EasyMock.mock(SupervisorManager.class); private Capture supervisorId; - private Capture oldPendingSegment; - private Capture newPendingSegment; + private Capture pendingSegment; private Map>> versionToIntervalToLoadSpecs; - private Map parentSegmentToLoadSpec; + private Map parentSegmentToLoadSpec; @Override @Before @@ -169,12 +167,10 @@ public void setUpIngestionTestBase() throws IOException groupId.set(0); appendTask = createAndStartTask(); supervisorId = Capture.newInstance(CaptureType.ALL); - oldPendingSegment = Capture.newInstance(CaptureType.ALL); - newPendingSegment = Capture.newInstance(CaptureType.ALL); + pendingSegment = Capture.newInstance(CaptureType.ALL); EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( EasyMock.capture(supervisorId), - EasyMock.capture(oldPendingSegment), - EasyMock.capture(newPendingSegment) + EasyMock.capture(pendingSegment) )).andReturn(true).anyTimes(); replaceTask = createAndStartTask(); EasyMock.replay(supervisorManager); @@ -682,20 +678,6 @@ public void testLockAllocateDayReplaceMonthAllocateAppend() verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12); } - - @Nullable - private DataSegment findSegmentWith(String version, Map loadSpec, Set segments) - { - for (DataSegment segment : segments) { - if (version.equals(segment.getVersion()) - && Objects.equals(segment.getLoadSpec(), loadSpec)) { - return segment; - } - } - - return null; - } - private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment) { final SegmentId id = pendingSegment.asSegmentId(); @@ -739,23 +721,6 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment. } } - private void verifyInputSegments(Task task, Interval interval, DataSegment... 
expectedSegments) - { - try { - final TaskActionClient taskActionClient = taskActionClientFactory.create(task); - Collection allUsedSegments = taskActionClient.submit( - new RetrieveUsedSegmentsAction( - WIKI, - Collections.singletonList(interval) - ) - ); - Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); - } - catch (IOException e) { - throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval); - } - } - private TaskToolboxFactory createToolboxFactory( TaskConfig taskConfig, TaskActionClientFactory taskActionClientFactory @@ -799,11 +764,10 @@ private void commitReplaceSegments(DataSegment... dataSegments) { replaceTask.commitReplaceSegments(dataSegments); for (int i = 0; i < supervisorId.getValues().size(); i++) { - announceUpgradedPendingSegment(oldPendingSegment.getValues().get(i), newPendingSegment.getValues().get(i)); + announceUpgradedPendingSegment(pendingSegment.getValues().get(i)); } supervisorId.reset(); - oldPendingSegment.reset(); - newPendingSegment.reset(); + pendingSegment.reset(); replaceTask.finishRunAndGetStatus(); } @@ -812,19 +776,16 @@ private SegmentPublishResult commitAppendSegments(DataSegment... 
dataSegments) SegmentPublishResult result = appendTask.commitAppendSegments(dataSegments); result.getSegments().forEach(this::unannounceUpgradedPendingSegment); for (DataSegment segment : dataSegments) { - parentSegmentToLoadSpec.put(segment.getId(), Iterables.getOnlyElement(segment.getLoadSpec().values())); + parentSegmentToLoadSpec.put(segment.getId().toString(), Iterables.getOnlyElement(segment.getLoadSpec().values())); } appendTask.finishRunAndGetStatus(); return result; } - private void announceUpgradedPendingSegment( - SegmentIdWithShardSpec oldPendingSegment, - SegmentIdWithShardSpec newPendingSegment - ) + private void announceUpgradedPendingSegment(PendingSegmentRecord pendingSegment) { appendTask.getAnnouncedSegmentsToParentSegments() - .put(newPendingSegment.asSegmentId(), oldPendingSegment.asSegmentId()); + .put(pendingSegment.getId().asSegmentId(), pendingSegment.getUpgradedFromSegmentId()); } private void unannounceUpgradedPendingSegment( @@ -849,7 +810,7 @@ private void verifyVersionIntervalLoadSpecUniqueness() loadSpecs.add(loadSpec); } - for (Map.Entry entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) { + for (Map.Entry entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) { final String version = entry.getKey().getVersion(); final Interval interval = entry.getKey().getInterval(); final Object loadSpec = parentSegmentToLoadSpec.get(entry.getValue()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index f57494a1e03b..8bcbc7bbd81c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -244,11 +244,11 @@ public SegmentIdWithShardSpec allocatePendingSegment( } 
@Override - public Map upgradePendingSegmentsOverlappingWith( + public List upgradePendingSegmentsOverlappingWith( Set replaceSegments ) { - return Collections.emptyMap(); + return Collections.emptyList(); } @Override diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 2390e7b55003..830cfbfb41de 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -378,9 +378,9 @@ SegmentPublishResult commitReplaceSegments( * * * @param replaceSegments Segments being committed by a REPLACE task - * @return Map from originally allocated pending segment to its new upgraded ID. + * @return List of inserted pending segment records */ - Map upgradePendingSegmentsOverlappingWith( + List upgradePendingSegmentsOverlappingWith( Set replaceSegments ); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index 620ff8831b09..c4cecd9620cc 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -20,13 +20,17 @@ package org.apache.druid.indexing.overlord; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; 
+import java.util.List; import java.util.Objects; import java.util.Set; @@ -46,12 +50,19 @@ public class SegmentPublishResult private final boolean success; @Nullable private final String errorMsg; + @Nullable + private final List upgradedPendingSegments; public static SegmentPublishResult ok(Set segments) { return new SegmentPublishResult(segments, true, null); } + public static SegmentPublishResult ok(Set segments, List upgradedPendingSegments) + { + return new SegmentPublishResult(segments, true, null, upgradedPendingSegments); + } + public static SegmentPublishResult fail(String errorMsg) { return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg); @@ -63,13 +74,28 @@ private SegmentPublishResult( @JsonProperty("success") boolean success, @JsonProperty("errorMsg") @Nullable String errorMsg ) + { + this(segments, success, errorMsg, null); + } + + private SegmentPublishResult( + Set segments, + boolean success, + @Nullable String errorMsg, + List upgradedPendingSegments + ) { this.segments = Preconditions.checkNotNull(segments, "segments"); this.success = success; this.errorMsg = errorMsg; + this.upgradedPendingSegments = upgradedPendingSegments; if (!success) { Preconditions.checkArgument(segments.isEmpty(), "segments must be empty for unsuccessful publishes"); + Preconditions.checkArgument( + CollectionUtils.isNullOrEmpty(upgradedPendingSegments), + "upgraded pending segments must be null or empty for unsuccessful publishes" + ); } } @@ -92,6 +118,12 @@ public String getErrorMsg() return errorMsg; } + @JsonIgnore + public List getUpgradedPendingSegments() + { + return upgradedPendingSegments; + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index e7567ba27284..32c26cb2808e 100644 --- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -73,6 +73,7 @@ import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; +import org.skife.jdbi.v2.Update; import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; @@ -477,9 +478,9 @@ public SegmentPublishResult commitReplaceSegments( createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask) ); SegmentPublishResult result = SegmentPublishResult.ok( - insertSegments(handle, segmentsToInsert) + insertSegments(handle, segmentsToInsert), + upgradePendingSegmentsOverlappingWith(segmentsToInsert) ); - upgradePendingSegmentsOverlappingWith(segmentsToInsert); return result; }, 3, @@ -678,12 +679,12 @@ public SegmentIdWithShardSpec allocatePendingSegment( } @Override - public Map upgradePendingSegmentsOverlappingWith( + public List upgradePendingSegmentsOverlappingWith( Set replaceSegments ) { if (replaceSegments.isEmpty()) { - return Collections.emptyMap(); + return Collections.emptyList(); } // Any replace interval has exactly one version of segments @@ -712,16 +713,15 @@ public Map upgradePendingSegment * those versions. * * - * @return Map from original pending segment to the new upgraded ID. 
+ * @return Inserted pending segment records */ - private Map upgradePendingSegments( + private List upgradePendingSegments( Handle handle, String datasource, Map replaceIntervalToMaxId ) throws JsonProcessingException { final List upgradedPendingSegments = new ArrayList<>(); - final Map pendingSegmentToNewId = new HashMap<>(); for (Map.Entry entry : replaceIntervalToMaxId.entrySet()) { final Interval replaceInterval = entry.getKey(); @@ -756,7 +756,6 @@ private Map upgradePendingSegmen overlappingPendingSegment.getTaskAllocatorId() ) ); - pendingSegmentToNewId.put(pendingSegmentId, newId); } } } @@ -774,7 +773,7 @@ private Map upgradePendingSegmen numInsertedPendingSegments, upgradedPendingSegments.size() ); - return pendingSegmentToNewId; + return upgradedPendingSegments; } private boolean shouldUpgradePendingSegment( @@ -1263,6 +1262,39 @@ public int hashCode() } } + private static void bindColumnValuesToQueryWithInCondition( + final String columnName, + final List values, + final Update query + ) + { + if (values == null) { + return; + } + + for (int i = 0; i < values.size(); i++) { + query.bind(StringUtils.format("%s%d", columnName, i), values.get(i)); + } + } + + private int deletePendingSegmentsById(Handle handle, String datasource, List pendingSegmentIds) + { + if (pendingSegmentIds.isEmpty()) { + return 0; + } + + Update query = handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE dataSource = :dataSource %s", + dbTables.getPendingSegmentsTable(), + SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", pendingSegmentIds) + ) + ).bind("dataSource", datasource); + bindColumnValuesToQueryWithInCondition("id", pendingSegmentIds, query); + + return query.execute(); + } + private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( Set appendSegments, Map appendSegmentToReplaceLock, @@ -1324,7 +1356,6 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( if (metadataUpdateResult.isFailed()) { 
transactionStatus.setRollbackOnly(); metadataNotUpdated.set(true); - if (metadataUpdateResult.canRetry()) { throw new RetryTransactionException(metadataUpdateResult.getErrorMsg()); } else { @@ -1334,6 +1365,13 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock); + + final List appendedSegmentIds + = allSegmentsToInsert.stream() + .map(pendingSegment -> pendingSegment.getId().toString()) + .collect(Collectors.toList()); + deletePendingSegmentsById(handle, dataSource, appendedSegmentIds); + return SegmentPublishResult.ok(insertSegments(handle, allSegmentsToInsert)); }, 3, diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java index 44c62bf47ad1..bfbaad18ef1a 100644 --- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java +++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java @@ -19,6 +19,8 @@ package org.apache.druid.metadata; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; @@ -50,12 +52,13 @@ public class PendingSegmentRecord private final String upgradedFromSegmentId; private final String taskAllocatorId; + @JsonCreator public PendingSegmentRecord( - SegmentIdWithShardSpec id, - String sequenceName, - String sequencePrevId, - @Nullable String upgradedFromSegmentId, - @Nullable String taskAllocatorId + @JsonProperty("id") SegmentIdWithShardSpec id, + @JsonProperty("sequenceName") String sequenceName, + @JsonProperty("sequencePrevId") String sequencePrevId, + @JsonProperty("upgradedFromSegmentId") @Nullable String upgradedFromSegmentId, + @JsonProperty("taskAllocatorId") @Nullable String taskAllocatorId ) { this.id = id; @@ -65,16 +68,19 
@@ public PendingSegmentRecord( this.taskAllocatorId = taskAllocatorId; } + @JsonProperty public SegmentIdWithShardSpec getId() { return id; } + @JsonProperty public String getSequenceName() { return sequenceName; } + @JsonProperty public String getSequencePrevId() { return sequencePrevId; @@ -85,6 +91,7 @@ public String getSequencePrevId() * Can be null for pending segments allocated before this column was added or for segments that have not been upgraded. */ @Nullable + @JsonProperty public String getUpgradedFromSegmentId() { return upgradedFromSegmentId; @@ -95,6 +102,7 @@ public String getUpgradedFromSegmentId() * Can be null for pending segments allocated before this column was added. */ @Nullable + @JsonProperty public String getTaskAllocatorId() { return taskAllocatorId; diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index cee378537d6d..fe1814fc8a1d 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -898,7 +898,7 @@ private static int computeNumChangedSegments(List segmentIds, int[] segm * * @implNote JDBI 3.x has better support for binding {@code IN} clauses directly. 
*/ - private static String getParameterizedInConditionForColumn(final String columnName, final List values) + public static String getParameterizedInConditionForColumn(final String columnName, final List values) { if (values == null) { return ""; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 0792f4a43bcc..dbc7b698234b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -51,6 +51,7 @@ import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QuerySegmentWalker; @@ -77,6 +78,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -142,6 +144,7 @@ public class StreamAppenderator implements Appenderator * of any thread from {@link #drop}. 
*/ private final ConcurrentMap sinks = new ConcurrentHashMap<>(); + private final ConcurrentMap idToPendingSegment = new ConcurrentHashMap<>(); private final Set droppingSinks = Sets.newConcurrentHashSet(); private final VersionedIntervalTimeline sinkTimeline; private final long maxBytesTuningConfig; @@ -161,8 +164,10 @@ public class StreamAppenderator implements Appenderator private final AtomicBoolean closed = new AtomicBoolean(false); - private final ConcurrentHashMap> - baseSegmentToUpgradedVersions = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> baseSegmentToUpgradedVersions + = new ConcurrentHashMap<>(); + private final ConcurrentHashMap upgradedVersionToBaseSegment + = new ConcurrentHashMap<>(); private final SinkSchemaAnnouncer sinkSchemaAnnouncer; private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig; @@ -521,6 +526,10 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier) } sinks.put(identifier, retVal); + idToPendingSegment.put(identifier.asSegmentId().toString(), identifier); + baseSegmentToUpgradedVersions.put(identifier, new HashSet<>()); + baseSegmentToUpgradedVersions.get(identifier).add(identifier); + upgradedVersionToBaseSegment.put(identifier, identifier); metrics.setSinkCount(sinks.size()); sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), identifier.getShardSpec().createChunk(retVal)); } @@ -1025,14 +1034,7 @@ public void closeNow() log.debug("Shutting down immediately..."); for (Map.Entry entry : sinks.entrySet()) { - try { - unannounceAllVersionsOfSegment(entry.getValue().getSegment()); - } - catch (Exception e) { - log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource()) - .addData("identifier", entry.getKey().toString()) - .emit(); - } + unannounceAllVersionsOfSegment(entry.getValue().getSegment()); } try { shutdownExecutors(); @@ -1065,13 +1067,14 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer() /** * Unannounces the given base segment and all its 
upgraded versions. */ - private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws IOException + private void unannounceAllVersionsOfSegment(DataSegment baseSegment) { - segmentAnnouncer.unannounceSegment(baseSegment); + unannounceSegment(baseSegment); + + final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment); + final Set upgradedVersionsOfSegment = baseSegmentToUpgradedVersions.get(baseId); - final Set upgradedVersionsOfSegment - = baseSegmentToUpgradedVersions.remove(baseSegment.getId()); - if (upgradedVersionsOfSegment == null || upgradedVersionsOfSegment.isEmpty()) { + if (CollectionUtils.isNullOrEmpty(upgradedVersionsOfSegment)) { return; } @@ -1087,15 +1090,32 @@ private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws IOEx baseSegment.getBinaryVersion(), baseSegment.getSize() ); - segmentAnnouncer.unannounceSegment(newSegment); + unannounceSegment(newSegment); + } + } + + private void unannounceSegment(DataSegment segment) + { + try { + segmentAnnouncer.unannounceSegment(segment); + } + catch (Exception e) { + log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource()) + .addData("identifier", segment.getId().toString()) + .emit(); + } + SegmentIdWithShardSpec id = SegmentIdWithShardSpec.fromDataSegment(segment); + SegmentIdWithShardSpec baseId = upgradedVersionToBaseSegment.remove(id); + baseSegmentToUpgradedVersions.get(baseId).remove(id); + if (baseSegmentToUpgradedVersions.get(baseId).isEmpty()) { + baseSegmentToUpgradedVersions.remove(baseId); } } - public void registerNewVersionOfPendingSegment( - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion - ) throws IOException + public void registerNewVersionOfPendingSegment(PendingSegmentRecord pendingSegmentRecord) throws IOException { + SegmentIdWithShardSpec basePendingSegment = idToPendingSegment.get(pendingSegmentRecord.getUpgradedFromSegmentId()); + SegmentIdWithShardSpec 
newSegmentVersion = pendingSegmentRecord.getId(); if (!sinks.containsKey(basePendingSegment) || droppingSinks.contains(basePendingSegment)) { return; } @@ -1105,21 +1125,26 @@ public void registerNewVersionOfPendingSegment( // Announce segments final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment(); + final DataSegment newSegment = getUpgradedSegment(baseSegment, newSegmentVersion); + + segmentAnnouncer.announceSegment(newSegment); + baseSegmentToUpgradedVersions.get(basePendingSegment).add(newSegmentVersion); + upgradedVersionToBaseSegment.put(newSegmentVersion, basePendingSegment); + } - final DataSegment newSegment = new DataSegment( - newSegmentVersion.getDataSource(), - newSegmentVersion.getInterval(), - newSegmentVersion.getVersion(), + private DataSegment getUpgradedSegment(DataSegment baseSegment, SegmentIdWithShardSpec upgradedVersion) + { + return new DataSegment( + upgradedVersion.getDataSource(), + upgradedVersion.getInterval(), + upgradedVersion.getVersion(), baseSegment.getLoadSpec(), baseSegment.getDimensions(), baseSegment.getMetrics(), - newSegmentVersion.getShardSpec(), + upgradedVersion.getShardSpec(), baseSegment.getBinaryVersion(), baseSegment.getSize() ); - segmentAnnouncer.announceSegment(newSegment); - baseSegmentToUpgradedVersions.computeIfAbsent(basePendingSegment.asSegmentId(), id -> new HashSet<>()) - .add(newSegmentVersion); } private void lockBasePersistDirectory() @@ -1335,6 +1360,10 @@ private Object bootstrapSinksFromDisk() ); rowsSoFar += currSink.getNumRows(); sinks.put(identifier, currSink); + idToPendingSegment.put(identifier.asSegmentId().toString(), identifier); + baseSegmentToUpgradedVersions.put(identifier, new HashSet<>()); + baseSegmentToUpgradedVersions.get(identifier).add(identifier); + upgradedVersionToBaseSegment.put(identifier, identifier); sinkTimeline.add( currSink.getInterval(), currSink.getVersion(), @@ -1369,6 +1398,13 @@ private ListenableFuture abandonSegment( final boolean removeOnDiskData 
) { + final SegmentIdWithShardSpec baseIdentifier = upgradedVersionToBaseSegment.get(identifier); + synchronized (sink) { + if (baseSegmentToUpgradedVersions.get(baseIdentifier).size() > 1) { + unannounceSegment(getUpgradedSegment(sink.getSegment(), identifier)); + return Futures.immediateFuture(null); + } + } // Ensure no future writes will be made to this sink. if (sink.finishWriting()) { // Decrement this sink's rows from the counters. we only count active sinks so that we don't double decrement, @@ -1386,7 +1422,7 @@ private ListenableFuture abandonSegment( } // Mark this identifier as dropping, so no future push tasks will pick it up. - droppingSinks.add(identifier); + droppingSinks.add(baseIdentifier); // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread. return Futures.transform( @@ -1397,8 +1433,8 @@ private ListenableFuture abandonSegment( @Override public Void apply(@Nullable Object input) { - if (!sinks.remove(identifier, sink)) { - log.error("Sink for segment[%s] no longer valid, not abandoning.", identifier); + if (!sinks.remove(baseIdentifier, sink)) { + log.error("Sink for segment[%s] no longer valid, not abandoning.", baseIdentifier); return null; } @@ -1406,17 +1442,17 @@ public Void apply(@Nullable Object input) if (removeOnDiskData) { // Remove this segment from the committed list. This must be done from the persist thread. 
- log.debug("Removing commit metadata for segment[%s].", identifier); + log.debug("Removing commit metadata for segment[%s].", baseIdentifier); try { commitLock.lock(); final Committed oldCommit = readCommit(); if (oldCommit != null) { - writeCommit(oldCommit.without(identifier.toString())); + writeCommit(oldCommit.without(baseIdentifier.toString())); } } catch (Exception e) { log.makeAlert(e, "Failed to update committed segments[%s]", schema.getDataSource()) - .addData("identifier", identifier.toString()) + .addData("identifier", baseIdentifier.toString()) .emit(); throw new RuntimeException(e); } @@ -1426,21 +1462,15 @@ public Void apply(@Nullable Object input) } // Unannounce the segment. - try { - unannounceAllVersionsOfSegment(sink.getSegment()); - } - catch (Exception e) { - log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource()) - .addData("identifier", identifier.toString()) - .emit(); - } + final DataSegment segment = getUpgradedSegment(sink.getSegment(), identifier); + unannounceSegment(segment); Runnable removeRunnable = () -> { - droppingSinks.remove(identifier); + droppingSinks.remove(baseIdentifier); sinkTimeline.remove( sink.getInterval(), sink.getVersion(), - identifier.getShardSpec().createChunk(sink) + baseIdentifier.getShardSpec().createChunk(sink) ); for (FireHydrant hydrant : sink) { if (cache != null) { @@ -1450,7 +1480,7 @@ public Void apply(@Nullable Object input) } if (removeOnDiskData) { - removeDirectory(computePersistDir(identifier)); + removeDirectory(computePersistDir(baseIdentifier)); } log.info("Dropped segment[%s].", identifier); From 4ba335cf7532799722e189dbea1d6acd69d4c833 Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 22 Apr 2024 14:48:53 +0530 Subject: [PATCH 05/17] Fix test flakyness --- .../realtime/appenderator/StreamAppenderatorDriverFailTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index b53c6f67e38d..6a5d153e8845 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -201,7 +201,7 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo expectedException.expect(ExecutionException.class); expectedException.expectCause(CoreMatchers.instanceOf(ISE.class)); expectedException.expectMessage( - "Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]" + "Fail test while dropping segment" ); driver = new StreamAppenderatorDriver( From 84cf8b50dc1129e87fc4d7df3a05dff38f117d4b Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 22 Apr 2024 21:08:46 +0530 Subject: [PATCH 06/17] Fix npe and other feedback --- .../druid/msq/indexing/MSQWorkerTask.java | 10 +------ .../batch/parallel/SinglePhaseSubTask.java | 9 +----- .../druid/indexing/overlord/TaskLockbox.java | 2 +- .../supervisor/SupervisorManager.java | 2 +- .../indexing/overlord/TaskLockboxTest.java | 4 +-- ...TestIndexerMetadataStorageCoordinator.java | 2 +- .../IndexerMetadataStorageCoordinator.java | 2 +- .../IndexerSQLMetadataStorageCoordinator.java | 13 +++++++-- .../appenderator/StreamAppenderator.java | 28 ++++++++++--------- 9 files changed, 33 insertions(+), 39 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index c04948402079..b4d18ea390e9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -32,7 +32,6 @@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; -import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.exec.Worker; @@ -46,7 +45,7 @@ import java.util.Set; @JsonTypeName(MSQWorkerTask.TYPE) -public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask +public class MSQWorkerTask extends AbstractTask { public static final String TYPE = "query_worker"; @@ -126,13 +125,6 @@ public Set getInputSourceResources() return ImmutableSet.of(); } - @Override - public String getTaskAllocatorId() - { - return getControllerTaskId(); - } - - @Override public boolean isReady(final TaskActionClient taskActionClient) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index e02d59936b20..0b4f62e6388c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.task.BatchAppenderators; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.indexing.common.task.IndexTaskUtils; -import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch; import org.apache.druid.indexing.common.task.SegmentAllocators; import 
org.apache.druid.indexing.common.task.TaskResource; @@ -106,7 +105,7 @@ * generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of * publishing on its own. */ -public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler, PendingSegmentAllocatingTask +public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler { public static final String TYPE = "single_phase_sub_task"; public static final String OLD_TYPE_NAME = "index_sub"; @@ -237,12 +236,6 @@ public String getSubtaskSpecId() return subtaskSpecId; } - @Override - public String getTaskAllocatorId() - { - return getGroupId(); - } - @Override public TaskStatus runTask(final TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 35ec79d74ec7..77422d9e8bf7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -1231,7 +1231,7 @@ public void remove(final Task task) idsInSameGroup.remove(task.getId()); if (idsInSameGroup.isEmpty()) { final int pendingSegmentsDeleted - = metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId); + = metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId); log.info( "Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.", pendingSegmentsDeleted, taskAllocatorId diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index c1eba5602409..3cf346689fae 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -330,7 +330,7 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor( return true; } catch (Exception e) { - log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed", + log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed", upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId); } return false; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 999d4d0abb2e..d9fbdada90d2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1905,8 +1905,8 @@ public void testCleanupOnUnlock() // Only the replaceTask should attempt a delete on the upgradeSegments table EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once(); // Any task may attempt pending segment clean up - EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(replaceTask.getId())).andReturn(0).once(); - EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(appendTask.getId())).andReturn(0).once(); + EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once(); + EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once(); EasyMock.replay(coordinator); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator); diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 8bcbc7bbd81c..2abf728e5162 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -289,7 +289,7 @@ public int deleteUpgradeSegmentsForTask(final String taskId) } @Override - public int deletePendingSegmentsForTaskGroup(final String taskGroup) + public int deletePendingSegmentsForTaskAllocatorId(final String taskGroup) { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 830cfbfb41de..0ef239f73373 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -484,7 +484,7 @@ SegmentPublishResult commitMetadataOnly( * @param taskAllocatorId task id / task group / replica group for an appending task * @return number of pending segments deleted from the metadata store */ - int deletePendingSegmentsForTaskGroup(String taskAllocatorId); + int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId); /** * Fetches all the pending segments of the datasource that overlap with a given interval. 
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 32c26cb2808e..0cc9c34807fb 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1056,8 +1056,15 @@ private SegmentIdWithShardSpec allocatePendingSegment( ); // always insert empty previous sequence id - insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1, - taskAllocatorId + insertPendingSegmentIntoMetastore( + handle, + newIdentifier, + dataSource, + interval, + "", + sequenceName, + sequenceNamePrevIdSha1, + taskAllocatorId ); log.info( @@ -2584,7 +2591,7 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused) } @Override - public int deletePendingSegmentsForTaskGroup(final String pendingSegmentsGroup) + public int deletePendingSegmentsForTaskAllocatorId(final String pendingSegmentsGroup) { return connector.getDBI().inTransaction( (handle, status) -> handle diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index dbc7b698234b..7134720eb150 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -78,7 +78,6 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; -import org.apache.druid.utils.CollectionUtils; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -1034,7 +1033,9 @@ public void closeNow() log.debug("Shutting down 
immediately..."); for (Map.Entry entry : sinks.entrySet()) { - unannounceAllVersionsOfSegment(entry.getValue().getSegment()); + synchronized (entry.getValue()) { + unannounceAllVersionsOfSegment(entry.getValue().getSegment()); + } } try { shutdownExecutors(); @@ -1069,14 +1070,9 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer() */ private void unannounceAllVersionsOfSegment(DataSegment baseSegment) { - unannounceSegment(baseSegment); - final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment); - final Set upgradedVersionsOfSegment = baseSegmentToUpgradedVersions.get(baseId); - - if (CollectionUtils.isNullOrEmpty(upgradedVersionsOfSegment)) { - return; - } + final List upgradedVersionsOfSegment + = ImmutableList.copyOf(baseSegmentToUpgradedVersions.get(baseId)); for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) { final DataSegment newSegment = new DataSegment( @@ -1106,6 +1102,9 @@ private void unannounceSegment(DataSegment segment) } SegmentIdWithShardSpec id = SegmentIdWithShardSpec.fromDataSegment(segment); SegmentIdWithShardSpec baseId = upgradedVersionToBaseSegment.remove(id); + if (baseId == null) { + return; + } baseSegmentToUpgradedVersions.get(baseId).remove(id); if (baseSegmentToUpgradedVersions.get(baseId).isEmpty()) { baseSegmentToUpgradedVersions.remove(baseId); @@ -1398,9 +1397,10 @@ private ListenableFuture abandonSegment( final boolean removeOnDiskData ) { - final SegmentIdWithShardSpec baseIdentifier = upgradedVersionToBaseSegment.get(identifier); + final SegmentIdWithShardSpec baseIdentifier = upgradedVersionToBaseSegment.getOrDefault(identifier, identifier); synchronized (sink) { - if (baseSegmentToUpgradedVersions.get(baseIdentifier).size() > 1) { + if (baseSegmentToUpgradedVersions.containsKey(baseIdentifier) + && baseSegmentToUpgradedVersions.get(baseIdentifier).size() > 1) { unannounceSegment(getUpgradedSegment(sink.getSegment(), identifier)); return Futures.immediateFuture(null); } @@ -1462,8 
+1462,10 @@ public Void apply(@Nullable Object input) } // Unannounce the segment. - final DataSegment segment = getUpgradedSegment(sink.getSegment(), identifier); - unannounceSegment(segment); + synchronized (sink) { + final DataSegment segment = getUpgradedSegment(sink.getSegment(), identifier); + unannounceSegment(segment); + } Runnable removeRunnable = () -> { droppingSinks.remove(baseIdentifier); From 0599f3ff2ec9cc46784a275d6f71a9ba00ddede1 Mon Sep 17 00:00:00 2001 From: Amatya Date: Mon, 22 Apr 2024 21:21:06 +0530 Subject: [PATCH 07/17] Fix compilation --- .../org/apache/druid/msq/indexing/MSQWorkerTaskTest.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 482d67d81abe..6eff77184ea7 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -108,12 +108,4 @@ public void testGetInputSourceResources() MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty()); } - - @Test - public void testGetTaskAllocatorId() - { - MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); - Assert.assertEquals(controllerTaskId, msqWorkerTask.getTaskAllocatorId()); - } - } From cca38db926728918e9fc5be76bc47b608af908af Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 24 Apr 2024 12:26:54 +0530 Subject: [PATCH 08/17] Uannounce all versions at once --- .../appenderator/StreamAppenderator.java | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 7134720eb150..5fc8b243264e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -163,10 +163,13 @@ public class StreamAppenderator implements Appenderator private final AtomicBoolean closed = new AtomicBoolean(false); - private final ConcurrentHashMap> baseSegmentToUpgradedVersions + private final ConcurrentMap> baseSegmentToUpgradedVersions = new ConcurrentHashMap<>(); - private final ConcurrentHashMap upgradedVersionToBaseSegment + private final ConcurrentMap upgradedVersionToBaseSegment = new ConcurrentHashMap<>(); + private final ConcurrentHashMap.KeySetView abandonedSegments + = ConcurrentHashMap.newKeySet(); + private final SinkSchemaAnnouncer sinkSchemaAnnouncer; private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig; @@ -1071,6 +1074,10 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer() private void unannounceAllVersionsOfSegment(DataSegment baseSegment) { final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment); + if (!baseSegmentToUpgradedVersions.containsKey(baseId)) { + return; + } + final List upgradedVersionsOfSegment = ImmutableList.copyOf(baseSegmentToUpgradedVersions.get(baseId)); @@ -1397,12 +1404,15 @@ private ListenableFuture abandonSegment( final boolean removeOnDiskData ) { + abandonedSegments.add(identifier); final SegmentIdWithShardSpec baseIdentifier = upgradedVersionToBaseSegment.getOrDefault(identifier, identifier); synchronized (sink) { - if (baseSegmentToUpgradedVersions.containsKey(baseIdentifier) - && baseSegmentToUpgradedVersions.get(baseIdentifier).size() > 1) { - unannounceSegment(getUpgradedSegment(sink.getSegment(), identifier)); - 
return Futures.immediateFuture(null); + if (baseSegmentToUpgradedVersions.containsKey(baseIdentifier)) { + Set relevantSegments = new HashSet<>(baseSegmentToUpgradedVersions.get(baseIdentifier)); + relevantSegments.removeAll(abandonedSegments); + if (!relevantSegments.isEmpty()) { + return Futures.immediateFuture(null); + } } } // Ensure no future writes will be made to this sink. @@ -1463,8 +1473,7 @@ public Void apply(@Nullable Object input) // Unannounce the segment. synchronized (sink) { - final DataSegment segment = getUpgradedSegment(sink.getSegment(), identifier); - unannounceSegment(segment); + unannounceAllVersionsOfSegment(sink.getSegment()); } Runnable removeRunnable = () -> { From 0ab232628686c58c6b31be90db25165a1b749af6 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 24 Apr 2024 14:42:53 +0530 Subject: [PATCH 09/17] Add test and address feedback --- .../MaterializedViewSupervisorSpecTest.java | 2 - .../SegmentTransactionalReplaceAction.java | 2 +- .../supervisor/SupervisorManager.java | 11 ++-- .../SeekableStreamIndexTaskRunner.java | 6 +- .../supervisor/SeekableStreamSupervisor.java | 36 ++++------- ...ncurrentReplaceAndStreamingAppendTest.java | 2 +- .../SeekableStreamSupervisorStateTest.java | 64 ++++++++++++++++--- .../overlord/SegmentPublishResult.java | 3 +- .../supervisor/NoopSupervisorSpec.java | 7 -- .../overlord/supervisor/Supervisor.java | 6 -- .../IndexerSQLMetadataStorageCoordinator.java | 1 + .../metadata/SqlSegmentsMetadataQuery.java | 2 +- .../appenderator/BaseAppenderatorDriver.java | 1 + .../SegmentsAndCommitMetadata.java | 2 + .../appenderator/StreamAppenderator.java | 38 +++++------ 15 files changed, 102 insertions(+), 81 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java 
b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java index 365fb1751eac..14bd59871253 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java @@ -207,8 +207,6 @@ public void testMaterializedViewSupervisorSpecCreated() Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveTaskGroupsCount()); - Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveRealtimeSequencePrefixes()); - Callable noop = new Callable() { @Override public Integer call() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 9d12f57a1cc6..2a527db2c6a9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -165,7 +165,7 @@ private void registerUpgradedPendingSegmentsOnSupervisor( } upgradedPendingSegments.forEach( - upgradedPendingSegment -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( + upgradedPendingSegment -> supervisorManager.registerUpgradedPendingSegmentOnSupervisor( activeSupervisorIdWithAppendLock.get(), upgradedPendingSegment ) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 3cf346689fae..288b2a141564 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -308,7 +308,7 @@ public boolean checkPointDataSourceMetadata( * allows the supervisor to include the pending segment in queries fired against * that segment version. */ - public boolean registerNewVersionOfPendingSegmentOnSupervisor( + public boolean registerUpgradedPendingSegmentOnSupervisor( String supervisorId, PendingSegmentRecord upgradedPendingSegment ) @@ -316,8 +316,11 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor( try { Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null"); Preconditions.checkNotNull(upgradedPendingSegment, "upgraded pending segment cannot be null"); - Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "replica group cannot be null"); - Preconditions.checkNotNull(upgradedPendingSegment.getUpgradedFromSegmentId(), "root id cannot be null"); + Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "taskAllocatorId cannot be null"); + Preconditions.checkNotNull( + upgradedPendingSegment.getUpgradedFromSegmentId(), + "upgradedFromSegmentId cannot be null" + ); Pair supervisor = supervisors.get(supervisorId); Preconditions.checkNotNull(supervisor, "supervisor could not be found"); @@ -330,7 +333,7 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor( return true; } catch (Exception e) { - log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed", + log.error(e, "Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s].", upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId); } return false; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 5813598169ca..2e02efdeb19e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1576,7 +1576,7 @@ public Response setEndOffsetsHTTP( @Path("/pendingSegmentVersion") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public Response registerNewVersionOfPendingSegment( + public Response registerUpgradedPendingSegment( PendingSegmentRecord upgradedPendingSegment, // this field is only for internal purposes, shouldn't be usually set by users @Context final HttpServletRequest req @@ -1596,8 +1596,8 @@ public Response registerNewVersionOfPendingSegment( catch (Exception e) { log.error( e, - "Could not register new version[%s] of pending segment[%s]", - upgradedPendingSegment.getId().getVersion(), upgradedPendingSegment.getUpgradedFromSegmentId() + "Could not register pending segment[%s] upgraded from[%s]", + upgradedPendingSegment.getId().asSegmentId(), upgradedPendingSegment.getUpgradedFromSegmentId() ); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index d0495700eb2a..58a433325a3f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -178,7 +178,8 @@ public abstract class SeekableStreamSupervisor taskIds() return tasks.keySet(); } + @VisibleForTesting + public String getBaseSequenceName() + { + 
return baseSequenceName; + } } private class TaskData @@ -1096,28 +1102,6 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata) addNotice(new ResetOffsetsNotice(resetDataSourceMetadata)); } - /** - * The base sequence name of a seekable stream task group is used as a prefix of the sequence names - * of pending segments published by it. - * This method can be used to identify the active pending segments for a datasource - * by checking if the sequence name begins with any of the active realtime sequence prefix returned by this method - * @return the set of base sequence names of both active and pending completion task gruops. - */ - @Override - public Set getActiveRealtimeSequencePrefixes() - { - final Set activeBaseSequences = new HashSet<>(); - for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { - activeBaseSequences.add(taskGroup.baseSequenceName); - } - for (List taskGroupList : pendingCompletionTaskGroups.values()) { - for (TaskGroup taskGroup : taskGroupList) { - activeBaseSequences.add(taskGroup.baseSequenceName); - } - } - return activeBaseSequences; - } - public void registerNewVersionOfPendingSegment( PendingSegmentRecord pendingSegmentRecord ) @@ -1551,7 +1535,7 @@ private List getCurrentParseErrors() } @VisibleForTesting - public void addTaskGroupToActivelyReadingTaskGroup( + public TaskGroup addTaskGroupToActivelyReadingTaskGroup( int taskGroupId, ImmutableMap partitionOffsets, Optional minMsgTime, @@ -1575,10 +1559,11 @@ public void addTaskGroupToActivelyReadingTaskGroup( taskGroupId ); } + return group; } @VisibleForTesting - public void addTaskGroupToPendingCompletionTaskGroup( + public TaskGroup addTaskGroupToPendingCompletionTaskGroup( int taskGroupId, ImmutableMap partitionOffsets, Optional minMsgTime, @@ -1598,6 +1583,7 @@ public void addTaskGroupToPendingCompletionTaskGroup( group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData()))); 
pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new CopyOnWriteArrayList<>()) .add(group); + return group; } @VisibleForTesting diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java index e4273d4b307e..7da5a3d19fe8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java @@ -168,7 +168,7 @@ public void setUpIngestionTestBase() throws IOException appendTask = createAndStartTask(); supervisorId = Capture.newInstance(CaptureType.ALL); pendingSegment = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( + EasyMock.expect(supervisorManager.registerUpgradedPendingSegmentOnSupervisor( EasyMock.capture(supervisorId), EasyMock.capture(pendingSegment) )).andReturn(true).anyTimes(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 2602f8e5441f..489315cc2495 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -74,11 +74,13 @@ import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; +import 
org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -86,6 +88,10 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.easymock.Capture; +import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.hamcrest.MatcherAssert; @@ -1548,10 +1554,19 @@ public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws In } @Test - public void testGetActiveRealtimeSequencePrefixes() + public void testRegisterNewVersionOfPendingSegment() { EasyMock.expect(spec.isSuspended()).andReturn(false); + Capture captured0 = Capture.newInstance(CaptureType.FIRST); + Capture captured1 = Capture.newInstance(CaptureType.FIRST); + EasyMock.expect( + indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task0"), EasyMock.capture(captured0)) + ).andReturn(Futures.immediateFuture(true)); + EasyMock.expect( + indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task2"), EasyMock.capture(captured1)) + ).andReturn(Futures.immediateFuture(true)); + replayAll(); final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); @@ -1559,34 +1574,63 @@ public 
void testGetActiveRealtimeSequencePrefixes() // Spin off two active tasks with each task serving one partition. supervisor.getIoConfig().setTaskCount(3); supervisor.start(); - supervisor.addTaskGroupToActivelyReadingTaskGroup( + + final SeekableStreamSupervisor.TaskGroup taskGroup0 = supervisor.addTaskGroupToActivelyReadingTaskGroup( supervisor.getTaskGroupIdForPartition("0"), ImmutableMap.of("0", "5"), Optional.absent(), Optional.absent(), - ImmutableSet.of("task1"), + ImmutableSet.of("task0"), ImmutableSet.of() ); - - supervisor.addTaskGroupToActivelyReadingTaskGroup( + final SeekableStreamSupervisor.TaskGroup taskGroup1 = supervisor.addTaskGroupToActivelyReadingTaskGroup( supervisor.getTaskGroupIdForPartition("1"), ImmutableMap.of("1", "6"), Optional.absent(), Optional.absent(), - ImmutableSet.of("task2"), + ImmutableSet.of("task1"), ImmutableSet.of() ); - - supervisor.addTaskGroupToPendingCompletionTaskGroup( + final SeekableStreamSupervisor.TaskGroup taskGroup2 = supervisor.addTaskGroupToPendingCompletionTaskGroup( supervisor.getTaskGroupIdForPartition("2"), ImmutableMap.of("2", "100"), Optional.absent(), Optional.absent(), - ImmutableSet.of("task3"), + ImmutableSet.of("task2"), ImmutableSet.of() ); - Assert.assertEquals(3, supervisor.getActiveRealtimeSequencePrefixes().size()); + final PendingSegmentRecord pendingSegmentRecord0 = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "DS", + Intervals.of("2024/2025"), + "2024", + new NumberedShardSpec(1, 0) + ), + taskGroup0.getBaseSequenceName(), + "prevId0", + "someAppendedSegment0", + taskGroup0.getBaseSequenceName() + ); + final PendingSegmentRecord pendingSegmentRecord1 = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "DS", + Intervals.of("2024/2025"), + "2024", + new NumberedShardSpec(2, 0) + ), + taskGroup2.getBaseSequenceName(), + "prevId1", + "someAppendedSegment1", + taskGroup2.getBaseSequenceName() + ); + + supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord0); + 
supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord1); + + Assert.assertEquals(pendingSegmentRecord0, captured0.getValue()); + Assert.assertEquals(pendingSegmentRecord1, captured1.getValue()); + verifyAll(); } @Test diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index c4cecd9620cc..e4bc1645f710 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.overlord; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; @@ -118,7 +117,7 @@ public String getErrorMsg() return errorMsg; } - @JsonIgnore + @Nullable public List getUpgradedPendingSegments() { return upgradedPendingSegments; diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 20c102533862..e733ef6c233d 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -31,7 +31,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; @@ -186,12 +185,6 @@ public int getActiveTaskGroupsCount() { return -1; } - - @Override - public Set getActiveRealtimeSequencePrefixes() - { - return Collections.emptySet(); - } }; } diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 9b9511cbf3da..b1fb439184d4 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -29,7 +29,6 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; -import java.util.Set; public interface Supervisor { @@ -103,9 +102,4 @@ default long computeLagForAutoScaler() } int getActiveTaskGroupsCount(); - - /** - * @return active sequence prefixes for reading and pending completion task groups of a seekable stream supervisor - */ - Set getActiveRealtimeSequencePrefixes(); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 0cc9c34807fb..67dcb5244d38 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1373,6 +1373,7 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock); + // Delete the pending segments committed in this transaction final List appendedSegmentIds = allSegmentsToInsert.stream() .map(pendingSegment -> pendingSegment.getId().toString()) diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index fe1814fc8a1d..13fd50048a7b 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -898,7 +898,7 @@ private static int 
computeNumChangedSegments(List segmentIds, int[] segm * * @implNote JDBI 3.x has better support for binding {@code IN} clauses directly. */ - public static String getParameterizedInConditionForColumn(final String columnName, final List values) + static String getParameterizedInConditionForColumn(final String columnName, final List values) { if (values == null) { return ""; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 627ed75b3678..6ad34bbde9a8 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -635,6 +635,7 @@ ListenableFuture publishInBackground( callerMetadata ); log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments"); + // This set must contain only those segments that were upgraded as a result of a concurrent replace. upgradedSegments.addAll(publishResult.getSegments()); segmentsAndCommitMetadata.getSegments().forEach(upgradedSegments::remove); if (!upgradedSegments.isEmpty()) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java index 6853060a2e55..895a3debef3d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java @@ -36,6 +36,8 @@ public class SegmentsAndCommitMetadata private final Object commitMetadata; private final ImmutableList segments; + + // This set corresponds to the set of extra segments committed due to a concurrent replace. 
private final ImmutableSet upgradedSegments; public SegmentsAndCommitMetadata( diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 5fc8b243264e..6403d6f71c27 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -163,9 +163,9 @@ public class StreamAppenderator implements Appenderator private final AtomicBoolean closed = new AtomicBoolean(false); - private final ConcurrentMap> baseSegmentToUpgradedVersions + private final ConcurrentMap> baseSegmentToUpgradedSegments = new ConcurrentHashMap<>(); - private final ConcurrentMap upgradedVersionToBaseSegment + private final ConcurrentMap upgradedSegmentToBaseSegment = new ConcurrentHashMap<>(); private final ConcurrentHashMap.KeySetView abandonedSegments = ConcurrentHashMap.newKeySet(); @@ -529,9 +529,9 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier) sinks.put(identifier, retVal); idToPendingSegment.put(identifier.asSegmentId().toString(), identifier); - baseSegmentToUpgradedVersions.put(identifier, new HashSet<>()); - baseSegmentToUpgradedVersions.get(identifier).add(identifier); - upgradedVersionToBaseSegment.put(identifier, identifier); + baseSegmentToUpgradedSegments.put(identifier, new HashSet<>()); + baseSegmentToUpgradedSegments.get(identifier).add(identifier); + upgradedSegmentToBaseSegment.put(identifier, identifier); metrics.setSinkCount(sinks.size()); sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), identifier.getShardSpec().createChunk(retVal)); } @@ -1074,12 +1074,12 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer() private void unannounceAllVersionsOfSegment(DataSegment baseSegment) { final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment); - 
if (!baseSegmentToUpgradedVersions.containsKey(baseId)) { + if (!baseSegmentToUpgradedSegments.containsKey(baseId)) { return; } final List upgradedVersionsOfSegment - = ImmutableList.copyOf(baseSegmentToUpgradedVersions.get(baseId)); + = ImmutableList.copyOf(baseSegmentToUpgradedSegments.get(baseId)); for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) { final DataSegment newSegment = new DataSegment( @@ -1108,13 +1108,13 @@ private void unannounceSegment(DataSegment segment) .emit(); } SegmentIdWithShardSpec id = SegmentIdWithShardSpec.fromDataSegment(segment); - SegmentIdWithShardSpec baseId = upgradedVersionToBaseSegment.remove(id); + SegmentIdWithShardSpec baseId = upgradedSegmentToBaseSegment.remove(id); if (baseId == null) { return; } - baseSegmentToUpgradedVersions.get(baseId).remove(id); - if (baseSegmentToUpgradedVersions.get(baseId).isEmpty()) { - baseSegmentToUpgradedVersions.remove(baseId); + baseSegmentToUpgradedSegments.get(baseId).remove(id); + if (baseSegmentToUpgradedSegments.get(baseId).isEmpty()) { + baseSegmentToUpgradedSegments.remove(baseId); } } @@ -1134,8 +1134,8 @@ public void registerNewVersionOfPendingSegment(PendingSegmentRecord pendingSegme final DataSegment newSegment = getUpgradedSegment(baseSegment, newSegmentVersion); segmentAnnouncer.announceSegment(newSegment); - baseSegmentToUpgradedVersions.get(basePendingSegment).add(newSegmentVersion); - upgradedVersionToBaseSegment.put(newSegmentVersion, basePendingSegment); + baseSegmentToUpgradedSegments.get(basePendingSegment).add(newSegmentVersion); + upgradedSegmentToBaseSegment.put(newSegmentVersion, basePendingSegment); } private DataSegment getUpgradedSegment(DataSegment baseSegment, SegmentIdWithShardSpec upgradedVersion) @@ -1367,9 +1367,9 @@ private Object bootstrapSinksFromDisk() rowsSoFar += currSink.getNumRows(); sinks.put(identifier, currSink); idToPendingSegment.put(identifier.asSegmentId().toString(), identifier); - baseSegmentToUpgradedVersions.put(identifier, new 
HashSet<>()); - baseSegmentToUpgradedVersions.get(identifier).add(identifier); - upgradedVersionToBaseSegment.put(identifier, identifier); + baseSegmentToUpgradedSegments.put(identifier, new HashSet<>()); + baseSegmentToUpgradedSegments.get(identifier).add(identifier); + upgradedSegmentToBaseSegment.put(identifier, identifier); sinkTimeline.add( currSink.getInterval(), currSink.getVersion(), @@ -1405,10 +1405,10 @@ private ListenableFuture abandonSegment( ) { abandonedSegments.add(identifier); - final SegmentIdWithShardSpec baseIdentifier = upgradedVersionToBaseSegment.getOrDefault(identifier, identifier); + final SegmentIdWithShardSpec baseIdentifier = upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier); synchronized (sink) { - if (baseSegmentToUpgradedVersions.containsKey(baseIdentifier)) { - Set relevantSegments = new HashSet<>(baseSegmentToUpgradedVersions.get(baseIdentifier)); + if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) { + Set relevantSegments = new HashSet<>(baseSegmentToUpgradedSegments.get(baseIdentifier)); relevantSegments.removeAll(abandonedSegments); if (!relevantSegments.isEmpty()) { return Futures.immediateFuture(null); From 7e8be63031f7207a4f79b3cdc8b00cf703d3636b Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 24 Apr 2024 14:55:58 +0530 Subject: [PATCH 10/17] Fix compilation --- .../materializedview/MaterializedViewSupervisor.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 7e0eaf60d836..9da665adde46 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ 
b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -298,12 +298,6 @@ public LagStats computeLagStats() throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor"); } - @Override - public Set getActiveRealtimeSequencePrefixes() - { - throw new UnsupportedOperationException(); - } - @Override public int getActiveTaskGroupsCount() { From 7b212a2e7d70999934fb4650e862566549ac0631 Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 24 Apr 2024 18:28:29 +0530 Subject: [PATCH 11/17] Add test for coverage --- .../supervisor/SupervisorManagerTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 5ffbd4b94608..4a9fccd4663b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -31,7 +31,11 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.metadata.PendingSegmentRecord; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -544,6 +548,53 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() verifyAll(); } + @Test + public void 
testRegisterUpgradedPendingSegmentOnSupervisor() + { + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap()); + + NoopSupervisorSpec noopSpec = new NoopSupervisorSpec("noop", ImmutableList.of("noopDS")); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec streamingSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + SeekableStreamSupervisor streamSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + EasyMock.expect(streamingSpec.getId()).andReturn("sss").anyTimes(); + EasyMock.expect(streamingSpec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(streamingSpec.getDataSources()).andReturn(ImmutableList.of("DS")).anyTimes(); + EasyMock.expect(streamingSpec.createSupervisor()).andReturn(streamSupervisor).anyTimes(); + EasyMock.expect(streamingSpec.createAutoscaler(streamSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(streamingSpec.getContext()).andReturn(null).anyTimes(); + EasyMock.replay(streamingSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + + replayAll(); + + final PendingSegmentRecord pendingSegment = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "DS", + Intervals.ETERNITY, + "version", + new NumberedShardSpec(0, 0) + ), + "sequenceName", + "prevSegmentId", + "upgradedFromSegmentId", + "taskAllocatorId" + ); + manager.start(); + + manager.createOrUpdateAndStartSupervisor(noopSpec); + Assert.assertFalse(manager.registerUpgradedPendingSegmentOnSupervisor("noop", pendingSegment)); + + manager.createOrUpdateAndStartSupervisor(streamingSpec); + Assert.assertTrue(manager.registerUpgradedPendingSegmentOnSupervisor("sss", pendingSegment)); + + verifyAll(); + } + private static class TestSupervisorSpec implements SupervisorSpec { private 
final String id; From 5e084ead41c27c5028f0551aa9351daa4eff53cf Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 24 Apr 2024 19:48:23 +0530 Subject: [PATCH 12/17] Delete pending segments in batches of at most 100 --- .../IndexerSQLMetadataStorageCoordinator.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 67dcb5244d38..23bc9266a585 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1373,12 +1373,16 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock); - // Delete the pending segments committed in this transaction - final List appendedSegmentIds - = allSegmentsToInsert.stream() - .map(pendingSegment -> pendingSegment.getId().toString()) - .collect(Collectors.toList()); - deletePendingSegmentsById(handle, dataSource, appendedSegmentIds); + // Delete the pending segments to be committed in this transaction in batches of at most 100 + final List> pendingSegmentIdBatches = Lists.partition( + allSegmentsToInsert.stream() + .map(pendingSegment -> pendingSegment.getId().toString()) + .collect(Collectors.toList()), + 100 + ); + for (List pendingSegmentIdBatch : pendingSegmentIdBatches) { + deletePendingSegmentsById(handle, dataSource, pendingSegmentIdBatch); + } return SegmentPublishResult.ok(insertSegments(handle, allSegmentsToInsert)); }, From 2dc3b10a92e5b93ba0da922ae3075c206d09d4e5 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 25 Apr 2024 09:42:05 +0530 Subject: [PATCH 13/17] Initialize sets only where needed --- .../realtime/appenderator/BaseAppenderatorDriver.java | 11 ++++++----- 1 file changed, 6 
insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index f2d9bebe1b5b..65df4f56761a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -616,13 +616,13 @@ ListenableFuture publishInBackground( final Object callerMetadata = metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata(); - final Set upgradedSegments = new HashSet<>(); return executor.submit( () -> { try { - RetryUtils.retry( + return RetryUtils.retry( () -> { try { + final Set upgradedSegments = new HashSet<>(); final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); final SegmentPublishResult publishResult = publisher.publishSegments( segmentsToBeOverwritten, @@ -699,6 +699,7 @@ ListenableFuture publishInBackground( throw new ISE("Failed to publish segments"); } } + return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments); } catch (Exception e) { // Must not remove segments here, we aren't sure if our transaction succeeded or not. @@ -711,9 +712,10 @@ ListenableFuture publishInBackground( Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments); }, - e -> (e.getMessage() != null && e.getMessage().contains("Failed to update the metadata Store. The new start metadata is ahead of last commited end state.")), + e -> (e != null && e.getMessage() != null + && e.getMessage().contains("Failed to update the metadata Store." 
+ + " The new start metadata is ahead of last commited end state.")), RetryUtils.DEFAULT_MAX_TRIES ); } @@ -725,7 +727,6 @@ ListenableFuture publishInBackground( Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments); } ); } From c83b7d985de3097fe0ab85bcdd916936acb578a9 Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 25 Apr 2024 13:00:01 +0530 Subject: [PATCH 14/17] Address feedback --- .../IndexerSQLMetadataStorageCoordinator.java | 4 +- .../SegmentsAndCommitMetadata.java | 4 +- .../appenderator/StreamAppenderator.java | 56 +++++++++++++------ .../StreamAppenderatorDriver.java | 11 +--- 4 files changed, 49 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 3844d1a811e7..c5a36656c9a2 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1439,9 +1439,11 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( .collect(Collectors.toList()), 100 ); + int numDeletedPendingSegments = 0; for (List pendingSegmentIdBatch : pendingSegmentIdBatches) { - deletePendingSegmentsById(handle, dataSource, pendingSegmentIdBatch); + numDeletedPendingSegments += deletePendingSegmentsById(handle, dataSource, pendingSegmentIdBatch); } + log.info("Deleted [%d] entries from pending segments table upon commit.", numDeletedPendingSegments); return SegmentPublishResult.ok( insertSegments( diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java index 10ce84a2458d..721876880578 100644 --- 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java @@ -40,7 +40,6 @@ public class SegmentsAndCommitMetadata private final ImmutableList segments; private final SegmentSchemaMapping segmentSchemaMapping; - // This set corresponds to the set of extra segments committed due to a concurrent replace. private final ImmutableSet upgradedSegments; public SegmentsAndCommitMetadata( @@ -94,6 +93,9 @@ public List getSegments() return segments; } + /** + * @return the set of extra upgraded segments committed due to a concurrent replace. + */ @Nullable public Set getUpgradedSegments() { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 3acc1d5d3f2a..173d9a5dd1a6 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -168,10 +168,23 @@ public class StreamAppenderator implements Appenderator private final AtomicBoolean closed = new AtomicBoolean(false); + /** + * Map from base segment identifier of a sink to the set of all the segment ids associated with it. + * The set contains the base segment itself and its upgraded versions announced as a result of a concurrent replace. + * The map contains all the available sinks' identifiers in its keyset. + */ private final ConcurrentMap> baseSegmentToUpgradedSegments = new ConcurrentHashMap<>(); + /** + * Map from the id of an upgraded pending segment to the segment corresponding to its upgradedFromSegmentId. + * The key set contains the base segment itself (mapped to itself) and its upgraded versions announced as a result of a concurrent replace. 
+ */ private final ConcurrentMap upgradedSegmentToBaseSegment = new ConcurrentHashMap<>(); + /** + * Set of all segment identifiers that have been marked to be abandoned. + * This is used to determine if all the segments corresponding to a sink have been abandoned and it can be dropped. + */ private final ConcurrentHashMap.KeySetView abandonedSegments = ConcurrentHashMap.newKeySet(); @@ -534,12 +547,7 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier) .emit(); } - sinks.put(identifier, retVal); - idToPendingSegment.put(identifier.asSegmentId().toString(), identifier); - baseSegmentToUpgradedSegments.put(identifier, new HashSet<>()); - baseSegmentToUpgradedSegments.get(identifier).add(identifier); - upgradedSegmentToBaseSegment.put(identifier, identifier); - metrics.setSinkCount(sinks.size()); + addSink(identifier, retVal); sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), identifier.getShardSpec().createChunk(retVal)); } @@ -1398,17 +1406,8 @@ private Object bootstrapSinksFromDisk() hydrants ); rowsSoFar += currSink.getNumRows(); - sinks.put(identifier, currSink); - idToPendingSegment.put(identifier.asSegmentId().toString(), identifier); - baseSegmentToUpgradedSegments.put(identifier, new HashSet<>()); - baseSegmentToUpgradedSegments.get(identifier).add(identifier); - upgradedSegmentToBaseSegment.put(identifier, identifier); - sinkTimeline.add( - currSink.getInterval(), - currSink.getVersion(), - identifier.getShardSpec().createChunk(currSink) - ); + addSink(identifier, currSink); segmentAnnouncer.announceSegment(currSink.getSegment()); } catch (IOException e) { @@ -1431,6 +1430,31 @@ private Object bootstrapSinksFromDisk() return committed.getMetadata(); } + /** + * Update the state of the appenderator when adding a sink. 
+ * + * @param identifier sink identifier + * @param sink sink to be added + */ + private void addSink(SegmentIdWithShardSpec identifier, Sink sink) + { + sinks.put(identifier, sink); + // Associate the base segment of a sink with its string identifier + // Needed to get the base segment using upgradedFromSegmentId of a pending segment + idToPendingSegment.put(identifier.asSegmentId().toString(), identifier); + + // The base segment is associated with itself in the maps to maintain all the upgraded ids of a sink. + baseSegmentToUpgradedSegments.put(identifier, new HashSet<>()); + baseSegmentToUpgradedSegments.get(identifier).add(identifier); + upgradedSegmentToBaseSegment.put(identifier, identifier); + + sinkTimeline.add( + sink.getInterval(), + sink.getVersion(), + identifier.getShardSpec().createChunk(sink) + ); + } + private ListenableFuture abandonSegment( final SegmentIdWithShardSpec identifier, final Sink sink, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 0cd35b7c9630..2b5c153d602e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -372,12 +372,7 @@ public ListenableFuture registerHandoff(SegmentsAndCo public void onSuccess(Object result) { if (numRemainingHandoffSegments.decrementAndGet() == 0) { - List segments = segmentsAndCommitMetadata.getSegments(); - Set upgradedSegments = segmentsAndCommitMetadata.getUpgradedSegments(); - log.info("Successfully handed off [%d] segments.", segments.size()); - if (upgradedSegments != null) { - log.info("Successfully handed off [%d] upgraded segments.", upgradedSegments.size()); - } + log.info("Successfully handed off [%d] segments.", segmentsToBeHandedOff.size()); final long 
handoffTotalTime = System.currentTimeMillis() - handoffStartTime; metrics.reportMaxSegmentHandoffTime(handoffTotalTime); if (handoffTotalTime > HANDOFF_TIME_THRESHOLD) { @@ -386,10 +381,10 @@ public void onSuccess(Object result) } resultFuture.set( new SegmentsAndCommitMetadata( - segments, + segmentsAndCommitMetadata.getSegments(), ((AppenderatorDriverMetadata) metadata).getCallerMetadata(), segmentsAndCommitMetadata.getSegmentSchemaMapping(), - upgradedSegments + segmentsAndCommitMetadata.getUpgradedSegments() ) ); } From aad03c905426c1a57c1bf0813b308eb5b78f4bdb Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 25 Apr 2024 13:09:00 +0530 Subject: [PATCH 15/17] More comments --- .../druid/segment/realtime/appenderator/StreamAppenderator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 173d9a5dd1a6..6ba26c7d1789 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -1467,6 +1467,8 @@ private ListenableFuture abandonSegment( if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) { Set relevantSegments = new HashSet<>(baseSegmentToUpgradedSegments.get(baseIdentifier)); relevantSegments.removeAll(abandonedSegments); + // If there are unabandoned segments associated with the sink, return early + // This may be the case if segments have been upgraded as the result of a concurrent replace if (!relevantSegments.isEmpty()) { return Futures.immediateFuture(null); } From c72ad00458eb2da224ff324268833952aff9892b Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 25 Apr 2024 15:34:39 +0530 Subject: [PATCH 16/17] Synchronize within method --- .../appenderator/StreamAppenderator.java | 54 +++++++++---------- 1 file changed, 25 
insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 6ba26c7d1789..8459e159b301 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -548,7 +548,6 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier) } addSink(identifier, retVal); - sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), identifier.getShardSpec().createChunk(retVal)); } return retVal; @@ -1077,9 +1076,7 @@ public void closeNow() log.debug("Shutting down immediately..."); for (Map.Entry entry : sinks.entrySet()) { - synchronized (entry.getValue()) { - unannounceAllVersionsOfSegment(entry.getValue().getSegment()); - } + unannounceAllVersionsOfSegment(entry.getValue().getSegment(), entry.getValue()); } try { shutdownExecutors(); @@ -1112,29 +1109,31 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer() /** * Unannounces the given base segment and all its upgraded versions. 
*/ - private void unannounceAllVersionsOfSegment(DataSegment baseSegment) + private void unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink) { - final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment); - if (!baseSegmentToUpgradedSegments.containsKey(baseId)) { - return; - } + synchronized (sink) { + final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment); + if (!baseSegmentToUpgradedSegments.containsKey(baseId)) { + return; + } - final List upgradedVersionsOfSegment - = ImmutableList.copyOf(baseSegmentToUpgradedSegments.get(baseId)); - - for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) { - final DataSegment newSegment = new DataSegment( - newId.getDataSource(), - newId.getInterval(), - newId.getVersion(), - baseSegment.getLoadSpec(), - baseSegment.getDimensions(), - baseSegment.getMetrics(), - newId.getShardSpec(), - baseSegment.getBinaryVersion(), - baseSegment.getSize() - ); - unannounceSegment(newSegment); + final List upgradedVersionsOfSegment + = ImmutableList.copyOf(baseSegmentToUpgradedSegments.get(baseId)); + + for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) { + final DataSegment newSegment = new DataSegment( + newId.getDataSource(), + newId.getInterval(), + newId.getVersion(), + baseSegment.getLoadSpec(), + baseSegment.getDimensions(), + baseSegment.getMetrics(), + newId.getShardSpec(), + baseSegment.getBinaryVersion(), + baseSegment.getSize() + ); + unannounceSegment(newSegment); + } } } @@ -1530,10 +1529,7 @@ public Void apply(@Nullable Object input) } } - // Unannounce the segment. 
- synchronized (sink) { - unannounceAllVersionsOfSegment(sink.getSegment()); - } + unannounceAllVersionsOfSegment(sink.getSegment(), sink); Runnable removeRunnable = () -> { droppingSinks.remove(baseIdentifier); From 5d911a77ee204672be23b48551047256fc92f692 Mon Sep 17 00:00:00 2001 From: Amatya Date: Thu, 25 Apr 2024 18:07:35 +0530 Subject: [PATCH 17/17] Simplify logic --- .../SeekableStreamIndexTaskRunner.java | 2 +- .../appenderator/SinkQuerySegmentWalker.java | 6 ++-- .../appenderator/StreamAppenderator.java | 28 ++++++------------- 3 files changed, 12 insertions(+), 24 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 2e02efdeb19e..94ce367fc847 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -1584,7 +1584,7 @@ public Response registerUpgradedPendingSegment( { authorizationCheck(req, Action.WRITE); try { - ((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment(upgradedPendingSegment); + ((StreamAppenderator) appenderator).registerUpgradedPendingSegment(upgradedPendingSegment); return Response.ok().build(); } catch (DruidException e) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index d02e200cfcbd..aba071de1dfc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -356,13 +356,13 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final 
} } - public void registerNewVersionOfPendingSegment( + public void registerUpgradedPendingSegment( SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion + SegmentIdWithShardSpec upgradedPendingSegment ) { newIdToBasePendingSegment.put( - newSegmentVersion.asSegmentId().toDescriptor(), + upgradedPendingSegment.asSegmentId().toDescriptor(), basePendingSegment.asSegmentId().toDescriptor() ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 8459e159b301..1c5dd42dd770 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -177,7 +177,6 @@ public class StreamAppenderator implements Appenderator = new ConcurrentHashMap<>(); /** * Map from the id of an upgraded pending segment to the segment corresponding to its upgradedFromSegmentId. - * The set contains the base segment itself and its upgraded versions announced as a result of a concurrent replace. 
*/ private final ConcurrentMap upgradedSegmentToBaseSegment = new ConcurrentHashMap<>(); @@ -1117,9 +1116,7 @@ private void unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink) return; } - final List upgradedVersionsOfSegment - = ImmutableList.copyOf(baseSegmentToUpgradedSegments.get(baseId)); - + final Set upgradedVersionsOfSegment = baseSegmentToUpgradedSegments.remove(baseId); for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) { final DataSegment newSegment = new DataSegment( newId.getDataSource(), @@ -1133,6 +1130,7 @@ private void unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink) baseSegment.getSize() ); unannounceSegment(newSegment); + upgradedSegmentToBaseSegment.remove(newId); } } } @@ -1147,35 +1145,26 @@ private void unannounceSegment(DataSegment segment) .addData("identifier", segment.getId().toString()) .emit(); } - SegmentIdWithShardSpec id = SegmentIdWithShardSpec.fromDataSegment(segment); - SegmentIdWithShardSpec baseId = upgradedSegmentToBaseSegment.remove(id); - if (baseId == null) { - return; - } - baseSegmentToUpgradedSegments.get(baseId).remove(id); - if (baseSegmentToUpgradedSegments.get(baseId).isEmpty()) { - baseSegmentToUpgradedSegments.remove(baseId); - } } - public void registerNewVersionOfPendingSegment(PendingSegmentRecord pendingSegmentRecord) throws IOException + public void registerUpgradedPendingSegment(PendingSegmentRecord pendingSegmentRecord) throws IOException { SegmentIdWithShardSpec basePendingSegment = idToPendingSegment.get(pendingSegmentRecord.getUpgradedFromSegmentId()); - SegmentIdWithShardSpec newSegmentVersion = pendingSegmentRecord.getId(); + SegmentIdWithShardSpec upgradedPendingSegment = pendingSegmentRecord.getId(); if (!sinks.containsKey(basePendingSegment) || droppingSinks.contains(basePendingSegment)) { return; } // Update query mapping with SinkQuerySegmentWalker - ((SinkQuerySegmentWalker) texasRanger).registerNewVersionOfPendingSegment(basePendingSegment, 
newSegmentVersion); + ((SinkQuerySegmentWalker) texasRanger).registerUpgradedPendingSegment(basePendingSegment, upgradedPendingSegment); // Announce segments final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment(); - final DataSegment newSegment = getUpgradedSegment(baseSegment, newSegmentVersion); + final DataSegment newSegment = getUpgradedSegment(baseSegment, upgradedPendingSegment); segmentAnnouncer.announceSegment(newSegment); - baseSegmentToUpgradedSegments.get(basePendingSegment).add(newSegmentVersion); - upgradedSegmentToBaseSegment.put(newSegmentVersion, basePendingSegment); + baseSegmentToUpgradedSegments.get(basePendingSegment).add(upgradedPendingSegment); + upgradedSegmentToBaseSegment.put(upgradedPendingSegment, basePendingSegment); } private DataSegment getUpgradedSegment(DataSegment baseSegment, SegmentIdWithShardSpec upgradedVersion) @@ -1445,7 +1434,6 @@ private void addSink(SegmentIdWithShardSpec identifier, Sink sink) // The base segment is associated with itself in the maps to maintain all the upgraded ids of a sink. baseSegmentToUpgradedSegments.put(identifier, new HashSet<>()); baseSegmentToUpgradedSegments.get(identifier).add(identifier); - upgradedSegmentToBaseSegment.put(identifier, identifier); sinkTimeline.add( sink.getInterval(),