From 0e80a6051b24fe1bba34685d5d0d88561e41eefb Mon Sep 17 00:00:00 2001 From: Amatya Date: Wed, 31 Jan 2024 16:18:08 +0530 Subject: [PATCH] Concurrent Streaming and Replace improvements --- .../SegmentTransactionalAppendAction.java | 25 +++++++-- .../AppenderatorDriverRealtimeIndexTask.java | 29 +++++----- .../druid/indexing/common/task/IndexTask.java | 3 +- .../parallel/ParallelIndexSupervisorTask.java | 5 +- .../druid/indexing/overlord/TaskLockbox.java | 23 ++++++++ .../seekablestream/SequenceMetadata.java | 6 +- .../seekablestream/SequenceMetadataTest.java | 4 +- ...TestIndexerMetadataStorageCoordinator.java | 3 +- .../IndexerMetadataStorageCoordinator.java | 3 +- .../IndexerSQLMetadataStorageCoordinator.java | 56 +++++++++++++++++-- .../appenderator/BaseAppenderatorDriver.java | 10 +++- .../SegmentsAndCommitMetadata.java | 30 ++++++++-- .../appenderator/StreamAppenderator.java | 28 +++++++++- .../StreamAppenderatorDriver.java | 27 ++++++--- .../TransactionalSegmentPublisher.java | 10 +++- ...mentsSinksBatchAppenderatorDriverTest.java | 3 +- ...edSegmentsBatchAppenderatorDriverTest.java | 3 +- .../StreamAppenderatorDriverTest.java | 4 +- 18 files changed, 218 insertions(+), 54 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index 67b701718cae..bf455cd96080 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -32,6 +32,7 @@ import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import 
org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; @@ -52,30 +53,36 @@ public class SegmentTransactionalAppendAction implements TaskAction> segmentToUpgradedVersions; + public static SegmentTransactionalAppendAction forSegments(Set segments) { - return new SegmentTransactionalAppendAction(segments, null, null); + return new SegmentTransactionalAppendAction(segments, null, null, null); } public static SegmentTransactionalAppendAction forSegmentsAndMetadata( Set segments, DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata + DataSourceMetadata endMetadata, + Map> segmentToUpgradedVersions ) { - return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata); + return new SegmentTransactionalAppendAction(segments, startMetadata, endMetadata, segmentToUpgradedVersions); } @JsonCreator private SegmentTransactionalAppendAction( @JsonProperty("segments") Set segments, @JsonProperty("startMetadata") @Nullable DataSourceMetadata startMetadata, - @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata + @JsonProperty("endMetadata") @Nullable DataSourceMetadata endMetadata, + @JsonProperty("segmentToUpgradedVersions") @Nullable Map> segmentToUpgradedVersions ) { this.segments = segments; this.startMetadata = startMetadata; this.endMetadata = endMetadata; + this.segmentToUpgradedVersions = segmentToUpgradedVersions; if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) { @@ -103,6 +110,13 @@ public DataSourceMetadata getEndMetadata() return endMetadata; } + @JsonProperty + @Nullable + public Map> getSegmentToUpgradedVersions() + { + return segmentToUpgradedVersions; + } + @Override public TypeReference getReturnTypeReference() { @@ -142,7 +156,8 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) segments, segmentToReplaceLock, startMetadata, - endMetadata + endMetadata, + segmentToUpgradedVersions ); } diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 9cee79b63086..72f95d08d141 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -351,20 +351,21 @@ public TaskStatus runTask(final TaskToolbox toolbox) int sequenceNumber = 0; String sequenceName = makeSequenceName(getId(), sequenceNumber); - final TransactionalSegmentPublisher publisher = (mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata) -> { - if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) { - throw new ISE( - "Stream ingestion task unexpectedly attempted to overwrite segments: %s", - SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments) - ); - } - final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction( - segments, - null, - null - ); - return toolbox.getTaskActionClient().submit(action); - }; + final TransactionalSegmentPublisher publisher = + (mustBeNullOrEmptyOverwriteSegments, segments, commitMetadata, segmentIdToUpgradedVersions) -> { + if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) { + throw new ISE( + "Stream ingestion task unexpectedly attempted to overwrite segments: %s", + SegmentUtils.commaSeparatedIdentifiers(mustBeNullOrEmptyOverwriteSegments) + ); + } + final SegmentTransactionalInsertAction action = SegmentTransactionalInsertAction.appendAction( + segments, + null, + null + ); + return toolbox.getTaskActionClient().submit(action); + }; // Skip connecting firehose if we've been stopped before we got started. 
synchronized (this) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index b18a6f0f1748..b2469388cfcc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -918,7 +918,8 @@ private TaskStatus generateAndPublishSegments( final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse(); final TransactionalSegmentPublisher publisher = - (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit( + (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentIdToUpgradedVersions) + -> toolbox.getTaskActionClient().submit( buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType) ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 9940dfdcb4de..7b8984af1720 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1170,13 +1170,14 @@ private void publishSegments( final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse(); final TransactionalSegmentPublisher publisher = - (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> toolbox.getTaskActionClient().submit( + (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentIdToUpgradedVersions) + -> toolbox.getTaskActionClient().submit( buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, taskLockType) ); final boolean published = 
newSegments.isEmpty() - || publisher.publishSegments(oldSegments, newSegments, annotateFunction, null).isSuccess(); + || publisher.publishSegments(oldSegments, newSegments, annotateFunction, null, null).isSuccess(); if (published) { LOG.info("Published [%d] segments", newSegments.size()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 54e29191cff7..3a2c4538a479 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -514,6 +514,29 @@ private void acquireTaskLock(SegmentAllocationHolder holder, boolean isTimeChunk } } + /** + * Conflicting lock - A lock for the same datasource and overlapping interval as the request. + * Reusable lock - A conflicting lock which satisfies the lock request. + * Superseding lock - A lock created for the request which may make at least one of the existing locks unneeded + * Flow: + * Check for conflicting locks. + * If there are none, create a new lock posse + * Else, find a unique lock posse with a lock that can be reused for the request + * Else, check if the conflicting locks can coexist and create a new lock posse + * Else, check if the incompatible locks can be revoked and create a new lock posse, and revoke such locks + * + * If none of the conditions is true, return null + * Else, add the task to the newly created lock posse. + * If the lock task was added to the posse for the first time: + * A) Check if the newly created posse can replace any of the existing locks. If yes, unlock such superseded locks. + * B) Commit the lock and task to druid_tasklocks. (Unlock if the commit to metadata store fails) + * + * @param request Request for lock. + * @param task Task requesting the lock. + * @param persist If true, commit the newly created lock to the metadata store.
+ * Should be false when syncing from storage. + * @return A TaskLockPosse associating the lock for the request with the provided task. + */ private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, boolean persist) { Preconditions.checkState(!(request instanceof LockRequestForNewSegment), "Can't handle LockRequestForNewSegment"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java index b5a65e99462c..f7c7d5478019 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java @@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.segment.SegmentUtils; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.timeline.DataSegment; @@ -351,7 +352,8 @@ public SequenceMetadataTransactionalSegmentPublisher( public SegmentPublishResult publishAnnotatedSegments( @Nullable Set mustBeNullOrEmptyOverwriteSegments, Set segmentsToPush, - @Nullable Object commitMetadata + @Nullable Object commitMetadata, + @Nullable Map> segmentIdToUpgradeVersions ) throws IOException { if (mustBeNullOrEmptyOverwriteSegments != null && !mustBeNullOrEmptyOverwriteSegments.isEmpty()) { @@ -417,7 +419,7 @@ public SegmentPublishResult publishAnnotatedSegments( ); final DataSourceMetadata endMetadata = runner.createDataSourceMetadata(finalPartitions); action = taskLockType == TaskLockType.APPEND - ? SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata) + ? 
SegmentTransactionalAppendAction.forSegmentsAndMetadata(segmentsToPush, startMetadata, endMetadata, segmentIdToUpgradeVersions) : SegmentTransactionalInsertAction.appendAction(segmentsToPush, startMetadata, endMetadata); } else { action = taskLockType == TaskLockType.APPEND diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java index fbe63ffe2689..2d016be55924 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SequenceMetadataTest.java @@ -80,7 +80,7 @@ public void testPublishAnnotatedSegmentsThrowExceptionIfOverwriteSegmentsNotNull ISE exception = Assert.assertThrows( ISE.class, - () -> transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null) + () -> transactionalSegmentPublisher.publishAnnotatedSegments(notNullNotEmptySegment, ImmutableSet.of(), null, null) ); Assert.assertEquals( "Stream ingestion task unexpectedly attempted to overwrite segments: " @@ -115,6 +115,6 @@ public void testPublishAnnotatedSegmentsSucceedIfDropSegmentsAndOverwriteSegment ); TransactionalSegmentPublisher transactionalSegmentPublisher = sequenceMetadata.createPublisher(mockSeekableStreamIndexTaskRunner, mockTaskToolbox, false); - transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableMap.of()); + transactionalSegmentPublisher.publishAnnotatedSegments(null, notNullNotEmptySegment, ImmutableMap.of(), null); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index f42a300de5f3..ab7c89b057f3 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -174,7 +174,8 @@ public SegmentPublishResult commitAppendSegmentsAndMetadata( Set appendSegments, Map appendSegmentToReplaceLock, DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata + DataSourceMetadata endMetadata, + Map> segmentIdToUpgradedVersions ) { return SegmentPublishResult.ok(commitSegments(appendSegments)); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 31c975339007..29857843bda2 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -312,7 +312,8 @@ SegmentPublishResult commitAppendSegmentsAndMetadata( Set appendSegments, Map appendSegmentToReplaceLock, DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata + DataSourceMetadata endMetadata, + Map> segmentToUpgradeVersions ); /** diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 0ef488aed405..17c6e0d6bb59 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -62,6 +62,7 @@ import org.apache.druid.timeline.partition.PartitionIds; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.SingleDimensionShardSpec; +import org.apache.druid.utils.CollectionUtils; import org.joda.time.DateTime; import org.joda.time.Interval; 
import org.joda.time.chrono.ISOChronology; @@ -536,6 +537,7 @@ public SegmentPublishResult commitAppendSegments( appendSegments, appendSegmentToReplaceLock, null, + null, null ); } @@ -545,14 +547,16 @@ public SegmentPublishResult commitAppendSegmentsAndMetadata( Set appendSegments, Map appendSegmentToReplaceLock, DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata + DataSourceMetadata endMetadata, + Map> segmentIdToUpgradedVersions ) { return commitAppendSegmentsAndMetadataInTransaction( appendSegments, appendSegmentToReplaceLock, startMetadata, - endMetadata + endMetadata, + segmentIdToUpgradedVersions ); } @@ -1291,7 +1295,8 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( Set appendSegments, Map appendSegmentToReplaceLock, @Nullable DataSourceMetadata startMetadata, - @Nullable DataSourceMetadata endMetadata + @Nullable DataSourceMetadata endMetadata, + @Nullable Map> segmentIdToUpgradedVersions ) { verifySegmentsToCommit(appendSegments); @@ -1303,7 +1308,7 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( final String dataSource = appendSegments.iterator().next().getDataSource(); final Set segmentIdsForNewVersions = connector.retryTransaction( (handle, transactionStatus) - -> createNewIdsForAppendSegments(handle, dataSource, appendSegments), + -> createNewIdsForAppendSegments(handle, dataSource, appendSegments, segmentIdToUpgradedVersions), 0, SQLMetadataConnector.DEFAULT_MAX_TRIES ); @@ -1436,13 +1441,18 @@ private void insertPendingSegmentIntoMetastore( private Set createNewIdsForAppendSegments( Handle handle, String dataSource, - Set segmentsToAppend + Set segmentsToAppend, + @Nullable Map> segmentIdToUpgradeVersions ) throws IOException { if (segmentsToAppend.isEmpty()) { return Collections.emptySet(); } + if (segmentIdToUpgradeVersions == null) { + segmentIdToUpgradeVersions = Collections.emptyMap(); + } + final Set appendIntervals = new HashSet<>(); final TreeMap> 
appendVersionToSegments = new TreeMap<>(); for (DataSegment segment : segmentsToAppend) { @@ -1475,7 +1485,37 @@ private Set createNewIdsForAppendSegments( entry.getValue(), appendVersionToSegments ); - for (Map.Entry> upgradeEntry : segmentsToUpgrade.entrySet()) { + Map> filteredSegmentsToUpgrade = new HashMap<>(); + for (Interval interval : segmentsToUpgrade.keySet()) { + filteredSegmentsToUpgrade.put(interval, new HashSet<>()); + for (DataSegment segment : segmentsToUpgrade.get(interval)) { + boolean include = true; + if (segmentIdToUpgradeVersions.containsKey(segment.getId().toString())) { + for (SegmentIdWithShardSpec upgradedSegment : segmentIdToUpgradeVersions.get(segment.getId().toString())) { + if (upgradedSegment.getVersion().equals(upgradeVersion)) { + upgradedSegments.add( + new DataSegment( + upgradedSegment.asSegmentId(), + segment.getLoadSpec(), + segment.getDimensions(), + segment.getMetrics(), + upgradedSegment.getShardSpec(), + null, + segment.getBinaryVersion(), + segment.getSize() + ) + ); + include = false; + break; + } + } + } + if (include) { + filteredSegmentsToUpgrade.get(interval).add(segment); + } + } + } + for (Map.Entry> upgradeEntry : filteredSegmentsToUpgrade.entrySet()) { final Interval upgradeInterval = upgradeEntry.getKey(); final Set segmentsAlreadyOnVersion = overlappingIntervalToSegments.getOrDefault(upgradeInterval, Collections.emptySet()) @@ -1548,6 +1588,10 @@ private Set createNewIdsForAppendSegmentsWithVersion( Set committedSegments ) throws IOException { + if (CollectionUtils.isNullOrEmpty(segmentsToUpgrade)) { + return Collections.emptySet(); + } + // Find the committed segments with the highest partition number SegmentIdWithShardSpec committedMaxId = null; for (DataSegment committedSegment : committedSegments) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 
e27d3ff68b69..3afb83b05f97 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -614,6 +614,13 @@ ListenableFuture publishInBackground( final Object callerMetadata = metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata(); + final Map> segmentIdToUpgradedVersions; + if (appenderator instanceof StreamAppenderator) { + segmentIdToUpgradedVersions = ((StreamAppenderator) appenderator).getBaseSegmentToUpgradedVersions(); + } else { + segmentIdToUpgradedVersions = null; + } + return executor.submit( () -> { try { @@ -625,7 +632,8 @@ ListenableFuture publishInBackground( segmentsToBeOverwritten, ourSegments, outputSegmentsAnnotateFunction, - callerMetadata + callerMetadata, + segmentIdToUpgradedVersions ); if (publishResult.isSuccess()) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java index 3d30e017ca5c..d37069e8be0e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java @@ -30,26 +30,46 @@ public class SegmentsAndCommitMetadata { - private static final SegmentsAndCommitMetadata NIL = new SegmentsAndCommitMetadata(Collections.emptyList(), null); + private static final SegmentsAndCommitMetadata NIL = new SegmentsAndCommitMetadata(Collections.emptyList(), null, null); private final Object commitMetadata; private final ImmutableList segments; + private final ImmutableList upgradedSegments; public SegmentsAndCommitMetadata( List segments, - @Nullable Object commitMetadata + @Nullable Object commitMetadata, + @Nullable List upgradedSegments ) { this.segments = 
ImmutableList.copyOf(segments); + if (upgradedSegments == null) { + this.upgradedSegments = ImmutableList.of(); + } else { + this.upgradedSegments = ImmutableList.copyOf(upgradedSegments); + } this.commitMetadata = commitMetadata; } + public SegmentsAndCommitMetadata( + List segments, + @Nullable Object commitMetadata + ) + { + this(segments, commitMetadata, null); + } + @Nullable public Object getCommitMetadata() { return commitMetadata; } + public List getUpgradedSegments() + { + return upgradedSegments; + } + public List getSegments() { return segments; @@ -66,13 +86,14 @@ public boolean equals(Object o) } SegmentsAndCommitMetadata that = (SegmentsAndCommitMetadata) o; return Objects.equals(commitMetadata, that.commitMetadata) && - Objects.equals(segments, that.segments); + Objects.equals(segments, that.segments) && + Objects.equals(upgradedSegments, that.upgradedSegments); } @Override public int hashCode() { - return Objects.hash(commitMetadata, segments); + return Objects.hash(commitMetadata, segments, upgradedSegments); } @Override @@ -81,6 +102,7 @@ public String toString() return getClass().getSimpleName() + "{" + "commitMetadata=" + commitMetadata + ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + + ", upgradedSegments=" + SegmentUtils.commaSeparatedIdentifiers(upgradedSegments) + '}'; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 4ebe492ca234..ccde61cb3c0a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -769,6 +769,7 @@ public ListenableFuture push( persistAll(committer), (Function) commitMetadata -> { final List dataSegments = new ArrayList<>(); + final List upgradedSegments = new ArrayList<>(); log.info("Preparing to push 
(stats): processed rows: [%d], sinks: [%d], fireHydrants (across sinks): [%d]", rowIngestionMeters.getProcessed(), theSinks.size(), pushedHydrantsCount.get() @@ -792,6 +793,22 @@ public ListenableFuture push( ); if (dataSegment != null) { dataSegments.add(dataSegment); + if (baseSegmentToUpgradedVersions.containsKey(dataSegment.getId())) { + for (SegmentIdWithShardSpec upgradedSegment : baseSegmentToUpgradedVersions.get(dataSegment.getId())) { + upgradedSegments.add( + new DataSegment( + upgradedSegment.asSegmentId(), + dataSegment.getLoadSpec(), + dataSegment.getDimensions(), + dataSegment.getMetrics(), + upgradedSegment.getShardSpec(), + null, + dataSegment.getBinaryVersion(), + dataSegment.getSize() + ) + ); + } + } } else { log.warn("mergeAndPush[%s] returned null, skipping.", entry.getKey()); } @@ -799,7 +816,7 @@ public ListenableFuture push( log.info("Push complete..."); - return new SegmentsAndCommitMetadata(dataSegments, commitMetadata); + return new SegmentsAndCommitMetadata(dataSegments, commitMetadata, upgradedSegments); }, pushExecutor ); @@ -1157,6 +1174,15 @@ private void lockBasePersistDirectory() } } + public Map> getBaseSegmentToUpgradedVersions() + { + final Map> retVal = new HashMap<>(); + for (SegmentId id : baseSegmentToUpgradedVersions.keySet()) { + retVal.put(id.toString(), baseSegmentToUpgradedVersions.get(id)); + } + return retVal; + } + private void unlockBasePersistDirectory() { try { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index b2b41bf44f76..f2a21226f990 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; 
import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -322,10 +323,14 @@ public ListenableFuture registerHandoff(SegmentsAndCo return Futures.immediateFuture(null); } else { - final List waitingSegmentIdList = segmentsAndCommitMetadata.getSegments().stream() - .map( - SegmentIdWithShardSpec::fromDataSegment) - .collect(Collectors.toList()); + final List waitingSegmentList = new ArrayList<>(segmentsAndCommitMetadata.getSegments()); + if (segmentsAndCommitMetadata.getUpgradedSegments() != null) { + waitingSegmentList.addAll(segmentsAndCommitMetadata.getUpgradedSegments()); + } + final List waitingSegmentIdList = waitingSegmentList.stream() + .map( + SegmentIdWithShardSpec::fromDataSegment) + .collect(Collectors.toList()); final Object metadata = Preconditions.checkNotNull(segmentsAndCommitMetadata.getCommitMetadata(), "commitMetadata"); if (waitingSegmentIdList.isEmpty()) { @@ -365,17 +370,25 @@ public void onSuccess(Object result) { if (numRemainingHandoffSegments.decrementAndGet() == 0) { List segments = segmentsAndCommitMetadata.getSegments(); - log.info("Successfully handed off [%d] segments.", segments.size()); + final List upgradedSegments; + if (segmentsAndCommitMetadata.getUpgradedSegments() == null) { + upgradedSegments = ImmutableList.of(); + } else { + upgradedSegments = segmentsAndCommitMetadata.getUpgradedSegments(); + } + log.info("Successfully handed off [%d] segments and [%d] upgraded segments.", + segments.size(), upgradedSegments.size()); final long handoffTotalTime = System.currentTimeMillis() - handoffStartTime; metrics.reportMaxSegmentHandoffTime(handoffTotalTime); if (handoffTotalTime > HANDOFF_TIME_THRESHOLD) { log.warn("Slow segment handoff! 
Time taken for [%d] segments is %d ms", - segments.size(), handoffTotalTime); + segments.size() + upgradedSegments.size(), handoffTotalTime); } resultFuture.set( new SegmentsAndCommitMetadata( segments, - ((AppenderatorDriverMetadata) metadata).getCallerMetadata() + ((AppenderatorDriverMetadata) metadata).getCallerMetadata(), + upgradedSegments ) ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java index 2ffb4dd572a3..901ce8cd5a9c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java @@ -24,6 +24,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.Map; import java.util.Set; import java.util.function.Function; @@ -42,14 +43,16 @@ public interface TransactionalSegmentPublisher SegmentPublishResult publishAnnotatedSegments( @Nullable Set segmentsToBeOverwritten, Set segmentsToPublish, - @Nullable Object commitMetadata + @Nullable Object commitMetadata, + @Nullable Map> segmentIdToUpgradeVersions ) throws IOException; default SegmentPublishResult publishSegments( @Nullable Set segmentsToBeOverwritten, Set segmentsToPublish, Function, Set> outputSegmentsAnnotateFunction, - @Nullable Object commitMetadata + @Nullable Object commitMetadata, + @Nullable Map> segmentIdToUpgradeVersions ) throws IOException { final Function, Set> annotateFunction = outputSegmentsAnnotateFunction @@ -57,7 +60,8 @@ default SegmentPublishResult publishSegments( return publishAnnotatedSegments( segmentsToBeOverwritten, annotateFunction.apply(segmentsToPublish), - commitMetadata + commitMetadata, + segmentIdToUpgradeVersions ); } diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java index 0dcb987c59ba..1dc06607ffd4 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmentsSinksBatchAppenderatorDriverTest.java @@ -204,7 +204,8 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp static TransactionalSegmentPublisher makeOkPublisher() { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of()); + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentIdToUpgradedVersions) + -> SegmentPublishResult.ok(ImmutableSet.of()); } static class TestSegmentAllocator implements SegmentAllocator diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java index 9657521a5261..f930e00e0fa4 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsBatchAppenderatorDriverTest.java @@ -200,6 +200,7 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp static TransactionalSegmentPublisher makeOkPublisher() { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of()); + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentIdToUpgradedVersions) + -> 
SegmentPublishResult.ok(ImmutableSet.of()); } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index 8f2b77c4fc0f..75656df4e988 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -375,13 +375,13 @@ private Set asIdentifiers(Iterable segments static TransactionalSegmentPublisher makeOkPublisher() { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentIdToUpgradedVersions) -> SegmentPublishResult.ok(Collections.emptySet()); } static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException) { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata) -> { + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentIdToUpgradedVersions) -> { final RuntimeException exception = new RuntimeException("test"); if (failWithException) { throw exception;