diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java index 64cecdfbf255..7bb57c5e1c25 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQControllerTask.java @@ -40,6 +40,7 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -71,7 +72,7 @@ import java.util.Set; @JsonTypeName(MSQControllerTask.TYPE) -public class MSQControllerTask extends AbstractTask implements ClientTaskQuery +public class MSQControllerTask extends AbstractTask implements ClientTaskQuery, PendingSegmentAllocatingTask { public static final String TYPE = "query_controller"; public static final String DUMMY_DATASOURCE_FOR_SELECT = "__query_select"; @@ -157,6 +158,12 @@ public Set getInputSourceResources() return ImmutableSet.of(); } + @Override + public String getTaskAllocatorId() + { + return getId(); + } + @JsonProperty("spec") public MSQSpec getQuerySpec() { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index 95abc6b03a63..c04948402079 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -32,6 +32,7 
@@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.exec.Worker; @@ -45,7 +46,7 @@ import java.util.Set; @JsonTypeName(MSQWorkerTask.TYPE) -public class MSQWorkerTask extends AbstractTask +public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask { public static final String TYPE = "query_worker"; @@ -125,6 +126,12 @@ public Set getInputSourceResources() return ImmutableSet.of(); } + @Override + public String getTaskAllocatorId() + { + return getControllerTaskId(); + } + @Override public boolean isReady(final TaskActionClient taskActionClient) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java index 6aaf21ee3bde..309eb3830ac5 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQControllerTaskTest.java @@ -35,7 +35,7 @@ public class MSQControllerTaskTest { - MSQSpec MSQ_SPEC = MSQSpec + private final MSQSpec MSQ_SPEC = MSQSpec .builder() .destination(new DataSourceMSQDestination( "target", @@ -59,7 +59,7 @@ public class MSQControllerTaskTest @Test public void testGetInputSourceResources() { - MSQControllerTask msqWorkerTask = new MSQControllerTask( + MSQControllerTask controllerTask = new MSQControllerTask( null, MSQ_SPEC, null, @@ -67,7 +67,25 @@ public void testGetInputSourceResources() null, null, null, - null); - Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty()); + 
null + ); + Assert.assertTrue(controllerTask.getInputSourceResources().isEmpty()); + } + + @Test + public void testGetTaskAllocatorId() + { + final String taskId = "taskId"; + MSQControllerTask controllerTask = new MSQControllerTask( + taskId, + MSQ_SPEC, + null, + null, + null, + null, + null, + null + ); + Assert.assertEquals(taskId, controllerTask.getTaskAllocatorId()); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 37c31ba3271f..482d67d81abe 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -47,7 +47,6 @@ public class MSQWorkerTaskTest @Test public void testEquals() { - Assert.assertNotEquals(msqWorkerTask, 0); Assert.assertEquals(msqWorkerTask, msqWorkerTask); Assert.assertEquals( msqWorkerTask, @@ -110,4 +109,11 @@ public void testGetInputSourceResources() Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty()); } + @Test + public void testGetTaskAllocatorId() + { + MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); + Assert.assertEquals(controllerTaskId, msqWorkerTask.getTaskAllocatorId()); + } + } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java index 280f4199e7b8..d0308516e04b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java @@ -23,8 +23,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; import 
com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.LockRequestForNewSegment; @@ -210,6 +212,12 @@ public SegmentIdWithShardSpec perform( final TaskActionToolbox toolbox ) { + if (!(task instanceof PendingSegmentAllocatingTask)) { + throw DruidException.defensive( + "Task[%s] of type[%s] cannot allocate segments as it does not implement PendingSegmentAllocatingTask.", + task.getId(), task.getType() + ); + } int attempt = 0; while (true) { attempt++; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index 67b701718cae..1a1e6c793776 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -22,10 +22,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; import 
org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.DataSourceMetadata; @@ -41,8 +43,20 @@ import java.util.stream.Collectors; /** + * * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by * your task for the segment intervals. + * + *
+ * Pseudo code (for a single interval):
+ * For an append lock held over an interval:
+ *     transaction {
+ *       commit input segments contained within interval
+ *       if there is an active replace lock over the interval:
+ *         add an entry for the inputSegment corresponding to the replace lock's task in the upgradeSegments table
+ *       fetch pending segments with parent contained within the input segments, and commit them
+ *     }
+ * 
*/ public class SegmentTransactionalAppendAction implements TaskAction { @@ -114,6 +128,13 @@ public TypeReference getReturnTypeReference() @Override public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) { + if (!(task instanceof PendingSegmentAllocatingTask)) { + throw DruidException.defensive( + "Task[%s] of type[%s] cannot append segments as it does not implement PendingSegmentAllocatingTask.", + task.getId(), + task.getType() + ); + } // Verify that all the locks are of expected type final List locks = toolbox.getTaskLockbox().findLocksForTask(task); for (TaskLock lock : locks) { @@ -132,17 +153,20 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments); final CriticalAction.Action publishAction; + final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId(); if (startMetadata == null) { publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments( segments, - segmentToReplaceLock + segmentToReplaceLock, + taskAllocatorId ); } else { publishAction = () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegmentsAndMetadata( segments, segmentToReplaceLock, startMetadata, - endMetadata + endMetadata, + taskAllocatorId ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index aaa62db90a7c..2f4a580e0464 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -30,11 +30,14 @@ import org.apache.druid.indexing.overlord.SegmentPublishResult; import 
org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; +import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -42,6 +45,20 @@ /** * Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by * your task for the segment intervals. + * + *
+ *  Pseudo code (for a single interval):
+ * For a replace lock held over an interval:
+ *     transaction {
+ *       commit input segments contained within interval
+ *       upgrade ids in the upgradeSegments table corresponding to this task to the replace lock's version and commit them
+ *       fetch payload, task_allocator_id for pending segments
+ *       upgrade each such pending segment to the replace lock's version with the corresponding root segment
+ *     }
+ * For every pending segment with version == replace lock version:
+ *    Fetch payload, group_id of the pending segment and relay them to the supervisor
+ *    The supervisor relays the payloads to all the tasks with the corresponding group_id to serve realtime queries
+ * 
*/ public class SegmentTransactionalReplaceAction implements TaskAction { @@ -123,7 +140,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) // failure to upgrade pending segments does not affect success of the commit if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) { try { - tryUpgradeOverlappingPendingSegments(task, toolbox); + registerUpgradedPendingSegmentsOnSupervisor(task, toolbox); } catch (Exception e) { log.error(e, "Error while upgrading pending segments for task[%s]", task.getId()); @@ -134,34 +151,55 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) } /** - * Tries to upgrade any pending segments that overlap with the committed segments. + * Registers upgraded pending segments on the active supervisor, if any */ - private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox) + private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox) { final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); final Optional activeSupervisorIdWithAppendLock = supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); + if (!activeSupervisorIdWithAppendLock.isPresent()) { return; } - final Set activeRealtimeSequencePrefixes - = supervisorManager.getActiveRealtimeSequencePrefixes(activeSupervisorIdWithAppendLock.get()); - Map upgradedPendingSegments = - toolbox.getIndexerMetadataStorageCoordinator() - .upgradePendingSegmentsOverlappingWith(segments, activeRealtimeSequencePrefixes); - log.info( - "Upgraded [%d] pending segments for REPLACE task[%s]: [%s]", - upgradedPendingSegments.size(), task.getId(), upgradedPendingSegments - ); + final Set replaceLocksForTask = toolbox + .getTaskLockbox() + .getAllReplaceLocksForDatasource(task.getDataSource()) + .stream() + .filter(lock -> task.getId().equals(lock.getSupervisorTaskId())) + .collect(Collectors.toSet()); + - 
upgradedPendingSegments.forEach( - (oldId, newId) -> toolbox.getSupervisorManager() - .registerNewVersionOfPendingSegmentOnSupervisor( - activeSupervisorIdWithAppendLock.get(), - oldId, - newId - ) + Set pendingSegments = new HashSet<>(); + for (ReplaceTaskLock replaceLock : replaceLocksForTask) { + pendingSegments.addAll( + toolbox.getIndexerMetadataStorageCoordinator() + .getPendingSegments(task.getDataSource(), replaceLock.getInterval()) + ); + } + Map idToPendingSegment = new HashMap<>(); + pendingSegments.forEach(pendingSegment -> idToPendingSegment.put( + pendingSegment.getId().asSegmentId().toString(), + pendingSegment.getId() + )); + Map segmentToParent = new HashMap<>(); + pendingSegments.forEach(pendingSegment -> { + if (pendingSegment.getUpgradedFromSegmentId() != null + && !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) { + segmentToParent.put( + pendingSegment.getId(), + idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId()) + ); + } + }); + + segmentToParent.forEach( + (newId, oldId) -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( + activeSupervisorIdWithAppendLock.get(), + oldId, + newId + ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 1c581cd6d9ca..42759262fab5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -119,7 +119,8 @@ import java.util.concurrent.TimeoutException; @Deprecated -public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements ChatHandler +public class AppenderatorDriverRealtimeIndexTask extends AbstractTask + implements ChatHandler, 
PendingSegmentAllocatingTask { private static final String CTX_KEY_LOOKUP_TIER = "lookupTier"; @@ -259,6 +260,12 @@ public boolean isReady(TaskActionClient taskActionClient) return true; } + @Override + public String getTaskAllocatorId() + { + return getGroupId(); + } + @Override public TaskStatus runTask(final TaskToolbox toolbox) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index e5df64c5e790..f5aec08c3062 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -127,7 +127,7 @@ * serialization fields of this class must correspond to those of {@link * ClientCompactionTaskQuery}. */ -public class CompactionTask extends AbstractBatchIndexTask +public class CompactionTask extends AbstractBatchIndexTask implements PendingSegmentAllocatingTask { private static final Logger log = new Logger(CompactionTask.class); private static final Clock UTC_CLOCK = Clock.systemUTC(); @@ -400,6 +400,12 @@ public String getType() return TYPE; } + @Override + public String getTaskAllocatorId() + { + return getGroupId(); + } + @Nonnull @JsonIgnore @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 329bc0d87183..1796f6ea2a64 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -137,7 +137,7 @@ import java.util.function.Function; import java.util.stream.Collectors; -public class IndexTask extends AbstractBatchIndexTask implements ChatHandler +public class IndexTask extends AbstractBatchIndexTask implements ChatHandler, 
PendingSegmentAllocatingTask { public static final HashFunction HASH_FUNCTION = Hashing.murmur3_128(); @@ -302,6 +302,12 @@ public Granularity getSegmentGranularity() } } + @Override + public String getTaskAllocatorId() + { + return getGroupId(); + } + @Nonnull @JsonIgnore @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java index 678d8cafd01a..9d91542ebf0b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java @@ -39,7 +39,7 @@ /** */ -public class NoopTask extends AbstractTask +public class NoopTask extends AbstractTask implements PendingSegmentAllocatingTask { private static final int DEFAULT_RUN_TIME = 2500; @@ -111,6 +111,12 @@ public int getPriority() return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } + @Override + public String getTaskAllocatorId() + { + return getId(); + } + public static NoopTask create() { return forDatasource(null); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java new file mode 100644 index 000000000000..e392adc45535 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/PendingSegmentAllocatingTask.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.task; + +/** + * An interface to be implemented by every appending task that allocates pending segments. + */ +public interface PendingSegmentAllocatingTask +{ + /** + * Unique string used by an appending task (or its sub-tasks and replicas) to allocate pending segments + * and identify pending segments allocated to it. + */ + String getTaskAllocatorId(); +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 5d72f4a177c8..935adb3cde0f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -52,6 +52,7 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; @@ -131,7 +132,8 @@ * * @see ParallelIndexTaskRunner */ -public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implements ChatHandler +public 
class ParallelIndexSupervisorTask extends AbstractBatchIndexTask + implements ChatHandler, PendingSegmentAllocatingTask { public static final String TYPE = "index_parallel"; @@ -476,6 +478,12 @@ public boolean isPerfectRollup() ); } + @Override + public String getTaskAllocatorId() + { + return getGroupId(); + } + @Nullable @Override public Granularity getSegmentGranularity() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 0b4f62e6388c..e02d59936b20 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -43,6 +43,7 @@ import org.apache.druid.indexing.common.task.BatchAppenderators; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch; import org.apache.druid.indexing.common.task.SegmentAllocators; import org.apache.druid.indexing.common.task.TaskResource; @@ -105,7 +106,7 @@ * generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of * publishing on its own. 
*/ -public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler +public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler, PendingSegmentAllocatingTask { public static final String TYPE = "single_phase_sub_task"; public static final String OLD_TYPE_NAME = "index_sub"; @@ -236,6 +237,12 @@ public String getSubtaskSpecId() return subtaskSpecId; } + @Override + public String getTaskAllocatorId() + { + return getGroupId(); + } + @Override public TaskStatus runTask(final TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 54e29191cff7..35ec79d74ec7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.google.errorprone.annotations.concurrent.GuardedBy; import com.google.inject.Inject; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.LockGranularity; @@ -38,6 +39,7 @@ import org.apache.druid.indexing.common.actions.SegmentAllocateAction; import org.apache.druid.indexing.common.actions.SegmentAllocateRequest; import org.apache.druid.indexing.common.actions.SegmentAllocateResult; +import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.java.util.common.ISE; @@ -99,10 +101,20 @@ public class TaskLockbox private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); - // Stores List of Active Tasks. TaskLockbox will only grant locks to active activeTasks. 
- // this set should be accessed under the giant lock. + /** + * Set of active tasks. Locks can be granted only to a task present in this set. + * Should be accessed only under the giant lock. + */ private final Set activeTasks = new HashSet<>(); + /** + * Map from a taskAllocatorId to the set of active taskIds using that allocator id. + * Used to clean up pending segments for a taskAllocatorId as soon as the set + * of corresponding active taskIds becomes empty. + */ + @GuardedBy("giant") + private final Map> activeAllocatorIdToTaskIds = new HashMap<>(); + @Inject public TaskLockbox( TaskStorage taskStorage, @@ -213,6 +225,12 @@ public int compare(Pair left, Pair right) activeTasks.remove(task.getId()); } } + activeAllocatorIdToTaskIds.clear(); + for (Task task : storedActiveTasks) { + if (activeTasks.contains(task.getId())) { + trackAppendingTask(task); + } + } log.info( "Synced %,d locks for %,d activeTasks from storage (%,d locks ignored).", @@ -387,7 +405,7 @@ public LockResult tryLock(final Task task, final LockRequest request) if (request instanceof LockRequestForNewSegment) { final LockRequestForNewSegment lockRequestForNewSegment = (LockRequestForNewSegment) request; if (lockRequestForNewSegment.getGranularity() == LockGranularity.SEGMENT) { - newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion()); + newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion(), null); if (newSegmentId == null) { return LockResult.fail(); } @@ -411,7 +429,12 @@ public LockResult tryLock(final Task task, final LockRequest request) newSegmentId ); } - newSegmentId = allocateSegmentId(lockRequestForNewSegment, posseToUse.getTaskLock().getVersion()); + final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId(); + newSegmentId = allocateSegmentId( + lockRequestForNewSegment, + posseToUse.getTaskLock().getVersion(), + taskAllocatorId + ); } } return LockResult.ok(posseToUse.getTaskLock(), newSegmentId); 
@@ -514,6 +537,7 @@ private void acquireTaskLock(SegmentAllocationHolder holder, boolean isTimeChunk } } + @Nullable private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task, boolean persist) { Preconditions.checkState(!(request instanceof LockRequestForNewSegment), "Can't handle LockRequestForNewSegment"); @@ -710,7 +734,7 @@ void allocateSegmentIds( } } - private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version) + private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version, String allocatorId) { return metadataStorageCoordinator.allocatePendingSegment( request.getDataSource(), @@ -719,7 +743,8 @@ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment reques request.getInterval(), request.getPartialShardSpec(), version, - request.isSkipSegmentLineageCheck() + request.isSkipSegmentLineageCheck(), + allocatorId ); } @@ -1159,12 +1184,25 @@ public void add(Task task) try { log.info("Adding task[%s] to activeTasks", task.getId()); activeTasks.add(task.getId()); + trackAppendingTask(task); } finally { giant.unlock(); } } + @GuardedBy("giant") + private void trackAppendingTask(Task task) + { + if (task instanceof PendingSegmentAllocatingTask) { + final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId(); + if (taskAllocatorId != null) { + activeAllocatorIdToTaskIds.computeIfAbsent(taskAllocatorId, s -> new HashSet<>()) + .add(task.getId()); + } + } + } + /** * Release all locks for a task and remove task from set of active tasks. Does nothing if the task is not currently locked or not an active task. 
* @@ -1176,13 +1214,35 @@ public void remove(final Task task) try { try { log.info("Removing task[%s] from activeTasks", task.getId()); - if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) { - final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId()); - log.info( - "Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.", - upgradeSegmentsDeleted, - task.getId() - ); + try { + // Clean upgrade segments table for entries associated with replacing task + if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) { + final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId()); + log.info( + "Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.", + upgradeSegmentsDeleted, task.getId() + ); + } + // Clean pending segments associated with the appending task + if (task instanceof PendingSegmentAllocatingTask) { + final String taskAllocatorId = ((PendingSegmentAllocatingTask) task).getTaskAllocatorId(); + if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) { + final Set<String> idsInSameGroup = activeAllocatorIdToTaskIds.get(taskAllocatorId); + idsInSameGroup.remove(task.getId()); + if (idsInSameGroup.isEmpty()) { + final int pendingSegmentsDeleted + = metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId); + log.info( + "Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.", + pendingSegmentsDeleted, taskAllocatorId + ); + } + activeAllocatorIdToTaskIds.remove(taskAllocatorId); + } + } + } + catch (Exception e) { + log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments tables."); } unlockAll(task); } @@ -1771,7 +1831,9 @@ SegmentCreateRequest getSegmentRequest() action.getSequenceName(), action.getPreviousSegmentId(), acquiredLock == null ?
lockRequest.getVersion() : acquiredLock.getVersion(), - action.getPartialShardSpec() + action.getPartialShardSpec(), + null, + ((PendingSegmentAllocatingTask) task).getTaskAllocatorId() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 810a991c2f22..dd57b560660c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -39,7 +39,6 @@ import javax.annotation.Nullable; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -126,15 +125,6 @@ public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String return Optional.absent(); } - public Set getActiveRealtimeSequencePrefixes(String activeSupervisorId) - { - if (supervisors.containsKey(activeSupervisorId)) { - return supervisors.get(activeSupervisorId).lhs.getActiveRealtimeSequencePrefixes(); - } else { - return Collections.emptySet(); - } - } - public Optional getSupervisorSpec(String id) { Pair supervisor = supervisors.get(id); @@ -340,7 +330,7 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor( return true; } catch (Exception e) { - log.error(e, "PendingSegment[%s] mapping update request to version[%s] on Supervisor[%s] failed", + log.error(e, "PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed", basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId); } return false; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 545090157a45..0ec9a67e8c1a 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.common.actions.TaskLocks; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; @@ -61,7 +62,7 @@ public abstract class SeekableStreamIndexTask - extends AbstractTask implements ChatHandler + extends AbstractTask implements ChatHandler, PendingSegmentAllocatingTask { public static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15; private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTask.class); @@ -269,6 +270,12 @@ public boolean withinMinMaxRecordTime(final InputRow row) return !beforeMinimumMessageTime && !afterMaximumMessageTime; } + @Override + public String getTaskAllocatorId() + { + return getTaskResource().getAvailabilityGroup(); + } + protected abstract SeekableStreamIndexTaskRunner createTaskRunner(); /** diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 032142bba859..44f4ee1ad932 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -60,6 +60,7 @@ import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; +import 
org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.RE; @@ -118,6 +119,7 @@ public abstract class IngestionTestBase extends InitializedNullHandlingTest private SegmentsMetadataManager segmentsMetadataManager; private TaskLockbox lockbox; private File baseDir; + private SupervisorManager supervisorManager; protected File reportsFile; @Before @@ -227,13 +229,14 @@ public TaskActionToolbox createTaskActionToolbox() taskStorage, storageCoordinator, new NoopServiceEmitter(), - null, + supervisorManager, objectMapper ); } - public TaskToolbox createTaskToolbox(TaskConfig config, Task task) + public TaskToolbox createTaskToolbox(TaskConfig config, Task task, SupervisorManager supervisorManager) { + this.supervisorManager = supervisorManager; return new TaskToolbox.Builder() .config(config) .taskExecutorNode(new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false)) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java index 69a8b6cc1030..230cfa4668c9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java @@ -36,10 +36,13 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NumberedPartialShardSpec; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; 
/** @@ -49,6 +52,7 @@ public class ActionsTestTask extends CommandQueueTask { private final TaskActionClient client; private final AtomicInteger sequenceId = new AtomicInteger(0); + private final Map announcedSegmentsToParentSegments = new HashMap<>(); public ActionsTestTask(String datasource, String groupId, TaskActionClientFactory factory) { @@ -78,16 +82,25 @@ public SegmentPublishResult commitReplaceSegments(DataSegment... segments) ); } + public Map getAnnouncedSegmentsToParentSegments() + { + return announcedSegmentsToParentSegments; + } + public SegmentPublishResult commitAppendSegments(DataSegment... segments) { - return runAction( + SegmentPublishResult publishResult = runAction( SegmentTransactionalAppendAction.forSegments(Sets.newHashSet(segments)) ); + for (DataSegment segment : publishResult.getSegments()) { + announcedSegmentsToParentSegments.remove(segment.getId()); + } + return publishResult; } public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Granularity preferredSegmentGranularity) { - return runAction( + SegmentIdWithShardSpec pendingSegment = runAction( new SegmentAllocateAction( getDataSource(), timestamp, @@ -101,28 +114,8 @@ public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Gr TaskLockType.APPEND ) ); - } - - public SegmentIdWithShardSpec allocateSegmentForTimestamp( - DateTime timestamp, - Granularity preferredSegmentGranularity, - String sequenceName - ) - { - return runAction( - new SegmentAllocateAction( - getDataSource(), - timestamp, - Granularities.SECOND, - preferredSegmentGranularity, - getId() + "__" + sequenceName, - null, - false, - NumberedPartialShardSpec.instance(), - LockGranularity.TIME_CHUNK, - TaskLockType.APPEND - ) - ); + announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId()); + return pendingSegment; } private T runAction(TaskAction action) diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java index 08e2c18112f9..5945d1488c38 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/CommandQueueTask.java @@ -24,6 +24,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; +import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; @@ -40,7 +41,7 @@ /** * Test task that can be given a series of commands to execute in its {@link #runTask} method. 
*/ -public class CommandQueueTask extends AbstractTask +public class CommandQueueTask extends AbstractTask implements PendingSegmentAllocatingTask { private static final Logger log = new Logger(CommandQueueTask.class); @@ -140,6 +141,12 @@ private V waitForCommandToFinish(Command command) } } + @Override + public String getTaskAllocatorId() + { + return getId(); + } + @Override public TaskStatus runTask(TaskToolbox taskToolbox) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index cc498c797bcb..415c63a0ee26 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -728,6 +728,7 @@ public void testMultipleGranularities() // Append segment for Oct-Dec final DataSegment segmentV02 = asSegment(pendingSegment02); appendTask2.commitAppendSegments(segmentV02); + appendTask2.finishRunAndGetStatus(); verifyIntervalHasUsedSegments(YEAR_23, segmentV02); verifyIntervalHasVisibleSegments(YEAR_23, segmentV02); @@ -747,12 +748,14 @@ public void testMultipleGranularities() // Append segment for Jan 1st final DataSegment segmentV01 = asSegment(pendingSegment01); appendTask.commitAppendSegments(segmentV01); + appendTask.finishRunAndGetStatus(); verifyIntervalHasUsedSegments(YEAR_23, segmentV01, segmentV02); verifyIntervalHasVisibleSegments(YEAR_23, segmentV01, segmentV02); // Replace segment for whole year final DataSegment segmentV10 = createSegment(YEAR_23, v1); replaceTask.commitReplaceSegments(segmentV10); + replaceTask.finishRunAndGetStatus(); final DataSegment segmentV11 = DataSegment.builder(segmentV01) .version(v1) @@ -767,6 +770,7 @@ public void testMultipleGranularities() // Append 
segment for quarter final DataSegment segmentV03 = asSegment(pendingSegment03); appendTask3.commitAppendSegments(segmentV03); + appendTask3.finishRunAndGetStatus(); final DataSegment segmentV13 = DataSegment.builder(segmentV03) .version(v1) @@ -1021,7 +1025,7 @@ private TaskToolboxFactory createToolboxFactory( @Override public TaskToolbox build(TaskConfig config, Task task) { - return createTaskToolbox(config, task); + return createTaskToolbox(config, task, null); } }; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java new file mode 100644 index 000000000000..50c318683e8f --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java @@ -0,0 +1,881 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.common.task.concurrent; + +import com.google.common.base.Optional; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter; +import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.TaskStorageDirTracker; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.TaskToolboxFactory; +import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; +import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.actions.TaskActionClientFactory; +import org.apache.druid.indexing.common.config.TaskConfig; +import org.apache.druid.indexing.common.config.TaskConfigBuilder; +import org.apache.druid.indexing.common.task.IngestionTestBase; +import org.apache.druid.indexing.common.task.NoopTask; +import org.apache.druid.indexing.common.task.NoopTaskContextEnricher; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TestAppenderatorsManager; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.indexing.overlord.Segments; +import org.apache.druid.indexing.overlord.TaskQueue; +import org.apache.druid.indexing.overlord.TaskRunner; +import org.apache.druid.indexing.overlord.TestTaskToolboxFactory; +import org.apache.druid.indexing.overlord.ThreadingTaskRunner; +import org.apache.druid.indexing.overlord.config.DefaultTaskConfig; +import org.apache.druid.indexing.overlord.config.TaskLockConfig; +import org.apache.druid.indexing.overlord.config.TaskQueueConfig; +import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; +import org.apache.druid.indexing.worker.config.WorkerConfig; +import org.apache.druid.java.util.common.DateTimes; 
+import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.server.DruidNode; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.tasklogs.NoopTaskLogs; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.easymock.Capture; +import org.easymock.CaptureType; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Contains tests to verify behaviour of concurrently running REPLACE and APPEND + * tasks on the same interval of a datasource. + *

+ * The tests verify the interleaving of the following actions: + *

+ * <ul>
+ * <li>LOCK: Acquisition of a lock on an interval by a replace task</li>
+ * <li>ALLOCATE: Allocation of a pending segment by an append task</li>
+ * <li>REPLACE: Commit of segments created by a replace task</li>
+ * <li>APPEND: Commit of segments created by an append task</li>
+ * </ul>
+ */ +public class ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase +{ + /** + * The version used by append jobs when no previous replace job has run on an interval. + */ + private static final String SEGMENT_V0 = DateTimes.EPOCH.toString(); + + private static final Interval JAN_23 = Intervals.of("2023-01/2023-02"); + private static final Interval FIRST_OF_JAN_23 = Intervals.of("2023-01-01/2023-01-02"); + + private static final String WIKI = "wiki"; + + private TaskQueue taskQueue; + private TaskActionClientFactory taskActionClientFactory; + private TaskActionClient dummyTaskActionClient; + private final List runningTasks = new ArrayList<>(); + + private ActionsTestTask appendTask; + private ActionsTestTask replaceTask; + + private final AtomicInteger groupId = new AtomicInteger(0); + private final SupervisorManager supervisorManager = EasyMock.mock(SupervisorManager.class); + private Capture supervisorId; + private Capture oldPendingSegment; + private Capture newPendingSegment; + private Map>> versionToIntervalToLoadSpecs; + private Map parentSegmentToLoadSpec; + + @Override + @Before + public void setUpIngestionTestBase() throws IOException + { + EasyMock.reset(supervisorManager); + EasyMock.expect(supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(WIKI)) + .andReturn(Optional.of(WIKI)).anyTimes(); + super.setUpIngestionTestBase(); + final TaskConfig taskConfig = new TaskConfigBuilder().build(); + taskActionClientFactory = createActionClientFactory(); + dummyTaskActionClient = taskActionClientFactory.create(NoopTask.create()); + + final WorkerConfig workerConfig = new WorkerConfig().setCapacity(10); + TaskRunner taskRunner = new ThreadingTaskRunner( + createToolboxFactory(taskConfig, taskActionClientFactory), + taskConfig, + workerConfig, + new NoopTaskLogs(), + getObjectMapper(), + new TestAppenderatorsManager(), + new MultipleFileTaskReportFileWriter(), + new DruidNode("middleManager", "host", false, 8091, null, true, false), + 
TaskStorageDirTracker.fromConfigs(workerConfig, taskConfig) + ); + taskQueue = new TaskQueue( + new TaskLockConfig(), + new TaskQueueConfig(null, new Period(0L), null, null, null), + new DefaultTaskConfig(), + getTaskStorage(), + taskRunner, + taskActionClientFactory, + getLockbox(), + new NoopServiceEmitter(), + getObjectMapper(), + new NoopTaskContextEnricher() + ); + runningTasks.clear(); + taskQueue.start(); + + groupId.set(0); + appendTask = createAndStartTask(); + supervisorId = Capture.newInstance(CaptureType.ALL); + oldPendingSegment = Capture.newInstance(CaptureType.ALL); + newPendingSegment = Capture.newInstance(CaptureType.ALL); + EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( + EasyMock.capture(supervisorId), + EasyMock.capture(oldPendingSegment), + EasyMock.capture(newPendingSegment) + )).andReturn(true).anyTimes(); + replaceTask = createAndStartTask(); + EasyMock.replay(supervisorManager); + versionToIntervalToLoadSpecs = new HashMap<>(); + parentSegmentToLoadSpec = new HashMap<>(); + } + + @After + public void tearDown() + { + verifyVersionIntervalLoadSpecUniqueness(); + for (ActionsTestTask task : runningTasks) { + task.finishRunAndGetStatus(); + } + } + + @Test + public void testLockReplaceAllocateAppend() + { + final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion(); + + final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1); + commitReplaceSegments(segmentV10); + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10); + + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(segmentV10.getVersion(), pendingSegment.getVersion()); + + final DataSegment segmentV11 = asSegment(pendingSegment); + commitAppendSegments(segmentV11); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, 
segmentV11); + } + + @Test + public void testLockAllocateAppendDayReplaceDay() + { + final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion(); + + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01); + + final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1); + commitReplaceSegments(segmentV10); + + // Verify that the segment appended to v0 gets upgraded to v1 + final DataSegment segmentV11 = DataSegment.builder(segmentV01) + .shardSpec(new NumberedShardSpec(1, 1)) + .version(v1).build(); + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11); + } + + @Test + public void testLockAllocateReplaceDayAppendDay() + { + final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion(); + + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1); + commitReplaceSegments(segmentV10); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + // Verify that the segment appended to v0 gets upgraded to v1 + final DataSegment segmentV11 = DataSegment.builder(segmentV01) + .shardSpec(new NumberedShardSpec(1, 1)) + .version(v1).build(); + 
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11); + } + + @Test + public void testAllocateLockReplaceDayAppendDay() + { + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion(); + + final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1); + commitReplaceSegments(segmentV10); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + // Verify that the segment appended to v0 gets upgraded to v1 + final DataSegment segmentV11 = DataSegment.builder(segmentV01) + .shardSpec(new NumberedShardSpec(1, 1)) + .version(v1).build(); + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11); + } + + @Test + public void testAllocateLockAppendDayReplaceDay() + { + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion(); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01); + + final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1); + commitReplaceSegments(segmentV10); + replaceTask.finishRunAndGetStatus(); + + // Verify that the segment appended to v0 gets 
upgraded to v1 + final DataSegment segmentV11 = DataSegment.builder(segmentV01) + .shardSpec(new NumberedShardSpec(1, 1)) + .version(v1).build(); + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11); + } + + @Test + public void testAllocateAppendDayLockReplaceDay() + { + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01); + + final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion(); + + final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1); + commitReplaceSegments(segmentV10); + + // Verify that the segment appended to v0 gets fully overshadowed + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10); + } + + @Test + public void testLockReplaceMonthAllocateAppendDay() + { + String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion(); + + final DataSegment segmentV10 = createSegment(JAN_23, v1); + commitReplaceSegments(segmentV10); + verifyIntervalHasUsedSegments(JAN_23, segmentV10); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10); + + // Verify that the allocated segment takes the version and interval of previous replace + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(JAN_23, pendingSegment.getInterval()); + Assert.assertEquals(v1, pendingSegment.getVersion()); + + final DataSegment segmentV11 = asSegment(pendingSegment); + 
commitAppendSegments(segmentV11); + + verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11); + } + + @Test + public void testLockAllocateAppendDayReplaceMonth() + { + final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion(); + + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01); + + final DataSegment segmentV10 = createSegment(JAN_23, v1); + commitReplaceSegments(segmentV10); + + // Verify that append segment gets upgraded to replace version + final DataSegment segmentV11 = DataSegment.builder(segmentV01) + .version(v1) + .interval(segmentV10.getInterval()) + .shardSpec(new NumberedShardSpec(1, 1)) + .build(); + verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11); + } + + @Test + public void testLockAllocateReplaceMonthAppendDay() + { + final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion(); + + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV10 = createSegment(JAN_23, v1); + commitReplaceSegments(segmentV10); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10); + + final DataSegment segmentV01 = 
asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + // Verify that append segment gets upgraded to replace version + final DataSegment segmentV11 = DataSegment.builder(segmentV01) + .version(v1) + .interval(segmentV10.getInterval()) + .shardSpec(new NumberedShardSpec(1, 1)) + .build(); + verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11); + } + + @Test + public void testAllocateLockReplaceMonthAppendDay() + { + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion(); + + final DataSegment segmentV10 = createSegment(JAN_23, v1); + commitReplaceSegments(segmentV10); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + // Verify that append segment gets upgraded to replace version + final DataSegment segmentV11 = DataSegment.builder(segmentV01) + .version(v1) + .interval(segmentV10.getInterval()) + .shardSpec(new NumberedShardSpec(1, 1)) + .build(); + verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11); + } + + @Test + public void testAllocateLockAppendDayReplaceMonth() + { + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion(); + 
+ final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01); + + final DataSegment segmentV10 = createSegment(JAN_23, v1); + commitReplaceSegments(segmentV10); + + // Verify that append segment gets upgraded to replace version + final DataSegment segmentV11 = DataSegment.builder(segmentV01) + .version(v1) + .interval(segmentV10.getInterval()) + .shardSpec(new NumberedShardSpec(1, 1)) + .build(); + verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11); + } + + @Test + public void testAllocateAppendDayLockReplaceMonth() + { + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01); + + final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion(); + + final DataSegment segmentV10 = createSegment(JAN_23, v1); + commitReplaceSegments(segmentV10); + + // Verify that the old segment gets completely replaced + verifyIntervalHasUsedSegments(JAN_23, segmentV01, segmentV10); + verifyIntervalHasVisibleSegments(JAN_23, segmentV10); + } + + @Test + public void testLockReplaceDayAllocateAppendMonth() + { + final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion(); + + final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1); + replaceTask.commitReplaceSegments(segmentV10); + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10); + 
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10); + + // Verify that an APPEND lock cannot be acquired on month + TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23); + Assert.assertNull(appendLock); + + // Verify that new segment gets allocated with DAY granularity even though preferred was MONTH + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH); + Assert.assertEquals(v1, pendingSegment.getVersion()); + Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval()); + + final DataSegment segmentV11 = asSegment(pendingSegment); + commitAppendSegments(segmentV11); + + verifyIntervalHasUsedSegments(JAN_23, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11); + } + + @Test + public void testLockAllocateAppendMonthReplaceDay() + { + final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion(); + + // Verify that an APPEND lock cannot be acquired on month + TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23); + Assert.assertNull(appendLock); + + // Verify that the segment is allocated for DAY granularity + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH); + Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV01); + + final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1); + commitReplaceSegments(segmentV10); + + // Verify that append segment gets upgraded to replace version + final DataSegment segmentV11 = DataSegment.builder(segmentV01) + .version(v1) + .interval(segmentV10.getInterval()) + .shardSpec(new 
NumberedShardSpec(1, 1)) + .build(); + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11); + } + + @Test + public void testLockAllocateReplaceDayAppendMonth() + { + final String v1 = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23).getVersion(); + + // Verify that an APPEND lock cannot be acquired on month + TaskLock appendLock = appendTask.acquireAppendLockOn(JAN_23); + Assert.assertNull(appendLock); + + // Verify that the segment is allocated for DAY granularity instead of MONTH + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH); + Assert.assertEquals(FIRST_OF_JAN_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, v1); + commitReplaceSegments(segmentV10); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV10); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + + final DataSegment segmentV11 = DataSegment.builder(segmentV01) + .interval(FIRST_OF_JAN_23) + .version(v1) + .shardSpec(new NumberedShardSpec(1, 1)) + .build(); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segmentV01, segmentV10, segmentV11); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV10, segmentV11); + } + + @Test + public void testAllocateLockReplaceDayAppendMonth() + { + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH); + Assert.assertEquals(JAN_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + // Verify that replace lock cannot be acquired on MONTH + TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23); + 
Assert.assertNull(replaceLock); + + // Verify that segment cannot be committed since there is no lock + final DataSegment segmentV10 = createSegment(FIRST_OF_JAN_23, SEGMENT_V0); + final ISE exception = Assert.assertThrows(ISE.class, () -> commitReplaceSegments(segmentV10)); + final Throwable throwable = Throwables.getRootCause(exception); + Assert.assertEquals( + StringUtils.format( + "Segments[[%s]] are not covered by locks[[]] for task[%s]", + segmentV10, replaceTask.getId() + ), + throwable.getMessage() + ); + + final DataSegment segmentV01 = asSegment(pendingSegment); + commitAppendSegments(segmentV01); + verifyIntervalHasUsedSegments(JAN_23, segmentV01); + verifyIntervalHasVisibleSegments(JAN_23, segmentV01); + } + + @Test + public void testAllocateAppendMonthLockReplaceDay() + { + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(JAN_23.getStart(), Granularities.MONTH); + Assert.assertEquals(JAN_23, pendingSegment.getInterval()); + Assert.assertEquals(SEGMENT_V0, pendingSegment.getVersion()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + appendTask.commitAppendSegments(segmentV01); + + verifyIntervalHasUsedSegments(JAN_23, segmentV01); + verifyIntervalHasVisibleSegments(JAN_23, segmentV01); + + // Verify that replace lock cannot be acquired on DAY as MONTH is already locked + final TaskLock replaceLock = replaceTask.acquireReplaceLockOn(FIRST_OF_JAN_23); + Assert.assertNull(replaceLock); + } + + @Test + public void testLockAllocateDayReplaceMonthAllocateAppend() + { + final SegmentIdWithShardSpec pendingSegmentV0 + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + + final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion(); + + final DataSegment segmentV10 = createSegment(JAN_23, v1); + commitReplaceSegments(segmentV10); + verifyIntervalHasUsedSegments(JAN_23, segmentV10); + + final SegmentIdWithShardSpec pendingSegmentV1 + = 
appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + Assert.assertEquals(segmentV10.getVersion(), pendingSegmentV1.getVersion()); + + final DataSegment segmentV00 = asSegment(pendingSegmentV0); + final DataSegment segmentV11 = asSegment(pendingSegmentV1); + Set appendSegments = commitAppendSegments(segmentV00, segmentV11) + .getSegments(); + + Assert.assertEquals(3, appendSegments.size()); + // Segment V11 is committed + Assert.assertTrue(appendSegments.remove(segmentV11)); + // Segment V00 is also committed + Assert.assertTrue(appendSegments.remove(segmentV00)); + // Segment V00 is upgraded to v1 with MONTH granularity at the time of commit as V12 + final DataSegment segmentV12 = Iterables.getOnlyElement(appendSegments); + Assert.assertEquals(v1, segmentV12.getVersion()); + Assert.assertEquals(JAN_23, segmentV12.getInterval()); + Assert.assertEquals(segmentV00.getLoadSpec(), segmentV12.getLoadSpec()); + + verifyIntervalHasUsedSegments(JAN_23, segmentV00, segmentV10, segmentV11, segmentV12); + verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12); + } + + + @Nullable + private DataSegment findSegmentWith(String version, Map loadSpec, Set segments) + { + for (DataSegment segment : segments) { + if (version.equals(segment.getVersion()) + && Objects.equals(segment.getLoadSpec(), loadSpec)) { + return segment; + } + } + + return null; + } + + private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment) + { + final SegmentId id = pendingSegment.asSegmentId(); + return new DataSegment( + id, + Collections.singletonMap(id.toString(), id.toString()), + Collections.emptyList(), + Collections.emptyList(), + pendingSegment.getShardSpec(), + null, + 0, + 0 + ); + } + + private void verifyIntervalHasUsedSegments(Interval interval, DataSegment... 
expectedSegments) + { + verifySegments(interval, Segments.INCLUDING_OVERSHADOWED, expectedSegments); + } + + private void verifyIntervalHasVisibleSegments(Interval interval, DataSegment... expectedSegments) + { + verifySegments(interval, Segments.ONLY_VISIBLE, expectedSegments); + } + + private void verifySegments(Interval interval, Segments visibility, DataSegment... expectedSegments) + { + try { + Collection allUsedSegments = dummyTaskActionClient.submit( + new RetrieveUsedSegmentsAction( + WIKI, + null, + ImmutableList.of(interval), + visibility + ) + ); + Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); + } + catch (IOException e) { + throw new ISE(e, "Error while fetching used segments in interval[%s]", interval); + } + } + + private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments) + { + try { + final TaskActionClient taskActionClient = taskActionClientFactory.create(task); + Collection allUsedSegments = taskActionClient.submit( + new RetrieveUsedSegmentsAction( + WIKI, + Collections.singletonList(interval) + ) + ); + Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); + } + catch (IOException e) { + throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval); + } + } + + private TaskToolboxFactory createToolboxFactory( + TaskConfig taskConfig, + TaskActionClientFactory taskActionClientFactory + ) + { + TestTaskToolboxFactory.Builder builder = new TestTaskToolboxFactory.Builder() + .setConfig(taskConfig) + .setIndexIO(new IndexIO(getObjectMapper(), ColumnConfig.DEFAULT)) + .setTaskActionClientFactory(taskActionClientFactory); + return new TestTaskToolboxFactory(builder) + { + @Override + public TaskToolbox build(TaskConfig config, Task task) + { + return createTaskToolbox(config, task, supervisorManager); + } + }; + } + + private DataSegment createSegment(Interval interval, String version) + { + SegmentId id = 
SegmentId.of(WIKI, interval, version, null); + return DataSegment.builder() + .dataSource(WIKI) + .interval(interval) + .version(version) + .loadSpec(Collections.singletonMap(id.toString(), id.toString())) + .size(100) + .build(); + } + + private ActionsTestTask createAndStartTask() + { + ActionsTestTask task = new ActionsTestTask(WIKI, "test_" + groupId.incrementAndGet(), taskActionClientFactory); + taskQueue.add(task); + runningTasks.add(task); + return task; + } + + private void commitReplaceSegments(DataSegment... dataSegments) + { + replaceTask.commitReplaceSegments(dataSegments); + for (int i = 0; i < supervisorId.getValues().size(); i++) { + announceUpgradedPendingSegment(oldPendingSegment.getValues().get(i), newPendingSegment.getValues().get(i)); + } + supervisorId.reset(); + oldPendingSegment.reset(); + newPendingSegment.reset(); + replaceTask.finishRunAndGetStatus(); + } + + private SegmentPublishResult commitAppendSegments(DataSegment... dataSegments) + { + SegmentPublishResult result = appendTask.commitAppendSegments(dataSegments); + result.getSegments().forEach(this::unannounceUpgradedPendingSegment); + for (DataSegment segment : dataSegments) { + parentSegmentToLoadSpec.put(segment.getId(), Iterables.getOnlyElement(segment.getLoadSpec().values())); + } + appendTask.finishRunAndGetStatus(); + return result; + } + + private void announceUpgradedPendingSegment( + SegmentIdWithShardSpec oldPendingSegment, + SegmentIdWithShardSpec newPendingSegment + ) + { + appendTask.getAnnouncedSegmentsToParentSegments() + .put(newPendingSegment.asSegmentId(), oldPendingSegment.asSegmentId()); + } + + private void unannounceUpgradedPendingSegment( + DataSegment segment + ) + { + appendTask.getAnnouncedSegmentsToParentSegments() + .remove(segment.getId()); + } + + private void verifyVersionIntervalLoadSpecUniqueness() + { + for (DataSegment usedSegment : getAllUsedSegments()) { + final String version = usedSegment.getVersion(); + final Interval interval = 
usedSegment.getInterval(); + final Object loadSpec = Iterables.getOnlyElement(usedSegment.getLoadSpec().values()); + Map> intervalToLoadSpecs + = versionToIntervalToLoadSpecs.computeIfAbsent(version, v -> new HashMap<>()); + Set loadSpecs + = intervalToLoadSpecs.computeIfAbsent(interval, i -> new HashSet<>()); + Assert.assertFalse(loadSpecs.contains(loadSpec)); + loadSpecs.add(loadSpec); + } + + for (Map.Entry entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) { + final String version = entry.getKey().getVersion(); + final Interval interval = entry.getKey().getInterval(); + final Object loadSpec = parentSegmentToLoadSpec.get(entry.getValue()); + Map> intervalToLoadSpecs + = versionToIntervalToLoadSpecs.computeIfAbsent(version, v -> new HashMap<>()); + Set loadSpecs + = intervalToLoadSpecs.computeIfAbsent(interval, i -> new HashSet<>()); + Assert.assertFalse(loadSpecs.contains(loadSpec)); + loadSpecs.add(loadSpec); + } + } + + private Collection getAllUsedSegments() + { + try { + return dummyTaskActionClient.submit( + new RetrieveUsedSegmentsAction( + WIKI, + null, + ImmutableList.of(Intervals.ETERNITY), + Segments.INCLUDING_OVERSHADOWED + ) + ); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 307879fed8bd..999d4d0abb2e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1896,14 +1896,17 @@ public void testUnlockSupersededLocks() } @Test - public void testUpgradeSegmentsCleanupOnUnlock() + public void testCleanupOnUnlock() { - final Task replaceTask = NoopTask.create(); - final Task appendTask = NoopTask.create(); + final Task replaceTask = NoopTask.forDatasource("replace"); + final Task 
appendTask = NoopTask.forDatasource("append"); final IndexerSQLMetadataStorageCoordinator coordinator = EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class); // Only the replaceTask should attempt a delete on the upgradeSegments table EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once(); + // Any task may attempt pending segment clean up + EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(replaceTask.getId())).andReturn(0).once(); + EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(appendTask.getId())).andReturn(0).once(); EasyMock.replay(coordinator); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 9ad3be6e3619..f57494a1e03b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -30,6 +30,7 @@ import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; @@ -175,7 +176,8 @@ public SegmentPublishResult commitReplaceSegments( @Override public SegmentPublishResult commitAppendSegments( Set appendSegments, - Map appendSegmentToReplaceLock + Map appendSegmentToReplaceLock, + String taskGroup ) { return SegmentPublishResult.ok(commitSegments(appendSegments)); @@ -186,7 +188,8 @@ public SegmentPublishResult commitAppendSegmentsAndMetadata( Set 
appendSegments, Map appendSegmentToReplaceLock, DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata + DataSourceMetadata endMetadata, + String taskGroup ) { return SegmentPublishResult.ok(commitSegments(appendSegments)); @@ -228,7 +231,8 @@ public SegmentIdWithShardSpec allocatePendingSegment( Interval interval, PartialShardSpec partialShardSpec, String maxVersion, - boolean skipSegmentLineageCheck + boolean skipSegmentLineageCheck, + String taskAllocatorId ) { return new SegmentIdWithShardSpec( @@ -241,8 +245,7 @@ public SegmentIdWithShardSpec allocatePendingSegment( @Override public Map upgradePendingSegmentsOverlappingWith( - Set replaceSegments, - Set activeBaseSequenceNames + Set replaceSegments ) { return Collections.emptyMap(); @@ -285,6 +288,18 @@ public int deleteUpgradeSegmentsForTask(final String taskId) throw new UnsupportedOperationException(); } + @Override + public int deletePendingSegmentsForTaskGroup(final String taskGroup) + { + throw new UnsupportedOperationException(); + } + + @Override + public List getPendingSegments(String datasource, Interval interval) + { + throw new UnsupportedOperationException(); + } + public Set getPublished() { return ImmutableSet.copyOf(published); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index a889605c210f..2390e7b55003 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.overlord; import org.apache.druid.java.util.common.Pair; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import 
org.apache.druid.timeline.DataSegment; @@ -238,7 +239,7 @@ Map allocatePendingSegments( * identifier may have a version lower than this one, but will not have one higher. * @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence. * Should be set to false if replica tasks would index events in same order - * + * @param taskAllocatorId The task allocator id with which the pending segment is associated * @return the pending segment identifier, or null if it was impossible to allocate a new segment */ SegmentIdWithShardSpec allocatePendingSegment( @@ -248,7 +249,8 @@ SegmentIdWithShardSpec allocatePendingSegment( Interval interval, PartialShardSpec partialShardSpec, String maxVersion, - boolean skipSegmentLineageCheck + boolean skipSegmentLineageCheck, + String taskAllocatorId ); /** @@ -322,10 +324,12 @@ SegmentPublishResult commitSegmentsAndMetadata( * must be committed in a single transaction. * @param appendSegmentToReplaceLock Map from append segment to the currently * active REPLACE lock (if any) covering it + * @param taskAllocatorId allocator id of the task committing the segments to be appended */ SegmentPublishResult commitAppendSegments( Set appendSegments, - Map appendSegmentToReplaceLock + Map appendSegmentToReplaceLock, + String taskAllocatorId ); /** @@ -340,7 +344,8 @@ SegmentPublishResult commitAppendSegmentsAndMetadata( Set appendSegments, Map appendSegmentToReplaceLock, DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata + DataSourceMetadata endMetadata, + String taskGroup ); /** @@ -373,13 +378,10 @@ SegmentPublishResult commitReplaceSegments( * * * @param replaceSegments Segments being committed by a REPLACE task - * @param activeRealtimeSequencePrefixes Set of sequence prefixes of active and pending completion task groups - * of the supervisor (if any) for this datasource * @return Map from originally allocated pending segment to its new upgraded ID. 
*/ Map upgradePendingSegmentsOverlappingWith( - Set replaceSegments, - Set activeRealtimeSequencePrefixes + Set replaceSegments ); /** @@ -476,4 +478,19 @@ SegmentPublishResult commitMetadataOnly( * @return number of deleted entries from the metadata store */ int deleteUpgradeSegmentsForTask(String taskId); + + /** + * Delete pending segments for a given task group after all the tasks belonging to it have completed. + * @param taskAllocatorId task id / task group / replica group for an appending task + * @return number of pending segments deleted from the metadata store + */ + int deletePendingSegmentsForTaskGroup(String taskAllocatorId); + + /** + * Fetches all the pending segments of the datasource that overlap with a given interval. + * @param datasource datasource to be queried + * @param interval interval with which segments overlap + * @return List of pending segment records + */ + List getPendingSegments(String datasource, Interval interval); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java index b43e46d8e7a5..49b31e5e6ff9 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentCreateRequest.java @@ -38,18 +38,24 @@ public class SegmentCreateRequest private final String sequenceName; private final String previousSegmentId; private final PartialShardSpec partialShardSpec; + private final String upgradedFromSegmentId; + private final String taskAllocatorId; public SegmentCreateRequest( String sequenceName, String previousSegmentId, String version, - PartialShardSpec partialShardSpec + PartialShardSpec partialShardSpec, + String upgradedFromSegmentId, + String taskAllocatorId ) { this.sequenceName = sequenceName; this.previousSegmentId = previousSegmentId == null ? 
"" : previousSegmentId; this.version = version; this.partialShardSpec = partialShardSpec; + this.upgradedFromSegmentId = upgradedFromSegmentId; + this.taskAllocatorId = taskAllocatorId; } public String getSequenceName() @@ -75,4 +81,14 @@ public PartialShardSpec getPartialShardSpec() { return partialShardSpec; } + + public String getUpgradedFromSegmentId() + { + return upgradedFromSegmentId; + } + + public String getTaskAllocatorId() + { + return taskAllocatorId; + } } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index d364299d21d2..e7567ba27284 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -57,7 +57,6 @@ import org.apache.druid.timeline.SegmentTimeline; import org.apache.druid.timeline.TimelineObjectHolder; import org.apache.druid.timeline.partition.NoneShardSpec; -import org.apache.druid.timeline.partition.NumberedPartialShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.PartialShardSpec; import org.apache.druid.timeline.partition.PartitionChunk; @@ -94,7 +93,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -281,99 +279,74 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv } /** - * Fetches all the pending segments, whose interval overlaps with the given - * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter - * from the metadata store. Returns a Map from the pending segment ID to the sequence name. 
+ * Fetches all the pending segments, whose interval overlaps with the given search interval, from the metadata store. */ @VisibleForTesting - Map getPendingSegmentsForIntervalWithHandle( + List getPendingSegmentsForIntervalWithHandle( final Handle handle, final String dataSource, - final Interval interval, - final Set sequenceNamePrefixFilter - ) throws IOException + final Interval interval + ) { - if (sequenceNamePrefixFilter.isEmpty()) { - return Collections.emptyMap(); - } - - final List sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter); - final List sequenceNamePrefixConditions = new ArrayList<>(); - for (int i = 0; i < sequenceNamePrefixes.size(); i++) { - sequenceNamePrefixConditions.add(StringUtils.format("(sequence_name LIKE :prefix%d)", i)); - } + final boolean compareIntervalEndpointsAsStrings = Intervals.canCompareEndpointsAsStrings(interval); - String sql = "SELECT sequence_name, payload" + String sql = "SELECT payload, sequence_name, sequence_prev_id, task_allocator_id, upgraded_from_segment_id" + " FROM " + dbTables.getPendingSegmentsTable() - + " WHERE dataSource = :dataSource" - + " AND start < :end" - + StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString()) - + " AND ( " + String.join(" OR ", sequenceNamePrefixConditions) + " )"; + + " WHERE dataSource = :dataSource"; + if (compareIntervalEndpointsAsStrings) { + sql = sql + + " AND start < :end" + + StringUtils.format(" AND %1$send%1$s > :start", connector.getQuoteString()); + } Query> query = handle.createQuery(sql) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()); - - for (int i = 0; i < sequenceNamePrefixes.size(); i++) { - query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i) + "%"); + .bind("dataSource", dataSource); + if (compareIntervalEndpointsAsStrings) { + query = query.bind("start", interval.getStart().toString()) + .bind("end", 
interval.getEnd().toString()); } - final ResultIterator dbSegments = - query.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r)) - .iterator(); - final Map pendingSegmentToSequenceName = new HashMap<>(); - while (dbSegments.hasNext()) { - PendingSegmentsRecord record = dbSegments.next(); - final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class); - - if (interval.overlaps(identifier.getInterval())) { - pendingSegmentToSequenceName.put(identifier, record.sequenceName); + final ResultIterator pendingSegmentIterator = + query.map((index, r, ctx) -> PendingSegmentRecord.fromResultSet(r, jsonMapper)) + .iterator(); + final ImmutableList.Builder pendingSegments = ImmutableList.builder(); + while (pendingSegmentIterator.hasNext()) { + final PendingSegmentRecord pendingSegment = pendingSegmentIterator.next(); + if (compareIntervalEndpointsAsStrings || pendingSegment.getId().getInterval().overlaps(interval)) { + pendingSegments.add(pendingSegment); } } - - dbSegments.close(); - - return pendingSegmentToSequenceName; + pendingSegmentIterator.close(); + return pendingSegments.build(); } - private Map getPendingSegmentsForIntervalWithHandle( + List getPendingSegmentsForTaskAllocatorIdWithHandle( final Handle handle, final String dataSource, - final Interval interval - ) throws IOException + final String taskAllocatorId + ) { - final ResultIterator dbSegments = - handle.createQuery( - StringUtils.format( - // This query might fail if the year has a different number of digits - // See https://github.com/apache/druid/pull/11582 for a similar issue - // Using long for these timestamps instead of varchar would give correct time comparisons - "SELECT sequence_name, payload FROM %1$s" - + " WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start", - dbTables.getPendingSegmentsTable(), connector.getQuoteString() - ) - ) - .bind("dataSource", dataSource) - .bind("start", interval.getStart().toString()) - 
.bind("end", interval.getEnd().toString()) - .map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r)) - .iterator(); + String sql = "SELECT payload, sequence_name, sequence_prev_id, task_allocator_id, upgraded_from_segment_id" + + " FROM " + dbTables.getPendingSegmentsTable() + + " WHERE dataSource = :dataSource AND task_allocator_id = :task_allocator_id"; - final Map pendingSegmentToSequenceName = new HashMap<>(); - while (dbSegments.hasNext()) { - PendingSegmentsRecord record = dbSegments.next(); - final SegmentIdWithShardSpec identifier = jsonMapper.readValue(record.payload, SegmentIdWithShardSpec.class); + Query> query = handle.createQuery(sql) + .bind("dataSource", dataSource) + .bind("task_allocator_id", taskAllocatorId); - if (interval.overlaps(identifier.getInterval())) { - pendingSegmentToSequenceName.put(identifier, record.sequenceName); - } + final ResultIterator pendingSegmentRecords = + query.map((index, r, ctx) -> PendingSegmentRecord.fromResultSet(r, jsonMapper)) + .iterator(); + + final List pendingSegments = new ArrayList<>(); + while (pendingSegmentRecords.hasNext()) { + pendingSegments.add(pendingSegmentRecords.next()); } - dbSegments.close(); + pendingSegmentRecords.close(); - return pendingSegmentToSequenceName; + return pendingSegments; } private SegmentTimeline getTimelineForIntervalsWithHandle( @@ -503,9 +476,11 @@ public SegmentPublishResult commitReplaceSegments( segmentsToInsert.addAll( createNewIdsOfAppendSegmentsAfterReplace(handle, replaceSegments, locksHeldByReplaceTask) ); - return SegmentPublishResult.ok( + SegmentPublishResult result = SegmentPublishResult.ok( insertSegments(handle, segmentsToInsert) ); + upgradePendingSegmentsOverlappingWith(segmentsToInsert); + return result; }, 3, getSqlMetadataMaxRetry() @@ -519,14 +494,16 @@ public SegmentPublishResult commitReplaceSegments( @Override public SegmentPublishResult commitAppendSegments( final Set appendSegments, - final Map appendSegmentToReplaceLock + final Map 
appendSegmentToReplaceLock, + final String taskAllocatorId ) { return commitAppendSegmentsAndMetadataInTransaction( appendSegments, appendSegmentToReplaceLock, null, - null + null, + taskAllocatorId ); } @@ -535,14 +512,16 @@ public SegmentPublishResult commitAppendSegmentsAndMetadata( Set appendSegments, Map appendSegmentToReplaceLock, DataSourceMetadata startMetadata, - DataSourceMetadata endMetadata + DataSourceMetadata endMetadata, + String taskAllocatorId ) { return commitAppendSegmentsAndMetadataInTransaction( appendSegments, appendSegmentToReplaceLock, startMetadata, - endMetadata + endMetadata, + taskAllocatorId ); } @@ -645,7 +624,8 @@ public SegmentIdWithShardSpec allocatePendingSegment( final Interval interval, final PartialShardSpec partialShardSpec, final String maxVersion, - final boolean skipSegmentLineageCheck + final boolean skipSegmentLineageCheck, + String taskAllocatorId ) { Preconditions.checkNotNull(dataSource, "dataSource"); @@ -677,7 +657,8 @@ public SegmentIdWithShardSpec allocatePendingSegment( allocateInterval, partialShardSpec, maxVersion, - existingChunks + existingChunks, + taskAllocatorId ); } else { return allocatePendingSegmentWithSegmentLineageCheck( @@ -688,7 +669,8 @@ public SegmentIdWithShardSpec allocatePendingSegment( allocateInterval, partialShardSpec, maxVersion, - existingChunks + existingChunks, + taskAllocatorId ); } } @@ -697,8 +679,7 @@ public SegmentIdWithShardSpec allocatePendingSegment( @Override public Map upgradePendingSegmentsOverlappingWith( - Set replaceSegments, - Set activeRealtimeSequencePrefixes + Set replaceSegments ) { if (replaceSegments.isEmpty()) { @@ -717,7 +698,7 @@ public Map upgradePendingSegment final String datasource = replaceSegments.iterator().next().getDataSource(); return connector.retryWithHandle( - handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId, activeRealtimeSequencePrefixes) + handle -> upgradePendingSegments(handle, datasource, replaceIntervalToMaxId) ); } @@ 
-736,11 +717,10 @@ public Map upgradePendingSegment private Map upgradePendingSegments( Handle handle, String datasource, - Map replaceIntervalToMaxId, - Set activeRealtimeSequencePrefixes - ) throws IOException + Map replaceIntervalToMaxId + ) throws JsonProcessingException { - final Map newPendingSegmentVersions = new HashMap<>(); + final List upgradedPendingSegments = new ArrayList<>(); final Map pendingSegmentToNewId = new HashMap<>(); for (Map.Entry entry : replaceIntervalToMaxId.entrySet()) { @@ -751,15 +731,13 @@ private Map upgradePendingSegmen final int numCorePartitions = maxSegmentId.getShardSpec().getNumCorePartitions(); int currentPartitionNumber = maxSegmentId.getShardSpec().getPartitionNum(); - final Map overlappingPendingSegments - = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval, activeRealtimeSequencePrefixes); + final List overlappingPendingSegments + = getPendingSegmentsForIntervalWithHandle(handle, datasource, replaceInterval); - for (Map.Entry overlappingPendingSegment - : overlappingPendingSegments.entrySet()) { - final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getKey(); - final String pendingSegmentSequence = overlappingPendingSegment.getValue(); + for (PendingSegmentRecord overlappingPendingSegment : overlappingPendingSegments) { + final SegmentIdWithShardSpec pendingSegmentId = overlappingPendingSegment.getId(); - if (shouldUpgradePendingSegment(pendingSegmentId, pendingSegmentSequence, replaceInterval, replaceVersion)) { + if (shouldUpgradePendingSegment(overlappingPendingSegment, replaceInterval, replaceVersion)) { // Ensure unique sequence_name_prev_id_sha1 by setting // sequence_prev_id -> pendingSegmentId // sequence_name -> prefix + replaceVersion @@ -769,14 +747,14 @@ private Map upgradePendingSegmen replaceVersion, new NumberedShardSpec(++currentPartitionNumber, numCorePartitions) ); - newPendingSegmentVersions.put( - new SegmentCreateRequest( + upgradedPendingSegments.add( + 
new PendingSegmentRecord( + newId, UPGRADED_PENDING_SEGMENT_PREFIX + replaceVersion, pendingSegmentId.toString(), - replaceVersion, - NumberedPartialShardSpec.instance() - ), - newId + pendingSegmentId.toString(), + overlappingPendingSegment.getTaskAllocatorId() + ) ); pendingSegmentToNewId.put(pendingSegmentId, newId); } @@ -787,33 +765,34 @@ private Map upgradePendingSegmen // includes hash of both sequence_name and prev_segment_id int numInsertedPendingSegments = insertPendingSegmentsIntoMetastore( handle, - newPendingSegmentVersions, + upgradedPendingSegments, datasource, false ); log.info( "Inserted total [%d] new versions for [%d] pending segments.", - numInsertedPendingSegments, newPendingSegmentVersions.size() + numInsertedPendingSegments, upgradedPendingSegments.size() ); return pendingSegmentToNewId; } private boolean shouldUpgradePendingSegment( - SegmentIdWithShardSpec pendingSegmentId, - String pendingSegmentSequenceName, + PendingSegmentRecord pendingSegment, Interval replaceInterval, String replaceVersion ) { - if (pendingSegmentId.getVersion().compareTo(replaceVersion) >= 0) { + if (pendingSegment.getTaskAllocatorId() == null) { + return false; + } else if (pendingSegment.getId().getVersion().compareTo(replaceVersion) >= 0) { return false; - } else if (!replaceInterval.contains(pendingSegmentId.getInterval())) { + } else if (!replaceInterval.contains(pendingSegment.getId().getInterval())) { return false; } else { // Do not upgrade already upgraded pending segment - return pendingSegmentSequenceName == null - || !pendingSegmentSequenceName.startsWith(UPGRADED_PENDING_SEGMENT_PREFIX); + return pendingSegment.getSequenceName() == null + || !pendingSegment.getSequenceName().startsWith(UPGRADED_PENDING_SEGMENT_PREFIX); } } @@ -826,7 +805,8 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( final Interval interval, final PartialShardSpec partialShardSpec, final String maxVersion, - final List> existingChunks + final List> 
existingChunks, + final String taskAllocatorId ) throws IOException { final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId; @@ -896,7 +876,8 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( interval, previousSegmentIdNotNull, sequenceName, - sequenceNamePrevIdSha1 + sequenceNamePrevIdSha1, + taskAllocatorId ); return newIdentifier; } @@ -947,7 +928,7 @@ private Map allocatePendingSegment } // For each of the remaining requests, create a new segment - final Map createdSegments = createNewSegments( + final Map createdSegments = createNewSegments( handle, dataSource, interval, @@ -965,12 +946,14 @@ private Map allocatePendingSegment // have difficulty with large unique keys (see https://github.com/apache/druid/issues/2319) insertPendingSegmentsIntoMetastore( handle, - createdSegments, + ImmutableList.copyOf(createdSegments.values()), dataSource, skipSegmentLineageCheck ); - allocatedSegmentIds.putAll(createdSegments); + for (Map.Entry entry : createdSegments.entrySet()) { + allocatedSegmentIds.put(entry.getKey(), entry.getValue().getId()); + } return allocatedSegmentIds; } @@ -1009,7 +992,8 @@ private SegmentIdWithShardSpec allocatePendingSegment( final Interval interval, final PartialShardSpec partialShardSpec, final String maxVersion, - final List> existingChunks + final List> existingChunks, + final String taskAllocatorId ) throws IOException { final String sql = StringUtils.format( @@ -1073,7 +1057,9 @@ private SegmentIdWithShardSpec allocatePendingSegment( ); // always insert empty previous sequence id - insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1); + insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1, + taskAllocatorId + ); log.info( "Created new pending segment[%s] for datasource[%s], sequence[%s], interval[%s].", @@ -1281,7 +1267,8 @@ private 
SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( Set appendSegments, Map appendSegmentToReplaceLock, @Nullable DataSourceMetadata startMetadata, - @Nullable DataSourceMetadata endMetadata + @Nullable DataSourceMetadata endMetadata, + String taskAllocatorId ) { verifySegmentsToCommit(appendSegments); @@ -1291,16 +1278,38 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } final String dataSource = appendSegments.iterator().next().getDataSource(); - final Set segmentIdsForNewVersions = connector.retryTransaction( + final List segmentIdsForNewVersions = connector.retryTransaction( (handle, transactionStatus) - -> createNewIdsForAppendSegments(handle, dataSource, appendSegments), + -> getPendingSegmentsForTaskAllocatorIdWithHandle(handle, dataSource, taskAllocatorId), 0, SQLMetadataConnector.DEFAULT_MAX_TRIES ); + // Create entries for all required versions of the append segments final Set allSegmentsToInsert = new HashSet<>(appendSegments); - allSegmentsToInsert.addAll(segmentIdsForNewVersions); + + final Map segmentIdMap = new HashMap<>(); + appendSegments.forEach(segment -> segmentIdMap.put(segment.getId().toString(), segment)); + segmentIdsForNewVersions.forEach( + pendingSegment -> { + if (segmentIdMap.containsKey(pendingSegment.getUpgradedFromSegmentId())) { + final DataSegment oldSegment = segmentIdMap.get(pendingSegment.getUpgradedFromSegmentId()); + allSegmentsToInsert.add( + new DataSegment( + pendingSegment.getId().asSegmentId(), + oldSegment.getLoadSpec(), + oldSegment.getDimensions(), + oldSegment.getMetrics(), + pendingSegment.getId().getShardSpec(), + oldSegment.getLastCompactionState(), + oldSegment.getBinaryVersion(), + oldSegment.getSize() + ) + ); + } + } + ); final AtomicBoolean metadataNotUpdated = new AtomicBoolean(false); try { @@ -1341,31 +1350,27 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } } - private int insertPendingSegmentsIntoMetastore( + @VisibleForTesting + 
int insertPendingSegmentsIntoMetastore( Handle handle, - Map createdSegments, + List pendingSegments, String dataSource, boolean skipSegmentLineageCheck ) throws JsonProcessingException { final PreparedBatch insertBatch = handle.prepareBatch( StringUtils.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " - + "sequence_name_prev_id_sha1, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " - + ":sequence_name_prev_id_sha1, :payload)", - dbTables.getPendingSegmentsTable(), - connector.getQuoteString() - )); - - // Deduplicate the segment ids by inverting the map - Map segmentIdToRequest = new HashMap<>(); - createdSegments.forEach((request, segmentId) -> segmentIdToRequest.put(segmentId, request)); + "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " + + "sequence_name_prev_id_sha1, payload, task_allocator_id, upgraded_from_segment_id) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " + + ":sequence_name_prev_id_sha1, :payload, :task_allocator_id, :upgraded_from_segment_id)", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + )); final String now = DateTimes.nowUtc().toString(); - for (Map.Entry entry : segmentIdToRequest.entrySet()) { - final SegmentCreateRequest request = entry.getValue(); - final SegmentIdWithShardSpec segmentId = entry.getKey(); + for (PendingSegmentRecord pendingSegment : pendingSegments) { + final SegmentIdWithShardSpec segmentId = pendingSegment.getId(); final Interval interval = segmentId.getInterval(); insertBatch.add() @@ -1374,13 +1379,15 @@ private int insertPendingSegmentsIntoMetastore( .bind("created_date", now) .bind("start", interval.getStart().toString()) .bind("end", interval.getEnd().toString()) - .bind("sequence_name", request.getSequenceName()) - .bind("sequence_prev_id", 
request.getPreviousSegmentId()) + .bind("sequence_name", pendingSegment.getSequenceName()) + .bind("sequence_prev_id", pendingSegment.getSequencePrevId()) .bind( "sequence_name_prev_id_sha1", - getSequenceNameAndPrevIdSha(request, segmentId, skipSegmentLineageCheck) + pendingSegment.computeSequenceNamePrevIdSha1(skipSegmentLineageCheck) ) - .bind("payload", jsonMapper.writeValueAsBytes(segmentId)); + .bind("payload", jsonMapper.writeValueAsBytes(segmentId)) + .bind("task_allocator_id", pendingSegment.getTaskAllocatorId()) + .bind("upgraded_from_segment_id", pendingSegment.getUpgradedFromSegmentId()); } int[] updated = insertBatch.execute(); return Arrays.stream(updated).sum(); @@ -1393,15 +1400,16 @@ private void insertPendingSegmentIntoMetastore( Interval interval, String previousSegmentId, String sequenceName, - String sequenceNamePrevIdSha1 + String sequenceNamePrevIdSha1, + String taskAllocatorId ) throws JsonProcessingException { handle.createStatement( StringUtils.format( "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " - + "sequence_name_prev_id_sha1, payload) " + + "sequence_name_prev_id_sha1, payload, task_allocator_id) " + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " - + ":sequence_name_prev_id_sha1, :payload)", + + ":sequence_name_prev_id_sha1, :payload, :task_allocator_id)", dbTables.getPendingSegmentsTable(), connector.getQuoteString() ) @@ -1415,188 +1423,18 @@ private void insertPendingSegmentIntoMetastore( .bind("sequence_prev_id", previousSegmentId) .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1) .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) + .bind("task_allocator_id", taskAllocatorId) .execute(); } - /** - * Creates new IDs for the given append segments if a REPLACE task started and - * finished after these append segments had already been allocated. 
The newly - * created IDs belong to the same interval and version as the segments committed - * by the REPLACE task. - */ - private Set createNewIdsForAppendSegments( - Handle handle, - String dataSource, - Set segmentsToAppend - ) throws IOException - { - if (segmentsToAppend.isEmpty()) { - return Collections.emptySet(); - } - - final Set appendIntervals = new HashSet<>(); - final TreeMap> appendVersionToSegments = new TreeMap<>(); - for (DataSegment segment : segmentsToAppend) { - appendIntervals.add(segment.getInterval()); - appendVersionToSegments.computeIfAbsent(segment.getVersion(), v -> new HashSet<>()) - .add(segment); - } - - // Fetch all used segments that overlap with any of the append intervals - final Collection overlappingSegments = retrieveUsedSegmentsForIntervals( - dataSource, - new ArrayList<>(appendIntervals), - Segments.INCLUDING_OVERSHADOWED - ); - - final Map> overlappingVersionToIntervals = new HashMap<>(); - final Map> overlappingIntervalToSegments = new HashMap<>(); - for (DataSegment segment : overlappingSegments) { - overlappingVersionToIntervals.computeIfAbsent(segment.getVersion(), v -> new HashSet<>()) - .add(segment.getInterval()); - overlappingIntervalToSegments.computeIfAbsent(segment.getInterval(), i -> new HashSet<>()) - .add(segment); - } - - final Set upgradedSegments = new HashSet<>(); - for (Map.Entry> entry : overlappingVersionToIntervals.entrySet()) { - final String upgradeVersion = entry.getKey(); - Map> segmentsToUpgrade = getSegmentsWithVersionLowerThan( - upgradeVersion, - entry.getValue(), - appendVersionToSegments - ); - for (Map.Entry> upgradeEntry : segmentsToUpgrade.entrySet()) { - final Interval upgradeInterval = upgradeEntry.getKey(); - final Set segmentsAlreadyOnVersion - = overlappingIntervalToSegments.getOrDefault(upgradeInterval, Collections.emptySet()) - .stream() - .filter(s -> s.getVersion().equals(upgradeVersion)) - .collect(Collectors.toSet()); - Set segmentsUpgradedToVersion = 
createNewIdsForAppendSegmentsWithVersion( - handle, - upgradeVersion, - upgradeInterval, - upgradeEntry.getValue(), - segmentsAlreadyOnVersion - ); - log.info("Upgraded [%d] segments to version[%s].", segmentsUpgradedToVersion.size(), upgradeVersion); - upgradedSegments.addAll(segmentsUpgradedToVersion); - } - } - - return upgradedSegments; - } - - /** - * Creates a Map from eligible interval to Set of segments that are fully - * contained in that interval and have a version strictly lower than {@code #cutoffVersion}. - */ - private Map> getSegmentsWithVersionLowerThan( - String cutoffVersion, - Set eligibleIntervals, - TreeMap> versionToSegments - ) - { - final Set eligibleSegments - = versionToSegments.headMap(cutoffVersion).values().stream() - .flatMap(Collection::stream) - .collect(Collectors.toSet()); - - final Map> eligibleIntervalToSegments = new HashMap<>(); - - for (DataSegment segment : eligibleSegments) { - final Interval segmentInterval = segment.getInterval(); - for (Interval eligibleInterval : eligibleIntervals) { - if (eligibleInterval.contains(segmentInterval)) { - eligibleIntervalToSegments.computeIfAbsent(eligibleInterval, itvl -> new HashSet<>()) - .add(segment); - break; - } else if (eligibleInterval.overlaps(segmentInterval)) { - // Committed interval overlaps only partially - throw new ISE( - "Committed interval[%s] conflicts with interval[%s] of append segment[%s].", - eligibleInterval, segmentInterval, segment.getId() - ); - } - } - } - - return eligibleIntervalToSegments; - } - - /** - * Computes new segment IDs that belong to the upgradeInterval and upgradeVersion. - * - * @param committedSegments Segments that already exist in the upgradeInterval - * at upgradeVersion. 
- */ - private Set createNewIdsForAppendSegmentsWithVersion( - Handle handle, - String upgradeVersion, - Interval upgradeInterval, - Set segmentsToUpgrade, - Set committedSegments - ) throws IOException - { - // Find the committed segments with the higest partition number - SegmentIdWithShardSpec committedMaxId = null; - for (DataSegment committedSegment : committedSegments) { - if (committedMaxId == null - || committedMaxId.getShardSpec().getPartitionNum() < committedSegment.getShardSpec().getPartitionNum()) { - committedMaxId = SegmentIdWithShardSpec.fromDataSegment(committedSegment); - } - } - - // Get pending segments for the new version to determine the next partition number to allocate - final String dataSource = segmentsToUpgrade.iterator().next().getDataSource(); - final Set pendingSegmentIds - = getPendingSegmentsForIntervalWithHandle(handle, dataSource, upgradeInterval).keySet(); - final Set allAllocatedIds = new HashSet<>(pendingSegmentIds); - - // Create new IDs for each append segment - final Set newSegmentIds = new HashSet<>(); - for (DataSegment segment : segmentsToUpgrade) { - SegmentCreateRequest request = new SegmentCreateRequest( - segment.getId() + "__" + upgradeVersion, - null, - upgradeVersion, - NumberedPartialShardSpec.instance() - ); - - // Create new segment ID based on committed segments, allocated pending segments - // and new IDs created so far in this method - final SegmentIdWithShardSpec newId = createNewSegment( - request, - dataSource, - upgradeInterval, - upgradeVersion, - committedMaxId, - allAllocatedIds - ); - - // Update the set so that subsequent segment IDs use a higher partition number - allAllocatedIds.add(newId); - newSegmentIds.add( - DataSegment.builder(segment) - .interval(newId.getInterval()) - .version(newId.getVersion()) - .shardSpec(newId.getShardSpec()) - .build() - ); - } - - return newSegmentIds; - } - - private Map createNewSegments( + private Map createNewSegments( Handle handle, String dataSource, Interval 
interval, boolean skipSegmentLineageCheck, List> existingChunks, List requests - ) throws IOException + ) { if (requests.isEmpty()) { return Collections.emptyMap(); @@ -1637,18 +1475,21 @@ private Map createNewSegments( // across all shard specs (published + pending). // A pending segment having a higher partitionId must also be considered // to avoid clashes when inserting the pending segment created here. - final Set pendingSegments = - new HashSet<>(getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet()); + final Set pendingSegments = new HashSet<>( + getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream() + .map(PendingSegmentRecord::getId) + .collect(Collectors.toSet()) + ); - final Map createdSegments = new HashMap<>(); - final Map uniqueRequestToSegment = new HashMap<>(); + final Map createdSegments = new HashMap<>(); + final Map uniqueRequestToSegment = new HashMap<>(); for (SegmentCreateRequest request : requests) { // Check if the required segment has already been created in this batch final UniqueAllocateRequest uniqueRequest = new UniqueAllocateRequest(interval, request, skipSegmentLineageCheck); - final SegmentIdWithShardSpec createdSegment; + final PendingSegmentRecord createdSegment; if (uniqueRequestToSegment.containsKey(uniqueRequest)) { createdSegment = uniqueRequestToSegment.get(uniqueRequest); } else { @@ -1663,9 +1504,9 @@ private Map createNewSegments( // Add to pendingSegments to consider for partitionId if (createdSegment != null) { - pendingSegments.add(createdSegment); + pendingSegments.add(createdSegment.getId()); uniqueRequestToSegment.put(uniqueRequest, createdSegment); - log.info("Created new segment[%s]", createdSegment); + log.info("Created new segment[%s]", createdSegment.getId()); } } @@ -1678,7 +1519,7 @@ private Map createNewSegments( return createdSegments; } - private SegmentIdWithShardSpec createNewSegment( + private PendingSegmentRecord createNewSegment( SegmentCreateRequest 
request, String dataSource, Interval interval, @@ -1731,12 +1572,19 @@ private SegmentIdWithShardSpec createNewSegment( : PartitionIds.ROOT_GEN_START_PARTITION_ID; String version = newSegmentVersion == null ? existingVersion : newSegmentVersion; - return new SegmentIdWithShardSpec( + SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec( dataSource, interval, version, partialShardSpec.complete(jsonMapper, newPartitionId, 0) ); + return new PendingSegmentRecord( + pendingSegmentId, + request.getSequenceName(), + request.getPreviousSegmentId(), + request.getUpgradedFromSegmentId(), + request.getTaskAllocatorId() + ); } else if (!overallMaxId.getInterval().equals(interval)) { log.warn( @@ -1761,7 +1609,7 @@ private SegmentIdWithShardSpec createNewSegment( // When the core partitions have been dropped, using pending segments may lead to an incorrect state // where the chunk is believed to have core partitions and queries results are incorrect. - return new SegmentIdWithShardSpec( + SegmentIdWithShardSpec pendingSegmentId = new SegmentIdWithShardSpec( dataSource, interval, Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"), @@ -1771,6 +1619,13 @@ private SegmentIdWithShardSpec createNewSegment( committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions() ) ); + return new PendingSegmentRecord( + pendingSegmentId, + request.getSequenceName(), + request.getPreviousSegmentId(), + request.getUpgradedFromSegmentId(), + request.getTaskAllocatorId() + ); } } @@ -1796,7 +1651,7 @@ private SegmentIdWithShardSpec createNewSegment( final PartialShardSpec partialShardSpec, final String existingVersion, final List> existingChunks - ) throws IOException + ) { // max partitionId of published data segments which share the same partition space. 
SegmentIdWithShardSpec committedMaxId = null; @@ -1830,7 +1685,9 @@ private SegmentIdWithShardSpec createNewSegment( // A pending segment having a higher partitionId must also be considered // to avoid clashes when inserting the pending segment created here. final Set pendings = new HashSet<>( - getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).keySet() + getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream() + .map(PendingSegmentRecord::getId) + .collect(Collectors.toSet()) ); if (committedMaxId != null) { pendings.add(committedMaxId); @@ -2688,6 +2545,30 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused) ); } + @Override + public int deletePendingSegmentsForTaskGroup(final String pendingSegmentsGroup) + { + return connector.getDBI().inTransaction( + (handle, status) -> handle + .createStatement( + StringUtils.format( + "DELETE FROM %s WHERE task_allocator_id = :task_allocator_id", + dbTables.getPendingSegmentsTable() + ) + ) + .bind("task_allocator_id", pendingSegmentsGroup) + .execute() + ); + } + + @Override + public List getPendingSegments(String datasource, Interval interval) + { + return connector.retryWithHandle( + handle -> getPendingSegmentsForIntervalWithHandle(handle, datasource, interval) + ); + } + @Override public int deleteUpgradeSegmentsForTask(final String taskId) { diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java new file mode 100644 index 000000000000..44c62bf47ad1 --- /dev/null +++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.BaseEncoding; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.joda.time.Interval; + +import javax.annotation.Nullable; +import java.sql.ResultSet; + +/** + * Representation of a record in the pending segments table.
+ * Mapping of column in table to field:
+ *
+ * <ul>
+ * <li>id -> id (Unique identifier for pending segment)</li>
+ * <li>sequence_name -> sequenceName (sequence name used for segment allocation)</li>
+ * <li>sequence_prev_id -> sequencePrevId (previous segment id used for segment allocation)</li>
+ * <li>upgraded_from_segment_id -> upgradedFromSegmentId (Id of the root segment from which this was upgraded)</li>
+ * <li>task_allocator_id -> taskAllocatorId (Associates a task / task group / replica group with the pending segment)</li>
+ * </ul>
+ */ +public class PendingSegmentRecord +{ + private final SegmentIdWithShardSpec id; + private final String sequenceName; + private final String sequencePrevId; + private final String upgradedFromSegmentId; + private final String taskAllocatorId; + + public PendingSegmentRecord( + SegmentIdWithShardSpec id, + String sequenceName, + String sequencePrevId, + @Nullable String upgradedFromSegmentId, + @Nullable String taskAllocatorId + ) + { + this.id = id; + this.sequenceName = sequenceName; + this.sequencePrevId = sequencePrevId; + this.upgradedFromSegmentId = upgradedFromSegmentId; + this.taskAllocatorId = taskAllocatorId; + } + + public SegmentIdWithShardSpec getId() + { + return id; + } + + public String getSequenceName() + { + return sequenceName; + } + + public String getSequencePrevId() + { + return sequencePrevId; + } + + /** + * The original pending segment using which this upgraded segment was created. + * Can be null for pending segments allocated before this column was added or for segments that have not been upgraded. + */ + @Nullable + public String getUpgradedFromSegmentId() + { + return upgradedFromSegmentId; + } + + /** + * task / taskGroup / replica group of task that allocated this segment. + * Can be null for pending segments allocated before this column was added. 
+ */ + @Nullable + public String getTaskAllocatorId() + { + return taskAllocatorId; + } + + @SuppressWarnings("UnstableApiUsage") + public String computeSequenceNamePrevIdSha1(boolean skipSegmentLineageCheck) + { + final Hasher hasher = Hashing.sha1().newHasher() + .putBytes(StringUtils.toUtf8(getSequenceName())) + .putByte((byte) 0xff); + + if (skipSegmentLineageCheck) { + final Interval interval = getId().getInterval(); + hasher + .putLong(interval.getStartMillis()) + .putLong(interval.getEndMillis()); + } else { + hasher + .putBytes(StringUtils.toUtf8(getSequencePrevId())); + } + + hasher.putByte((byte) 0xff); + hasher.putBytes(StringUtils.toUtf8(getId().getVersion())); + + return BaseEncoding.base16().encode(hasher.hash().asBytes()); + } + + public static PendingSegmentRecord fromResultSet(ResultSet resultSet, ObjectMapper jsonMapper) + { + try { + final byte[] payload = resultSet.getBytes("payload"); + return new PendingSegmentRecord( + jsonMapper.readValue(payload, SegmentIdWithShardSpec.class), + resultSet.getString("sequence_name"), + resultSet.getString("sequence_prev_id"), + resultSet.getString("upgraded_from_segment_id"), + resultSet.getString("task_allocator_id") + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 6feaf9e07a38..99d69f5e14a3 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -288,6 +288,7 @@ tableName, getPayloadType(), getQuoteString(), getCollation() ) ) ); + alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName); } public void createDataSourceTable(final String tableName) @@ -460,6 +461,26 @@ private void alterEntryTableAddTypeAndGroupId(final String tableName) } } + private void 
alterPendingSegmentsTableAddParentIdAndTaskGroup(final String tableName) + { + List statements = new ArrayList<>(); + if (tableHasColumn(tableName, "upgraded_from_segment_id")) { + log.info("Table[%s] already has column[upgraded_from_segment_id].", tableName); + } else { + log.info("Adding column[upgraded_from_segment_id] to table[%s].", tableName); + statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN upgraded_from_segment_id VARCHAR(255)", tableName)); + } + if (tableHasColumn(tableName, "task_allocator_id")) { + log.info("Table[%s] already has column[task_allocator_id].", tableName); + } else { + log.info("Adding column[task_allocator_id] to table[%s].", tableName); + statements.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN task_allocator_id VARCHAR(255)", tableName)); + } + if (!statements.isEmpty()) { + alterTable(tableName, statements); + } + } + public void createLogTable(final String tableName, final String entryTypeName) { createTable( diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java index 33641a8417da..57e01d76a446 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentCreateRequestTest.java @@ -35,7 +35,9 @@ public void testNullPreviousSegmentId() "sequence", null, "version", - partialShardSpec + partialShardSpec, + null, + null ); Assert.assertEquals("sequence", request.getSequenceName()); Assert.assertEquals("", request.getPreviousSegmentId()); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 7b6fb4d11a2c..95ec1dda15e2 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -25,8 +25,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.hash.Hashing; -import com.google.common.io.BaseEncoding; import org.apache.druid.data.input.StringTuple; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.DataSourceMetadata; @@ -471,44 +469,6 @@ private Boolean insertUsedSegments(Set dataSegments) ); } - private Boolean insertPendingSegmentAndSequenceName(Pair pendingSegmentSequenceName) - { - final SegmentIdWithShardSpec pendingSegment = pendingSegmentSequenceName.lhs; - final String sequenceName = pendingSegmentSequenceName.rhs; - final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable(); - return derbyConnector.retryWithHandle( - handle -> { - handle.createStatement( - StringUtils.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, " - + "sequence_name_prev_id_sha1, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, " - + ":sequence_name_prev_id_sha1, :payload)", - table, - derbyConnector.getQuoteString() - ) - ) - .bind("id", pendingSegment.toString()) - .bind("dataSource", pendingSegment.getDataSource()) - .bind("created_date", DateTimes.nowUtc().toString()) - .bind("start", pendingSegment.getInterval().getStart().toString()) - .bind("end", pendingSegment.getInterval().getEnd().toString()) - .bind("sequence_name", sequenceName) - .bind("sequence_prev_id", pendingSegment.toString()) - .bind("sequence_name_prev_id_sha1", BaseEncoding.base16().encode( - Hashing.sha1() - .newHasher() - .putLong((long) pendingSegment.hashCode() * sequenceName.hashCode()) - .hash() - .asBytes() - )) - .bind("payload", mapper.writeValueAsBytes(pendingSegment)) - 
.execute(); - return true; - } - ); - } - private Map getSegmentsCommittedDuringReplaceTask(String taskId) { final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getUpgradeSegmentsTable(); @@ -620,7 +580,7 @@ public void testCommitAppendSegments() // Commit the segment and verify the results SegmentPublishResult commitResult - = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock); + = coordinator.commitAppendSegments(appendSegments, segmentToReplaceLock, "append"); Assert.assertTrue(commitResult.isSuccess()); Assert.assertEquals(appendSegments, commitResult.getSegments()); @@ -649,6 +609,30 @@ public void testCommitReplaceSegments() final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1", Intervals.of("2023-01-01/2023-02-01"), "2023-02-01"); final Set segmentsAppendedWithReplaceLock = new HashSet<>(); final Map appendedSegmentToReplaceLockMap = new HashMap<>(); + final PendingSegmentRecord pendingSegmentInInterval = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "foo", + Intervals.of("2023-01-01/2023-01-02"), + "2023-01-02", + new NumberedShardSpec(100, 0) + ), + "", + "", + null, + "append" + ); + final PendingSegmentRecord pendingSegmentOutsideInterval = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "foo", + Intervals.of("2023-04-01/2023-04-02"), + "2023-01-02", + new NumberedShardSpec(100, 0) + ), + "", + "", + null, + "append" + ); for (int i = 1; i < 9; i++) { final DataSegment segment = new DataSegment( "foo", @@ -665,6 +649,14 @@ public void testCommitReplaceSegments() appendedSegmentToReplaceLockMap.put(segment, replaceLock); } insertUsedSegments(segmentsAppendedWithReplaceLock); + derbyConnector.retryWithHandle( + handle -> coordinator.insertPendingSegmentsIntoMetastore( + handle, + ImmutableList.of(pendingSegmentInInterval, pendingSegmentOutsideInterval), + "foo", + true + ) + ); insertIntoUpgradeSegmentsTable(appendedSegmentToReplaceLockMap); final Set replacingSegments = new 
HashSet<>(); @@ -709,6 +701,25 @@ public void testCommitReplaceSegments() } Assert.assertTrue(hasBeenCarriedForward); } + + List pendingSegmentsInInterval = + coordinator.getPendingSegments("foo", Intervals.of("2023-01-01/2023-02-01")); + Assert.assertEquals(2, pendingSegmentsInInterval.size()); + final SegmentId rootPendingSegmentId = pendingSegmentInInterval.getId().asSegmentId(); + if (pendingSegmentsInInterval.get(0).getUpgradedFromSegmentId() == null) { + Assert.assertEquals(rootPendingSegmentId, pendingSegmentsInInterval.get(0).getId().asSegmentId()); + Assert.assertEquals(rootPendingSegmentId.toString(), pendingSegmentsInInterval.get(1).getUpgradedFromSegmentId()); + } else { + Assert.assertEquals(rootPendingSegmentId, pendingSegmentsInInterval.get(1).getId().asSegmentId()); + Assert.assertEquals(rootPendingSegmentId.toString(), pendingSegmentsInInterval.get(0).getUpgradedFromSegmentId()); + } + + List pendingSegmentsOutsideInterval = + coordinator.getPendingSegments("foo", Intervals.of("2023-04-01/2023-05-01")); + Assert.assertEquals(1, pendingSegmentsOutsideInterval.size()); + Assert.assertEquals( + pendingSegmentOutsideInterval.getId().asSegmentId(), pendingSegmentsOutsideInterval.get(0).getId().asSegmentId() + ); } @Test @@ -2416,7 +2427,8 @@ public void testAllocatePendingSegment() interval, partialShardSpec, "version", - false + false, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString()); @@ -2428,7 +2440,8 @@ public void testAllocatePendingSegment() interval, partialShardSpec, identifier.getVersion(), - false + false, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString()); @@ -2440,7 +2453,8 @@ public void testAllocatePendingSegment() interval, partialShardSpec, identifier1.getVersion(), - false + false, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString()); 
@@ -2452,7 +2466,8 @@ public void testAllocatePendingSegment() interval, partialShardSpec, identifier1.getVersion(), - false + false, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier3.toString()); @@ -2465,7 +2480,8 @@ public void testAllocatePendingSegment() interval, partialShardSpec, "version", - false + false, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_3", identifier4.toString()); @@ -2501,7 +2517,8 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() interval, partialShardSpec, "version", - true + true, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version", identifier.toString()); // Since there are no used core partitions yet @@ -2515,7 +2532,8 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() interval, partialShardSpec, maxVersion, - true + true, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_1", identifier1.toString()); // Since there are no used core partitions yet @@ -2529,7 +2547,8 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() interval, partialShardSpec, maxVersion, - true + true, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_2", identifier2.toString()); // Since there are no used core partitions yet @@ -2559,7 +2578,8 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() interval, partialShardSpec, maxVersion, - true + true, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_1", identifier3.toString()); // Used segment set has 1 core partition @@ -2577,7 +2597,8 @@ public void testAllocatePendingSegmentAfterDroppingExistingSegment() interval, partialShardSpec, maxVersion, - true + true, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_version_new_2", 
identifier4.toString()); // Since all core partitions have been dropped @@ -2610,7 +2631,8 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() interval, partialShardSpec, "A", - true + true, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A", identifier.toString()); // Assume it publishes; create its corresponding segment @@ -2638,7 +2660,8 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() interval, partialShardSpec, maxVersion, - true + true, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier1.toString()); // Assume it publishes; create its corresponding segment @@ -2666,7 +2689,8 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() interval, partialShardSpec, maxVersion, - true + true, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_2", identifier2.toString()); // Assume it publishes; create its corresponding segment @@ -2720,7 +2744,8 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() interval, partialShardSpec, maxVersion, - true + true, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_B_1", identifier3.toString()); // no corresponding segment, pending aborted @@ -2754,7 +2779,8 @@ public void testAnotherAllocatePendingSegmentAfterRevertingCompaction() interval, partialShardSpec, maxVersion, - true + true, + null ); // maxid = B_1 -> new partno = 2 // versionofexistingchunk=A @@ -2791,7 +2817,7 @@ public void testAllocatePendingSegments() final Interval interval = Intervals.of("2017-01-01/2017-02-01"); final String sequenceName = "seq"; - final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec); + final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec, null, null); final SegmentIdWithShardSpec segmentId0 = 
coordinator.allocatePendingSegments( dataSource, interval, @@ -2802,7 +2828,7 @@ public void testAllocatePendingSegments() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString()); final SegmentCreateRequest request1 = - new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec); + new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec, null, null); final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2813,7 +2839,7 @@ public void testAllocatePendingSegments() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString()); final SegmentCreateRequest request2 = - new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec); + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null); final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2824,7 +2850,7 @@ public void testAllocatePendingSegments() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString()); final SegmentCreateRequest request3 = - new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec); + new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec, null, null); final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments( dataSource, interval, @@ -2836,7 +2862,7 @@ public void testAllocatePendingSegments() Assert.assertEquals(segmentId2, segmentId3); final SegmentCreateRequest request4 = - new SegmentCreateRequest("seq1", null, "v1", partialShardSpec); + new SegmentCreateRequest("seq1", null, "v1", partialShardSpec, null, null); final SegmentIdWithShardSpec segmentId4 = 
coordinator.allocatePendingSegments( dataSource, interval, @@ -2880,7 +2906,8 @@ public void testNoPendingSegmentsAndOneUsedSegment() interval, partialShardSpec, maxVersion, - true + true, + null ); Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString()); @@ -2905,7 +2932,8 @@ public void testDeletePendingSegment() throws InterruptedException interval, partialShardSpec, "version", - false + false, + null ); prevSegmentId = identifier.toString(); } @@ -2920,7 +2948,8 @@ public void testDeletePendingSegment() throws InterruptedException interval, partialShardSpec, "version", - false + false, + null ); prevSegmentId = identifier.toString(); } @@ -2947,7 +2976,8 @@ public void testAllocatePendingSegmentsWithOvershadowingSegments() throws IOExce interval, new NumberedOverwritePartialShardSpec(0, 1, (short) (i + 1)), "version", - false + false, + null ); Assert.assertEquals( StringUtils.format( @@ -3015,7 +3045,8 @@ public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() throws IO interval, partialShardSpec, "version", - true + true, + null ); HashBasedNumberedShardSpec shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec(); @@ -3046,7 +3077,8 @@ public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() throws IO interval, partialShardSpec, "version", - true + true, + null ); shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec(); @@ -3077,7 +3109,8 @@ public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() throws IO interval, new HashBasedNumberedPartialShardSpec(null, 2, 3, null), "version", - true + true, + null ); shardSpec = (HashBasedNumberedShardSpec) id.getShardSpec(); @@ -3124,7 +3157,8 @@ public void testAddNumberedShardSpecAfterMultiDimensionsShardSpecWithUnknownCore interval, NumberedPartialShardSpec.instance(), version, - false + false, + null ); Assert.assertNull(id); } @@ -3169,7 +3203,8 @@ public void 
testAddNumberedShardSpecAfterSingleDimensionsShardSpecWithUnknownCor interval, NumberedPartialShardSpec.instance(), version, - false + false, + null ); Assert.assertNull(id); } @@ -3329,64 +3364,6 @@ public void testMarkSegmentsAsUnusedWithinIntervalTwoYears() throws IOException ); } - @Test - public void testGetPendingSegmentsForIntervalWithSequencePrefixes() - { - Pair validIntervalValidSequence = Pair.of( - SegmentIdWithShardSpec.fromDataSegment(defaultSegment), - "validLOL" - ); - insertPendingSegmentAndSequenceName(validIntervalValidSequence); - - Pair validIntervalInvalidSequence = Pair.of( - SegmentIdWithShardSpec.fromDataSegment(defaultSegment2), - "invalidRandom" - ); - insertPendingSegmentAndSequenceName(validIntervalInvalidSequence); - - Pair invalidIntervalvalidSequence = Pair.of( - SegmentIdWithShardSpec.fromDataSegment(existingSegment1), - "validStuff" - ); - insertPendingSegmentAndSequenceName(invalidIntervalvalidSequence); - - Pair twentyFifteenWithAnotherValidSequence = Pair.of( - new SegmentIdWithShardSpec( - existingSegment1.getDataSource(), - Intervals.of("2015/2016"), - "1970-01-01", - new NumberedShardSpec(1, 0) - ), - "alsoValidAgain" - ); - insertPendingSegmentAndSequenceName(twentyFifteenWithAnotherValidSequence); - - Pair twentyFifteenWithInvalidSequence = Pair.of( - new SegmentIdWithShardSpec( - existingSegment1.getDataSource(), - Intervals.of("2015/2016"), - "1970-01-01", - new NumberedShardSpec(2, 0) - ), - "definitelyInvalid" - ); - insertPendingSegmentAndSequenceName(twentyFifteenWithInvalidSequence); - - - final Map expected = new HashMap<>(); - expected.put(validIntervalValidSequence.lhs, validIntervalValidSequence.rhs); - expected.put(twentyFifteenWithAnotherValidSequence.lhs, twentyFifteenWithAnotherValidSequence.rhs); - - final Map actual = - derbyConnector.retryWithHandle(handle -> coordinator.getPendingSegmentsForIntervalWithHandle( - handle, - defaultSegment.getDataSource(), - defaultSegment.getInterval(), - 
ImmutableSet.of("valid", "alsoValid") - )); - Assert.assertEquals(expected, actual); - } - @Test public void testRetrieveUsedSegmentsAndCreatedDates() { @@ -3471,7 +3448,8 @@ public void testTimelineVisibilityWith0CorePartitionTombstone() throws IOExcepti interval, NumberedPartialShardSpec.instance(), "version", - false + false, + null ); Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString()); @@ -3525,7 +3503,8 @@ public void testTimelineWith1CorePartitionTombstone() throws IOException interval, NumberedPartialShardSpec.instance(), "version", - false + false, + null ); Assert.assertEquals("wiki_2020-01-01T00:00:00.000Z_2021-01-01T00:00:00.000Z_version_1", identifier.toString());