diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 7e0eaf60d836..9da665adde46 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -298,12 +298,6 @@ public LagStats computeLagStats() throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor"); } - @Override - public Set getActiveRealtimeSequencePrefixes() - { - throw new UnsupportedOperationException(); - } - @Override public int getActiveTaskGroupsCount() { diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java index 365fb1751eac..14bd59871253 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java @@ -207,8 +207,6 @@ public void testMaterializedViewSupervisorSpecCreated() Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveTaskGroupsCount()); - Assert.assertThrows(UnsupportedOperationException.class, () -> supervisor.getActiveRealtimeSequencePrefixes()); - Callable noop = new Callable() { @Override public Integer call() diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java index c04948402079..b4d18ea390e9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java @@ -32,7 +32,6 @@ import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.task.AbstractTask; -import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.msq.exec.MSQTasks; import org.apache.druid.msq.exec.Worker; @@ -46,7 +45,7 @@ import java.util.Set; @JsonTypeName(MSQWorkerTask.TYPE) -public class MSQWorkerTask extends AbstractTask implements PendingSegmentAllocatingTask +public class MSQWorkerTask extends AbstractTask { public static final String TYPE = "query_worker"; @@ -126,13 +125,6 @@ public Set getInputSourceResources() return ImmutableSet.of(); } - @Override - public String getTaskAllocatorId() - { - return getControllerTaskId(); - } - - @Override public boolean isReady(final TaskActionClient taskActionClient) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java index 482d67d81abe..6eff77184ea7 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java @@ -108,12 +108,4 @@ public void testGetInputSourceResources() MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, 
workerNumber, context, retryCount); Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty()); } - - @Test - public void testGetTaskAllocatorId() - { - MSQWorkerTask msqWorkerTask = new MSQWorkerTask(controllerTaskId, dataSource, workerNumber, context, retryCount); - Assert.assertEquals(controllerTaskId, msqWorkerTask.getTaskAllocatorId()); - } - } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index f2b080cff6ef..df188ac81533 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -34,13 +34,10 @@ import org.apache.druid.metadata.ReplaceTaskLock; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.SegmentUtils; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; @@ -155,7 +152,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) // failure to upgrade pending segments does not affect success of the commit if (publishResult.isSuccess() && toolbox.getSupervisorManager() != null) { try { - registerUpgradedPendingSegmentsOnSupervisor(task, toolbox); + registerUpgradedPendingSegmentsOnSupervisor(task, toolbox, publishResult.getUpgradedPendingSegments()); } catch (Exception e) { log.error(e, "Error while upgrading pending segments for task[%s]", task.getId()); @@ -168,7 +165,11 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) /** * Registers 
upgraded pending segments on the active supervisor, if any */ - private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionToolbox toolbox) + private void registerUpgradedPendingSegmentsOnSupervisor( + Task task, + TaskActionToolbox toolbox, + List upgradedPendingSegments + ) { final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); final Optional activeSupervisorIdWithAppendLock = @@ -178,42 +179,10 @@ private void registerUpgradedPendingSegmentsOnSupervisor(Task task, TaskActionTo return; } - final Set replaceLocksForTask = toolbox - .getTaskLockbox() - .getAllReplaceLocksForDatasource(task.getDataSource()) - .stream() - .filter(lock -> task.getId().equals(lock.getSupervisorTaskId())) - .collect(Collectors.toSet()); - - - Set pendingSegments = new HashSet<>(); - for (ReplaceTaskLock replaceLock : replaceLocksForTask) { - pendingSegments.addAll( - toolbox.getIndexerMetadataStorageCoordinator() - .getPendingSegments(task.getDataSource(), replaceLock.getInterval()) - ); - } - Map idToPendingSegment = new HashMap<>(); - pendingSegments.forEach(pendingSegment -> idToPendingSegment.put( - pendingSegment.getId().asSegmentId().toString(), - pendingSegment.getId() - )); - Map segmentToParent = new HashMap<>(); - pendingSegments.forEach(pendingSegment -> { - if (pendingSegment.getUpgradedFromSegmentId() != null - && !pendingSegment.getUpgradedFromSegmentId().equals(pendingSegment.getId().asSegmentId().toString())) { - segmentToParent.put( - pendingSegment.getId(), - idToPendingSegment.get(pendingSegment.getUpgradedFromSegmentId()) - ); - } - }); - - segmentToParent.forEach( - (newId, oldId) -> supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( + upgradedPendingSegments.forEach( + upgradedPendingSegment -> supervisorManager.registerUpgradedPendingSegmentOnSupervisor( activeSupervisorIdWithAppendLock.get(), - oldId, - newId + upgradedPendingSegment ) ); } diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 0a1f00f90251..b8027fcc5ea1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.task.BatchAppenderators; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.indexing.common.task.IndexTaskUtils; -import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.SegmentAllocatorForBatch; import org.apache.druid.indexing.common.task.SegmentAllocators; import org.apache.druid.indexing.common.task.TaskResource; @@ -109,7 +108,7 @@ * generates and pushes segments, and reports them to the {@link SinglePhaseParallelIndexTaskRunner} instead of * publishing on its own. 
*/ -public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler, PendingSegmentAllocatingTask +public class SinglePhaseSubTask extends AbstractBatchSubtask implements ChatHandler { public static final String TYPE = "single_phase_sub_task"; public static final String OLD_TYPE_NAME = "index_sub"; @@ -240,12 +239,6 @@ public String getSubtaskSpecId() return subtaskSpecId; } - @Override - public String getTaskAllocatorId() - { - return getGroupId(); - } - @Override public TaskStatus runTask(final TaskToolbox toolbox) throws Exception { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 7248fcab865e..5d71940d4704 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -1242,7 +1242,7 @@ public void remove(final Task task) idsInSameGroup.remove(task.getId()); if (idsInSameGroup.isEmpty()) { final int pendingSegmentsDeleted - = metadataStorageCoordinator.deletePendingSegmentsForTaskGroup(taskAllocatorId); + = metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId); log.info( "Deleted [%d] entries from pendingSegments table for pending segments group [%s] with APPEND locks.", pendingSegmentsDeleted, taskAllocatorId diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index dd57b560660c..288b2a141564 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -33,9 +33,9 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStop; 
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import javax.annotation.Nullable; @@ -308,16 +308,19 @@ public boolean checkPointDataSourceMetadata( * allows the supervisor to include the pending segment in queries fired against * that segment version. */ - public boolean registerNewVersionOfPendingSegmentOnSupervisor( + public boolean registerUpgradedPendingSegmentOnSupervisor( String supervisorId, - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion + PendingSegmentRecord upgradedPendingSegment ) { try { Preconditions.checkNotNull(supervisorId, "supervisorId cannot be null"); - Preconditions.checkNotNull(basePendingSegment, "rootPendingSegment cannot be null"); - Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion cannot be null"); + Preconditions.checkNotNull(upgradedPendingSegment, "upgraded pending segment cannot be null"); + Preconditions.checkNotNull(upgradedPendingSegment.getTaskAllocatorId(), "taskAllocatorId cannot be null"); + Preconditions.checkNotNull( + upgradedPendingSegment.getUpgradedFromSegmentId(), + "upgradedFromSegmentId cannot be null" + ); Pair supervisor = supervisors.get(supervisorId); Preconditions.checkNotNull(supervisor, "supervisor could not be found"); @@ -326,12 +329,12 @@ public boolean registerNewVersionOfPendingSegmentOnSupervisor( } SeekableStreamSupervisor seekableStreamSupervisor = (SeekableStreamSupervisor) supervisor.lhs; - seekableStreamSupervisor.registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion); + seekableStreamSupervisor.registerNewVersionOfPendingSegment(upgradedPendingSegment); return true; } catch (Exception e) { - log.error(e, 
"PendingSegmentRecord[%s] mapping update request to version[%s] on Supervisor[%s] failed", - basePendingSegment.asSegmentId(), newSegmentVersion.getVersion(), supervisorId); + log.error(e, "Failed to upgrade pending segment[%s] to new pending segment[%s] on Supervisor[%s].", + upgradedPendingSegment.getUpgradedFromSegmentId(), upgradedPendingSegment.getId().getVersion(), supervisorId); } return false; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java deleted file mode 100644 index 146b0afc4b9d..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/PendingSegmentVersions.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.seekablestream; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; - -/** - * Contains a new version of an existing base pending segment. 
Used by realtime - * tasks to serve queries against multiple versions of the same pending segment. - */ -public class PendingSegmentVersions -{ - private final SegmentIdWithShardSpec baseSegment; - private final SegmentIdWithShardSpec newVersion; - - @JsonCreator - public PendingSegmentVersions( - @JsonProperty("baseSegment") SegmentIdWithShardSpec baseSegment, - @JsonProperty("newVersion") SegmentIdWithShardSpec newVersion - ) - { - this.baseSegment = baseSegment; - this.newVersion = newVersion; - } - - @JsonProperty - public SegmentIdWithShardSpec getBaseSegment() - { - return baseSegment; - } - - @JsonProperty - public SegmentIdWithShardSpec getNewVersion() - { - return newVersion; - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java index 5e5924249608..7fd282e44ce2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java @@ -21,8 +21,8 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.util.concurrent.ListenableFuture; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import java.util.List; @@ -158,15 +158,14 @@ ListenableFuture setEndOffsetsAsync( * Update the task state to redirect queries for later versions to the root pending segment. * The task also announces that it is serving the segments belonging to the subsequent versions. * The update is processed only if the task is serving the original pending segment. 
- * @param taskId - task id - * @param basePendingSegment - the pending segment that was originally allocated - * @param newVersionOfSegment - the ids belonging to the versions to which the root segment needs to be updated + * + * @param taskId - task id + * @param pendingSegmentRecord - the upgraded pending segment record, carrying the new segment id and the id of the pending segment it was upgraded from * @return true if the update succeeds */ ListenableFuture registerNewVersionOfPendingSegmentAsync( String taskId, - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newVersionOfSegment + PendingSegmentRecord pendingSegmentRecord ); Class getPartitionType(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java index 40d475909e68..5de1cb50a971 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java @@ -43,6 +43,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler; import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.rpc.HttpResponseException; import org.apache.druid.rpc.IgnoreHttpResponseHandler; import org.apache.druid.rpc.RequestBuilder; @@ -57,7 +58,6 @@ import org.apache.druid.rpc.StandardRetryPolicy; import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy; import org.apache.druid.segment.incremental.ParseExceptionReport; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.jboss.netty.handler.codec.http.HttpMethod; import 
org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; @@ -197,13 +197,12 @@ public ListenableFuture> getEndOffsetsA @Override public ListenableFuture registerNewVersionOfPendingSegmentAsync( String taskId, - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newVersionOfSegment + PendingSegmentRecord pendingSegmentRecord ) { final RequestBuilder requestBuilder = new RequestBuilder(HttpMethod.POST, "/pendingSegmentVersion") - .jsonContent(jsonMapper, new PendingSegmentVersions(basePendingSegment, newVersionOfSegment)); + .jsonContent(jsonMapper, pendingSegmentRecord); return makeRequest(taskId, requestBuilder) .handler(IgnoreHttpResponseHandler.INSTANCE) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 96e1dd401459..94ce367fc847 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -78,6 +78,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.incremental.RowIngestionMeters; @@ -1575,18 +1576,15 @@ public Response setEndOffsetsHTTP( @Path("/pendingSegmentVersion") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) - public Response registerNewVersionOfPendingSegment( - PendingSegmentVersions pendingSegmentVersions, + public Response registerUpgradedPendingSegment( + PendingSegmentRecord 
upgradedPendingSegment, // this field is only for internal purposes, shouldn't be usually set by users @Context final HttpServletRequest req ) { authorizationCheck(req, Action.WRITE); try { - ((StreamAppenderator) appenderator).registerNewVersionOfPendingSegment( - pendingSegmentVersions.getBaseSegment(), - pendingSegmentVersions.getNewVersion() - ); + ((StreamAppenderator) appenderator).registerUpgradedPendingSegment(upgradedPendingSegment); return Response.ok().build(); } catch (DruidException e) { @@ -1598,8 +1596,8 @@ public Response registerNewVersionOfPendingSegment( catch (Exception e) { log.error( e, - "Could not register new version[%s] of pending segment[%s]", - pendingSegmentVersions.getNewVersion(), pendingSegmentVersions.getBaseSegment() + "Could not register pending segment[%s] upgraded from[%s]", + upgradedPendingSegment.getId().asSegmentId(), upgradedPendingSegment.getUpgradedFromSegmentId() ); return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index f15a975694fd..58a433325a3f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -89,12 +89,12 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.ordering.StringComparators; import org.apache.druid.segment.incremental.ParseExceptionReport; import 
org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.joda.time.DateTime; import javax.annotation.Nonnull; @@ -178,7 +178,8 @@ public abstract class SeekableStreamSupervisor taskIds() return tasks.keySet(); } + @VisibleForTesting + public String getBaseSequenceName() + { + return baseSequenceName; + } } private class TaskData @@ -1096,42 +1102,23 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata) addNotice(new ResetOffsetsNotice(resetDataSourceMetadata)); } - /** - * The base sequence name of a seekable stream task group is used as a prefix of the sequence names - * of pending segments published by it. - * This method can be used to identify the active pending segments for a datasource - * by checking if the sequence name begins with any of the active realtime sequence prefix returned by this method - * @return the set of base sequence names of both active and pending completion task gruops. 
- */ - @Override - public Set getActiveRealtimeSequencePrefixes() - { - final Set activeBaseSequences = new HashSet<>(); - for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { - activeBaseSequences.add(taskGroup.baseSequenceName); - } - for (List taskGroupList : pendingCompletionTaskGroups.values()) { - for (TaskGroup taskGroup : taskGroupList) { - activeBaseSequences.add(taskGroup.baseSequenceName); - } - } - return activeBaseSequences; - } - public void registerNewVersionOfPendingSegment( - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion + PendingSegmentRecord pendingSegmentRecord ) { for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { - for (String taskId : taskGroup.taskIds()) { - taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion); + if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) { + for (String taskId : taskGroup.taskIds()) { + taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord); + } } } for (List taskGroupList : pendingCompletionTaskGroups.values()) { for (TaskGroup taskGroup : taskGroupList) { - for (String taskId : taskGroup.taskIds()) { - taskClient.registerNewVersionOfPendingSegmentAsync(taskId, basePendingSegment, newSegmentVersion); + if (taskGroup.baseSequenceName.equals(pendingSegmentRecord.getTaskAllocatorId())) { + for (String taskId : taskGroup.taskIds()) { + taskClient.registerNewVersionOfPendingSegmentAsync(taskId, pendingSegmentRecord); + } } } } @@ -1548,7 +1535,7 @@ private List getCurrentParseErrors() } @VisibleForTesting - public void addTaskGroupToActivelyReadingTaskGroup( + public TaskGroup addTaskGroupToActivelyReadingTaskGroup( int taskGroupId, ImmutableMap partitionOffsets, Optional minMsgTime, @@ -1572,10 +1559,11 @@ public void addTaskGroupToActivelyReadingTaskGroup( taskGroupId ); } + return group; } @VisibleForTesting - public void 
addTaskGroupToPendingCompletionTaskGroup( + public TaskGroup addTaskGroupToPendingCompletionTaskGroup( int taskGroupId, ImmutableMap partitionOffsets, Optional minMsgTime, @@ -1595,6 +1583,7 @@ public void addTaskGroupToPendingCompletionTaskGroup( group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData()))); pendingCompletionTaskGroups.computeIfAbsent(taskGroupId, x -> new CopyOnWriteArrayList<>()) .add(group); + return group; } @VisibleForTesting @@ -3202,9 +3191,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException // If we received invalid endOffset values, we clear the known offset to refetch the last committed offset // from metadata. If any endOffset values are invalid, we treat the entire set as invalid as a safety measure. if (!endOffsetsAreInvalid) { - for (Entry entry : endOffsets.entrySet()) { - partitionOffsets.put(entry.getKey(), entry.getValue()); - } + partitionOffsets.putAll(endOffsets); } else { for (Entry entry : endOffsets.entrySet()) { partitionOffsets.put(entry.getKey(), getNotSetMarker()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java index b80641fe94bf..62b5e48e00b4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ActionsTestTask.java @@ -52,7 +52,7 @@ public class ActionsTestTask extends CommandQueueTask { private final TaskActionClient client; private final AtomicInteger sequenceId = new AtomicInteger(0); - private final Map announcedSegmentsToParentSegments = new HashMap<>(); + private final Map announcedSegmentsToParentSegments = new HashMap<>(); public ActionsTestTask(String datasource, String groupId, TaskActionClientFactory factory) { @@ -82,7 +82,7 @@ public 
SegmentPublishResult commitReplaceSegments(DataSegment... segments) ); } - public Map getAnnouncedSegmentsToParentSegments() + public Map getAnnouncedSegmentsToParentSegments() { return announcedSegmentsToParentSegments; } @@ -114,7 +114,7 @@ public SegmentIdWithShardSpec allocateSegmentForTimestamp(DateTime timestamp, Gr TaskLockType.APPEND ) ); - announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId()); + announcedSegmentsToParentSegments.put(pendingSegment.asSegmentId(), pendingSegment.asSegmentId().toString()); return pendingSegment; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java index 50c318683e8f..7da5a3d19fe8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndStreamingAppendTest.java @@ -55,6 +55,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -74,7 +75,6 @@ import org.junit.Before; import org.junit.Test; -import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -83,7 +83,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -122,10 +121,9 @@ public class 
ConcurrentReplaceAndStreamingAppendTest extends IngestionTestBase private final AtomicInteger groupId = new AtomicInteger(0); private final SupervisorManager supervisorManager = EasyMock.mock(SupervisorManager.class); private Capture supervisorId; - private Capture oldPendingSegment; - private Capture newPendingSegment; + private Capture pendingSegment; private Map>> versionToIntervalToLoadSpecs; - private Map parentSegmentToLoadSpec; + private Map parentSegmentToLoadSpec; @Override @Before @@ -169,12 +167,10 @@ public void setUpIngestionTestBase() throws IOException groupId.set(0); appendTask = createAndStartTask(); supervisorId = Capture.newInstance(CaptureType.ALL); - oldPendingSegment = Capture.newInstance(CaptureType.ALL); - newPendingSegment = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(supervisorManager.registerNewVersionOfPendingSegmentOnSupervisor( + pendingSegment = Capture.newInstance(CaptureType.ALL); + EasyMock.expect(supervisorManager.registerUpgradedPendingSegmentOnSupervisor( EasyMock.capture(supervisorId), - EasyMock.capture(oldPendingSegment), - EasyMock.capture(newPendingSegment) + EasyMock.capture(pendingSegment) )).andReturn(true).anyTimes(); replaceTask = createAndStartTask(); EasyMock.replay(supervisorManager); @@ -682,20 +678,6 @@ public void testLockAllocateDayReplaceMonthAllocateAppend() verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11, segmentV12); } - - @Nullable - private DataSegment findSegmentWith(String version, Map loadSpec, Set segments) - { - for (DataSegment segment : segments) { - if (version.equals(segment.getVersion()) - && Objects.equals(segment.getLoadSpec(), loadSpec)) { - return segment; - } - } - - return null; - } - private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment) { final SegmentId id = pendingSegment.asSegmentId(); @@ -739,23 +721,6 @@ private void verifySegments(Interval interval, Segments visibility, DataSegment. 
} } - private void verifyInputSegments(Task task, Interval interval, DataSegment... expectedSegments) - { - try { - final TaskActionClient taskActionClient = taskActionClientFactory.create(task); - Collection allUsedSegments = taskActionClient.submit( - new RetrieveUsedSegmentsAction( - WIKI, - Collections.singletonList(interval) - ) - ); - Assert.assertEquals(Sets.newHashSet(expectedSegments), Sets.newHashSet(allUsedSegments)); - } - catch (IOException e) { - throw new ISE(e, "Error while fetching segments to replace in interval[%s]", interval); - } - } - private TaskToolboxFactory createToolboxFactory( TaskConfig taskConfig, TaskActionClientFactory taskActionClientFactory @@ -799,11 +764,10 @@ private void commitReplaceSegments(DataSegment... dataSegments) { replaceTask.commitReplaceSegments(dataSegments); for (int i = 0; i < supervisorId.getValues().size(); i++) { - announceUpgradedPendingSegment(oldPendingSegment.getValues().get(i), newPendingSegment.getValues().get(i)); + announceUpgradedPendingSegment(pendingSegment.getValues().get(i)); } supervisorId.reset(); - oldPendingSegment.reset(); - newPendingSegment.reset(); + pendingSegment.reset(); replaceTask.finishRunAndGetStatus(); } @@ -812,19 +776,16 @@ private SegmentPublishResult commitAppendSegments(DataSegment... 
dataSegments) SegmentPublishResult result = appendTask.commitAppendSegments(dataSegments); result.getSegments().forEach(this::unannounceUpgradedPendingSegment); for (DataSegment segment : dataSegments) { - parentSegmentToLoadSpec.put(segment.getId(), Iterables.getOnlyElement(segment.getLoadSpec().values())); + parentSegmentToLoadSpec.put(segment.getId().toString(), Iterables.getOnlyElement(segment.getLoadSpec().values())); } appendTask.finishRunAndGetStatus(); return result; } - private void announceUpgradedPendingSegment( - SegmentIdWithShardSpec oldPendingSegment, - SegmentIdWithShardSpec newPendingSegment - ) + private void announceUpgradedPendingSegment(PendingSegmentRecord pendingSegment) { appendTask.getAnnouncedSegmentsToParentSegments() - .put(newPendingSegment.asSegmentId(), oldPendingSegment.asSegmentId()); + .put(pendingSegment.getId().asSegmentId(), pendingSegment.getUpgradedFromSegmentId()); } private void unannounceUpgradedPendingSegment( @@ -849,7 +810,7 @@ private void verifyVersionIntervalLoadSpecUniqueness() loadSpecs.add(loadSpec); } - for (Map.Entry entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) { + for (Map.Entry entry : appendTask.getAnnouncedSegmentsToParentSegments().entrySet()) { final String version = entry.getKey().getVersion(); final Interval interval = entry.getKey().getInterval(); final Object loadSpec = parentSegmentToLoadSpec.get(entry.getValue()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index 7c16e2efc240..3af74235e823 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -1948,8 +1948,8 @@ public void testCleanupOnUnlock() // Only the replaceTask should attempt a delete on the upgradeSegments table 
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once(); // Any task may attempt pending segment clean up - EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(replaceTask.getId())).andReturn(0).once(); - EasyMock.expect(coordinator.deletePendingSegmentsForTaskGroup(appendTask.getId())).andReturn(0).once(); + EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once(); + EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once(); EasyMock.replay(coordinator); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 5ffbd4b94608..4a9fccd4663b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -31,7 +31,11 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.metadata.PendingSegmentRecord; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -544,6 +548,53 @@ public void testGetActiveSupervisorIdForDatasourceWithAppendLock() verifyAll(); } + @Test + public void testRegisterUpgradedPendingSegmentOnSupervisor() 
+ { + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap()); + + NoopSupervisorSpec noopSpec = new NoopSupervisorSpec("noop", ImmutableList.of("noopDS")); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec streamingSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + SeekableStreamSupervisor streamSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + streamSupervisor.registerNewVersionOfPendingSegment(EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + EasyMock.expect(streamingSpec.getId()).andReturn("sss").anyTimes(); + EasyMock.expect(streamingSpec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(streamingSpec.getDataSources()).andReturn(ImmutableList.of("DS")).anyTimes(); + EasyMock.expect(streamingSpec.createSupervisor()).andReturn(streamSupervisor).anyTimes(); + EasyMock.expect(streamingSpec.createAutoscaler(streamSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(streamingSpec.getContext()).andReturn(null).anyTimes(); + EasyMock.replay(streamingSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + + replayAll(); + + final PendingSegmentRecord pendingSegment = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "DS", + Intervals.ETERNITY, + "version", + new NumberedShardSpec(0, 0) + ), + "sequenceName", + "prevSegmentId", + "upgradedFromSegmentId", + "taskAllocatorId" + ); + manager.start(); + + manager.createOrUpdateAndStartSupervisor(noopSpec); + Assert.assertFalse(manager.registerUpgradedPendingSegmentOnSupervisor("noop", pendingSegment)); + + manager.createOrUpdateAndStartSupervisor(streamingSpec); + Assert.assertTrue(manager.registerUpgradedPendingSegmentOnSupervisor("sss", pendingSegment)); + + verifyAll(); + } + private static class TestSupervisorSpec implements SupervisorSpec { private final String id; diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 2602f8e5441f..489315cc2495 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -74,11 +74,13 @@ import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.java.util.metrics.StubServiceEmitter; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; @@ -86,6 +88,10 @@ import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.easymock.Capture; +import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; import org.hamcrest.MatcherAssert; @@ -1548,10 +1554,19 @@ public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws In } @Test - public 
void testGetActiveRealtimeSequencePrefixes() + public void testRegisterNewVersionOfPendingSegment() { EasyMock.expect(spec.isSuspended()).andReturn(false); + Capture captured0 = Capture.newInstance(CaptureType.FIRST); + Capture captured1 = Capture.newInstance(CaptureType.FIRST); + EasyMock.expect( + indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task0"), EasyMock.capture(captured0)) + ).andReturn(Futures.immediateFuture(true)); + EasyMock.expect( + indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task2"), EasyMock.capture(captured1)) + ).andReturn(Futures.immediateFuture(true)); + replayAll(); final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); @@ -1559,34 +1574,63 @@ public void testGetActiveRealtimeSequencePrefixes() // Spin off two active tasks with each task serving one partition. supervisor.getIoConfig().setTaskCount(3); supervisor.start(); - supervisor.addTaskGroupToActivelyReadingTaskGroup( + + final SeekableStreamSupervisor.TaskGroup taskGroup0 = supervisor.addTaskGroupToActivelyReadingTaskGroup( supervisor.getTaskGroupIdForPartition("0"), ImmutableMap.of("0", "5"), Optional.absent(), Optional.absent(), - ImmutableSet.of("task1"), + ImmutableSet.of("task0"), ImmutableSet.of() ); - - supervisor.addTaskGroupToActivelyReadingTaskGroup( + final SeekableStreamSupervisor.TaskGroup taskGroup1 = supervisor.addTaskGroupToActivelyReadingTaskGroup( supervisor.getTaskGroupIdForPartition("1"), ImmutableMap.of("1", "6"), Optional.absent(), Optional.absent(), - ImmutableSet.of("task2"), + ImmutableSet.of("task1"), ImmutableSet.of() ); - - supervisor.addTaskGroupToPendingCompletionTaskGroup( + final SeekableStreamSupervisor.TaskGroup taskGroup2 = supervisor.addTaskGroupToPendingCompletionTaskGroup( supervisor.getTaskGroupIdForPartition("2"), ImmutableMap.of("2", "100"), Optional.absent(), Optional.absent(), - ImmutableSet.of("task3"), + ImmutableSet.of("task2"), ImmutableSet.of() ); - 
Assert.assertEquals(3, supervisor.getActiveRealtimeSequencePrefixes().size()); + final PendingSegmentRecord pendingSegmentRecord0 = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "DS", + Intervals.of("2024/2025"), + "2024", + new NumberedShardSpec(1, 0) + ), + taskGroup0.getBaseSequenceName(), + "prevId0", + "someAppendedSegment0", + taskGroup0.getBaseSequenceName() + ); + final PendingSegmentRecord pendingSegmentRecord1 = new PendingSegmentRecord( + new SegmentIdWithShardSpec( + "DS", + Intervals.of("2024/2025"), + "2024", + new NumberedShardSpec(2, 0) + ), + taskGroup2.getBaseSequenceName(), + "prevId1", + "someAppendedSegment1", + taskGroup2.getBaseSequenceName() + ); + + supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord0); + supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord1); + + Assert.assertEquals(pendingSegmentRecord0, captured0.getValue()); + Assert.assertEquals(pendingSegmentRecord1, captured1.getValue()); + verifyAll(); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 1de41bb43a0f..6c4f556133ec 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -252,11 +252,11 @@ public SegmentIdWithShardSpec allocatePendingSegment( } @Override - public Map upgradePendingSegmentsOverlappingWith( + public List upgradePendingSegmentsOverlappingWith( Set replaceSegments ) { - return Collections.emptyMap(); + return Collections.emptyList(); } @Override @@ -297,7 +297,7 @@ public int deleteUpgradeSegmentsForTask(final String taskId) } @Override - public int deletePendingSegmentsForTaskGroup(final String taskGroup) + public int 
deletePendingSegmentsForTaskAllocatorId(final String taskGroup) { throw new UnsupportedOperationException(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 23513c82ad7b..aea2674f6b85 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -389,9 +389,9 @@ SegmentPublishResult commitReplaceSegments( * * * @param replaceSegments Segments being committed by a REPLACE task - * @return Map from originally allocated pending segment to its new upgraded ID. + * @return List of inserted pending segment records */ - Map upgradePendingSegmentsOverlappingWith( + List upgradePendingSegmentsOverlappingWith( Set replaceSegments ); @@ -495,7 +495,7 @@ SegmentPublishResult commitMetadataOnly( * @param taskAllocatorId task id / task group / replica group for an appending task * @return number of pending segments deleted from the metadata store */ - int deletePendingSegmentsForTaskGroup(String taskAllocatorId); + int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId); /** * Fetches all the pending segments of the datasource that overlap with a given interval. 
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index 620ff8831b09..e4bc1645f710 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -23,10 +23,13 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; +import java.util.List; import java.util.Objects; import java.util.Set; @@ -46,12 +49,19 @@ public class SegmentPublishResult private final boolean success; @Nullable private final String errorMsg; + @Nullable + private final List upgradedPendingSegments; public static SegmentPublishResult ok(Set segments) { return new SegmentPublishResult(segments, true, null); } + public static SegmentPublishResult ok(Set segments, List upgradedPendingSegments) + { + return new SegmentPublishResult(segments, true, null, upgradedPendingSegments); + } + public static SegmentPublishResult fail(String errorMsg) { return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg); @@ -63,13 +73,28 @@ private SegmentPublishResult( @JsonProperty("success") boolean success, @JsonProperty("errorMsg") @Nullable String errorMsg ) + { + this(segments, success, errorMsg, null); + } + + private SegmentPublishResult( + Set segments, + boolean success, + @Nullable String errorMsg, + List upgradedPendingSegments + ) { this.segments = Preconditions.checkNotNull(segments, "segments"); this.success = success; this.errorMsg = errorMsg; + this.upgradedPendingSegments = upgradedPendingSegments; if (!success) { 
Preconditions.checkArgument(segments.isEmpty(), "segments must be empty for unsuccessful publishes"); + Preconditions.checkArgument( + CollectionUtils.isNullOrEmpty(upgradedPendingSegments), + "upgraded pending segments must be null or empty for unsuccessful publishes" + ); } } @@ -92,6 +117,12 @@ public String getErrorMsg() return errorMsg; } + @Nullable + public List getUpgradedPendingSegments() + { + return upgradedPendingSegments; + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 20c102533862..e733ef6c233d 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -31,7 +31,6 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Set; @@ -186,12 +185,6 @@ public int getActiveTaskGroupsCount() { return -1; } - - @Override - public Set getActiveRealtimeSequencePrefixes() - { - return Collections.emptySet(); - } }; } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 9b9511cbf3da..b1fb439184d4 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -29,7 +29,6 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; -import java.util.Set; public interface Supervisor { @@ -103,9 +102,4 @@ default long computeLagForAutoScaler() } int getActiveTaskGroupsCount(); - - /** - * @return active sequence 
prefixes for reading and pending completion task groups of a seekable stream supervisor - */ - Set getActiveRealtimeSequencePrefixes(); } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index e36412e5dc1e..c5a36656c9a2 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -79,6 +79,7 @@ import org.skife.jdbi.v2.StatementContext; import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionStatus; +import org.skife.jdbi.v2.Update; import org.skife.jdbi.v2.exceptions.CallbackFailedException; import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.util.ByteArrayMapper; @@ -530,9 +531,9 @@ public SegmentPublishResult commitReplaceSegments( segmentSchemaMapping, upgradeSegmentMetadata, Collections.emptyMap() - ) + ), + upgradePendingSegmentsOverlappingWith(segmentsToInsert) ); - upgradePendingSegmentsOverlappingWith(segmentsToInsert); return result; }, 3, @@ -735,12 +736,12 @@ public SegmentIdWithShardSpec allocatePendingSegment( } @Override - public Map upgradePendingSegmentsOverlappingWith( + public List upgradePendingSegmentsOverlappingWith( Set replaceSegments ) { if (replaceSegments.isEmpty()) { - return Collections.emptyMap(); + return Collections.emptyList(); } // Any replace interval has exactly one version of segments @@ -769,16 +770,15 @@ public Map upgradePendingSegment * those versions. * * - * @return Map from original pending segment to the new upgraded ID. 
+ * @return Inserted pending segment records */ - private Map upgradePendingSegments( + private List upgradePendingSegments( Handle handle, String datasource, Map replaceIntervalToMaxId ) throws JsonProcessingException { final List upgradedPendingSegments = new ArrayList<>(); - final Map pendingSegmentToNewId = new HashMap<>(); for (Map.Entry entry : replaceIntervalToMaxId.entrySet()) { final Interval replaceInterval = entry.getKey(); @@ -813,7 +813,6 @@ private Map upgradePendingSegmen overlappingPendingSegment.getTaskAllocatorId() ) ); - pendingSegmentToNewId.put(pendingSegmentId, newId); } } } @@ -831,7 +830,7 @@ private Map upgradePendingSegmen numInsertedPendingSegments, upgradedPendingSegments.size() ); - return pendingSegmentToNewId; + return upgradedPendingSegments; } private boolean shouldUpgradePendingSegment( @@ -1114,8 +1113,15 @@ private SegmentIdWithShardSpec allocatePendingSegment( ); // always insert empty previous sequence id - insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1, - taskAllocatorId + insertPendingSegmentIntoMetastore( + handle, + newIdentifier, + dataSource, + interval, + "", + sequenceName, + sequenceNamePrevIdSha1, + taskAllocatorId ); log.info( @@ -1320,6 +1326,39 @@ public int hashCode() } } + private static void bindColumnValuesToQueryWithInCondition( + final String columnName, + final List values, + final Update query + ) + { + if (values == null) { + return; + } + + for (int i = 0; i < values.size(); i++) { + query.bind(StringUtils.format("%s%d", columnName, i), values.get(i)); + } + } + + private int deletePendingSegmentsById(Handle handle, String datasource, List pendingSegmentIds) + { + if (pendingSegmentIds.isEmpty()) { + return 0; + } + + Update query = handle.createStatement( + StringUtils.format( + "DELETE FROM %s WHERE dataSource = :dataSource %s", + dbTables.getPendingSegmentsTable(), + 
SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", pendingSegmentIds) + ) + ).bind("dataSource", datasource); + bindColumnValuesToQueryWithInCondition("id", pendingSegmentIds, query); + + return query.execute(); + } + private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( Set appendSegments, Map appendSegmentToReplaceLock, @@ -1383,7 +1422,6 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( if (metadataUpdateResult.isFailed()) { transactionStatus.setRollbackOnly(); metadataNotUpdated.set(true); - if (metadataUpdateResult.canRetry()) { throw new RetryTransactionException(metadataUpdateResult.getErrorMsg()); } else { @@ -1393,6 +1431,20 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } insertIntoUpgradeSegmentsTable(handle, appendSegmentToReplaceLock); + + // Delete the pending segments to be committed in this transaction in batches of at most 100 + final List> pendingSegmentIdBatches = Lists.partition( + allSegmentsToInsert.stream() + .map(pendingSegment -> pendingSegment.getId().toString()) + .collect(Collectors.toList()), + 100 + ); + int numDeletedPendingSegments = 0; + for (List pendingSegmentIdBatch : pendingSegmentIdBatches) { + numDeletedPendingSegments += deletePendingSegmentsById(handle, dataSource, pendingSegmentIdBatch); + } + log.info("Deleted [%d] entries from pending segments table upon commit.", numDeletedPendingSegments); + return SegmentPublishResult.ok( insertSegments( handle, @@ -2761,7 +2813,7 @@ public DataSegment retrieveSegmentForId(final String id, boolean includeUnused) } @Override - public int deletePendingSegmentsForTaskGroup(final String pendingSegmentsGroup) + public int deletePendingSegmentsForTaskAllocatorId(final String pendingSegmentsGroup) { return connector.getDBI().inTransaction( (handle, status) -> handle diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java 
b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java index 44c62bf47ad1..bfbaad18ef1a 100644 --- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java +++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java @@ -19,6 +19,8 @@ package org.apache.druid.metadata; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; @@ -50,12 +52,13 @@ public class PendingSegmentRecord private final String upgradedFromSegmentId; private final String taskAllocatorId; + @JsonCreator public PendingSegmentRecord( - SegmentIdWithShardSpec id, - String sequenceName, - String sequencePrevId, - @Nullable String upgradedFromSegmentId, - @Nullable String taskAllocatorId + @JsonProperty("id") SegmentIdWithShardSpec id, + @JsonProperty("sequenceName") String sequenceName, + @JsonProperty("sequencePrevId") String sequencePrevId, + @JsonProperty("upgradedFromSegmentId") @Nullable String upgradedFromSegmentId, + @JsonProperty("taskAllocatorId") @Nullable String taskAllocatorId ) { this.id = id; @@ -65,16 +68,19 @@ public PendingSegmentRecord( this.taskAllocatorId = taskAllocatorId; } + @JsonProperty public SegmentIdWithShardSpec getId() { return id; } + @JsonProperty public String getSequenceName() { return sequenceName; } + @JsonProperty public String getSequencePrevId() { return sequencePrevId; @@ -85,6 +91,7 @@ public String getSequencePrevId() * Can be null for pending segments allocated before this column was added or for segments that have not been upgraded. */ @Nullable + @JsonProperty public String getUpgradedFromSegmentId() { return upgradedFromSegmentId; @@ -95,6 +102,7 @@ public String getUpgradedFromSegmentId() * Can be null for pending segments allocated before this column was added. 
*/ @Nullable + @JsonProperty public String getTaskAllocatorId() { return taskAllocatorId; diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index fc990e107dd3..f14cc9950505 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -955,7 +955,7 @@ private static int computeNumChangedSegments(List segmentIds, int[] segm * * @implNote JDBI 3.x has better support for binding {@code IN} clauses directly. */ - private static String getParameterizedInConditionForColumn(final String columnName, final List values) + static String getParameterizedInConditionForColumn(final String columnName, final List values) { if (values == null) { return ""; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index d022580f7c19..65df4f56761a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -563,7 +563,8 @@ ListenableFuture dropInBackground(SegmentsAndCommitMe return new SegmentsAndCommitMetadata( segmentsAndCommitMetadata.getSegments(), metadata == null ? 
null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata(), - segmentsAndCommitMetadata.getSegmentSchemaMapping() + segmentsAndCommitMetadata.getSegmentSchemaMapping(), + segmentsAndCommitMetadata.getUpgradedSegments() ); }, MoreExecutors.directExecutor() @@ -618,9 +619,10 @@ ListenableFuture publishInBackground( return executor.submit( () -> { try { - RetryUtils.retry( + return RetryUtils.retry( () -> { try { + final Set upgradedSegments = new HashSet<>(); final ImmutableSet ourSegments = ImmutableSet.copyOf(pushedAndTombstones); final SegmentPublishResult publishResult = publisher.publishSegments( segmentsToBeOverwritten, @@ -629,7 +631,6 @@ ListenableFuture publishInBackground( callerMetadata, segmentsAndCommitMetadata.getSegmentSchemaMapping() ); - if (publishResult.isSuccess()) { log.info( "Published [%s] segments with commit metadata [%s]", @@ -637,6 +638,13 @@ ListenableFuture publishInBackground( callerMetadata ); log.infoSegments(segmentsAndCommitMetadata.getSegments(), "Published segments"); + // This set must contain only those segments that were upgraded as a result of a concurrent replace. + upgradedSegments.addAll(publishResult.getSegments()); + segmentsAndCommitMetadata.getSegments().forEach(upgradedSegments::remove); + if (!upgradedSegments.isEmpty()) { + log.info("Published [%d] upgraded segments.", upgradedSegments.size()); + log.infoSegments(upgradedSegments, "Upgraded segments"); + } log.info("Published segment schemas: [%s]", segmentsAndCommitMetadata.getSegmentSchemaMapping()); } else { // Publishing didn't affirmatively succeed. However, segments with our identifiers may still be active @@ -691,6 +699,7 @@ ListenableFuture publishInBackground( throw new ISE("Failed to publish segments"); } } + return segmentsAndCommitMetadata.withUpgradedSegments(upgradedSegments); } catch (Exception e) { // Must not remove segments here, we aren't sure if our transaction succeeded or not. 
@@ -703,9 +712,10 @@ ListenableFuture publishInBackground( Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - return segmentsAndCommitMetadata; }, - e -> (e.getMessage() != null && e.getMessage().contains("Failed to update the metadata Store. The new start metadata is ahead of last commited end state.")), + e -> (e != null && e.getMessage() != null + && e.getMessage().contains("Failed to update the metadata Store." + + " The new start metadata is ahead of last commited end state.")), RetryUtils.DEFAULT_MAX_TRIES ); } @@ -717,7 +727,6 @@ ListenableFuture publishInBackground( Throwables.propagateIfPossible(e); throw new RuntimeException(e); } - return segmentsAndCommitMetadata; } ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java index 4f0a53398e45..721876880578 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.appenderator; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -28,26 +29,59 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; public class SegmentsAndCommitMetadata { - private static final SegmentsAndCommitMetadata NIL = new SegmentsAndCommitMetadata(Collections.emptyList(), null, null); + private static final SegmentsAndCommitMetadata NIL + = new SegmentsAndCommitMetadata(Collections.emptyList(), null, null, null); private final Object commitMetadata; private final ImmutableList segments; private final 
SegmentSchemaMapping segmentSchemaMapping; + private final ImmutableSet upgradedSegments; + + public SegmentsAndCommitMetadata( + List segments, + Object commitMetadata + ) + { + this(segments, commitMetadata, null, null); + } + + public SegmentsAndCommitMetadata( + List segments, + Object commitMetadata, + SegmentSchemaMapping segmentSchemaMapping + ) + { + this(segments, commitMetadata, segmentSchemaMapping, null); + } + public SegmentsAndCommitMetadata( List segments, @Nullable Object commitMetadata, - @Nullable SegmentSchemaMapping segmentSchemaMapping + @Nullable SegmentSchemaMapping segmentSchemaMapping, + @Nullable Set upgradedSegments ) { this.segments = ImmutableList.copyOf(segments); this.commitMetadata = commitMetadata; + this.upgradedSegments = upgradedSegments == null ? null : ImmutableSet.copyOf(upgradedSegments); this.segmentSchemaMapping = segmentSchemaMapping; } + public SegmentsAndCommitMetadata withUpgradedSegments(Set upgradedSegments) + { + return new SegmentsAndCommitMetadata( + this.segments, + this.commitMetadata, + this.segmentSchemaMapping, + upgradedSegments + ); + } + @Nullable public Object getCommitMetadata() { @@ -59,6 +93,15 @@ public List getSegments() return segments; } + /** + * @return the set of extra upgraded segments committed due to a concurrent replace. 
+ */ + @Nullable + public Set getUpgradedSegments() + { + return upgradedSegments; + } + public SegmentSchemaMapping getSegmentSchemaMapping() { return segmentSchemaMapping; @@ -75,13 +118,15 @@ public boolean equals(Object o) } SegmentsAndCommitMetadata that = (SegmentsAndCommitMetadata) o; return Objects.equals(commitMetadata, that.commitMetadata) && + Objects.equals(upgradedSegments, that.upgradedSegments) && + Objects.equals(segmentSchemaMapping, that.segmentSchemaMapping) && Objects.equals(segments, that.segments); } @Override public int hashCode() { - return Objects.hash(commitMetadata, segments); + return Objects.hash(commitMetadata, segments, upgradedSegments, segmentSchemaMapping); } @Override @@ -90,6 +135,8 @@ public String toString() return getClass().getSimpleName() + "{" + "commitMetadata=" + commitMetadata + ", segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + + ", upgradedSegments=" + SegmentUtils.commaSeparatedIdentifiers(upgradedSegments) + + ", segmentSchemaMapping=" + segmentSchemaMapping + '}'; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index d02e200cfcbd..aba071de1dfc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -356,13 +356,13 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final } } - public void registerNewVersionOfPendingSegment( + public void registerUpgradedPendingSegment( SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion + SegmentIdWithShardSpec upgradedPendingSegment ) { newIdToBasePendingSegment.put( - newSegmentVersion.asSegmentId().toDescriptor(), + upgradedPendingSegment.asSegmentId().toDescriptor(), 
basePendingSegment.asSegmentId().toDescriptor() ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 35ff42d3daba..1c5dd42dd770 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -51,6 +51,7 @@ import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.query.Query; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QuerySegmentWalker; @@ -147,6 +148,7 @@ public class StreamAppenderator implements Appenderator * of any thread from {@link #drop}. */ private final ConcurrentMap sinks = new ConcurrentHashMap<>(); + private final ConcurrentMap idToPendingSegment = new ConcurrentHashMap<>(); private final Set droppingSinks = Sets.newConcurrentHashSet(); private final VersionedIntervalTimeline sinkTimeline; private final long maxBytesTuningConfig; @@ -166,8 +168,25 @@ public class StreamAppenderator implements Appenderator private final AtomicBoolean closed = new AtomicBoolean(false); - private final ConcurrentHashMap> - baseSegmentToUpgradedVersions = new ConcurrentHashMap<>(); + /** + * Map from base segment identifier of a sink to the set of all the segment ids associated with it. + * The set contains the base segment itself and its upgraded versions announced as a result of a concurrent replace. + * The map contains all the available sinks' identifiers in its keyset. 
+ */ + private final ConcurrentMap> baseSegmentToUpgradedSegments + = new ConcurrentHashMap<>(); + /** + * Map from the id of an upgraded pending segment to the segment corresponding to its upgradedFromSegmentId. + */ + private final ConcurrentMap upgradedSegmentToBaseSegment + = new ConcurrentHashMap<>(); + /** + * Set of all segment identifiers that have been marked to be abandoned. + * This is used to determine if all the segments corresponding to a sink have been abandoned and it can be dropped. + */ + private final ConcurrentHashMap.KeySetView abandonedSegments + = ConcurrentHashMap.newKeySet(); + private final SinkSchemaAnnouncer sinkSchemaAnnouncer; private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig; @@ -527,9 +546,7 @@ private Sink getOrCreateSink(final SegmentIdWithShardSpec identifier) .emit(); } - sinks.put(identifier, retVal); - metrics.setSinkCount(sinks.size()); - sinkTimeline.add(retVal.getInterval(), retVal.getVersion(), identifier.getShardSpec().createChunk(retVal)); + addSink(identifier, retVal); } return retVal; @@ -1058,14 +1075,7 @@ public void closeNow() log.debug("Shutting down immediately..."); for (Map.Entry entry : sinks.entrySet()) { - try { - unannounceAllVersionsOfSegment(entry.getValue().getSegment()); - } - catch (Exception e) { - log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource()) - .addData("identifier", entry.getKey().toString()) - .emit(); - } + unannounceAllVersionsOfSegment(entry.getValue().getSegment(), entry.getValue()); } try { shutdownExecutors(); @@ -1098,61 +1108,78 @@ SinkSchemaAnnouncer getSinkSchemaAnnouncer() /** * Unannounces the given base segment and all its upgraded versions. 
*/ - private void unannounceAllVersionsOfSegment(DataSegment baseSegment) throws IOException + private void unannounceAllVersionsOfSegment(DataSegment baseSegment, Sink sink) { - segmentAnnouncer.unannounceSegment(baseSegment); + synchronized (sink) { + final SegmentIdWithShardSpec baseId = SegmentIdWithShardSpec.fromDataSegment(baseSegment); + if (!baseSegmentToUpgradedSegments.containsKey(baseId)) { + return; + } - final Set upgradedVersionsOfSegment - = baseSegmentToUpgradedVersions.remove(baseSegment.getId()); - if (upgradedVersionsOfSegment == null || upgradedVersionsOfSegment.isEmpty()) { - return; + final Set upgradedVersionsOfSegment = baseSegmentToUpgradedSegments.remove(baseId); + for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) { + final DataSegment newSegment = new DataSegment( + newId.getDataSource(), + newId.getInterval(), + newId.getVersion(), + baseSegment.getLoadSpec(), + baseSegment.getDimensions(), + baseSegment.getMetrics(), + newId.getShardSpec(), + baseSegment.getBinaryVersion(), + baseSegment.getSize() + ); + unannounceSegment(newSegment); + upgradedSegmentToBaseSegment.remove(newId); + } } + } - for (SegmentIdWithShardSpec newId : upgradedVersionsOfSegment) { - final DataSegment newSegment = new DataSegment( - newId.getDataSource(), - newId.getInterval(), - newId.getVersion(), - baseSegment.getLoadSpec(), - baseSegment.getDimensions(), - baseSegment.getMetrics(), - newId.getShardSpec(), - baseSegment.getBinaryVersion(), - baseSegment.getSize() - ); - segmentAnnouncer.unannounceSegment(newSegment); + private void unannounceSegment(DataSegment segment) + { + try { + segmentAnnouncer.unannounceSegment(segment); + } + catch (Exception e) { + log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource()) + .addData("identifier", segment.getId().toString()) + .emit(); } } - public void registerNewVersionOfPendingSegment( - SegmentIdWithShardSpec basePendingSegment, - SegmentIdWithShardSpec newSegmentVersion - ) throws 
IOException + public void registerUpgradedPendingSegment(PendingSegmentRecord pendingSegmentRecord) throws IOException { + SegmentIdWithShardSpec basePendingSegment = idToPendingSegment.get(pendingSegmentRecord.getUpgradedFromSegmentId()); + SegmentIdWithShardSpec upgradedPendingSegment = pendingSegmentRecord.getId(); if (!sinks.containsKey(basePendingSegment) || droppingSinks.contains(basePendingSegment)) { return; } // Update query mapping with SinkQuerySegmentWalker - ((SinkQuerySegmentWalker) texasRanger).registerNewVersionOfPendingSegment(basePendingSegment, newSegmentVersion); + ((SinkQuerySegmentWalker) texasRanger).registerUpgradedPendingSegment(basePendingSegment, upgradedPendingSegment); // Announce segments final DataSegment baseSegment = sinks.get(basePendingSegment).getSegment(); + final DataSegment newSegment = getUpgradedSegment(baseSegment, upgradedPendingSegment); - final DataSegment newSegment = new DataSegment( - newSegmentVersion.getDataSource(), - newSegmentVersion.getInterval(), - newSegmentVersion.getVersion(), + segmentAnnouncer.announceSegment(newSegment); + baseSegmentToUpgradedSegments.get(basePendingSegment).add(upgradedPendingSegment); + upgradedSegmentToBaseSegment.put(upgradedPendingSegment, basePendingSegment); + } + + private DataSegment getUpgradedSegment(DataSegment baseSegment, SegmentIdWithShardSpec upgradedVersion) + { + return new DataSegment( + upgradedVersion.getDataSource(), + upgradedVersion.getInterval(), + upgradedVersion.getVersion(), baseSegment.getLoadSpec(), baseSegment.getDimensions(), baseSegment.getMetrics(), - newSegmentVersion.getShardSpec(), + upgradedVersion.getShardSpec(), baseSegment.getBinaryVersion(), baseSegment.getSize() ); - segmentAnnouncer.announceSegment(newSegment); - baseSegmentToUpgradedVersions.computeIfAbsent(basePendingSegment.asSegmentId(), id -> new HashSet<>()) - .add(newSegmentVersion); } private void lockBasePersistDirectory() @@ -1367,13 +1394,8 @@ private Object bootstrapSinksFromDisk() 
hydrants ); rowsSoFar += currSink.getNumRows(); - sinks.put(identifier, currSink); - sinkTimeline.add( - currSink.getInterval(), - currSink.getVersion(), - identifier.getShardSpec().createChunk(currSink) - ); + addSink(identifier, currSink); segmentAnnouncer.announceSegment(currSink.getSegment()); } catch (IOException e) { @@ -1396,12 +1418,49 @@ private Object bootstrapSinksFromDisk() return committed.getMetadata(); } + /** + * Update the state of the appenderator when adding a sink. + * + * @param identifier sink identifier + * @param sink sink to be added + */ + private void addSink(SegmentIdWithShardSpec identifier, Sink sink) + { + sinks.put(identifier, sink); + // Associate the base segment of a sink with its string identifier + // Needed to get the base segment using upgradedFromSegmentId of a pending segment + idToPendingSegment.put(identifier.asSegmentId().toString(), identifier); + + // The base segment is associated with itself in the maps to maintain all the upgraded ids of a sink. 
+ baseSegmentToUpgradedSegments.put(identifier, new HashSet<>()); + baseSegmentToUpgradedSegments.get(identifier).add(identifier); + + sinkTimeline.add( + sink.getInterval(), + sink.getVersion(), + identifier.getShardSpec().createChunk(sink) + ); + } + private ListenableFuture abandonSegment( final SegmentIdWithShardSpec identifier, final Sink sink, final boolean removeOnDiskData ) { + abandonedSegments.add(identifier); + final SegmentIdWithShardSpec baseIdentifier = upgradedSegmentToBaseSegment.getOrDefault(identifier, identifier); + synchronized (sink) { + if (baseSegmentToUpgradedSegments.containsKey(baseIdentifier)) { + Set relevantSegments = new HashSet<>(baseSegmentToUpgradedSegments.get(baseIdentifier)); + relevantSegments.removeAll(abandonedSegments); + // If there are unabandoned segments associated with the sink, return early + // This may be the case if segments have been upgraded as the result of a concurrent replace + if (!relevantSegments.isEmpty()) { + return Futures.immediateFuture(null); + } + } + } // Ensure no future writes will be made to this sink. if (sink.finishWriting()) { // Decrement this sink's rows from the counters. we only count active sinks so that we don't double decrement, @@ -1419,7 +1478,7 @@ private ListenableFuture abandonSegment( } // Mark this identifier as dropping, so no future push tasks will pick it up. - droppingSinks.add(identifier); + droppingSinks.add(baseIdentifier); // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread. 
return Futures.transform( @@ -1430,8 +1489,8 @@ private ListenableFuture abandonSegment( @Override public Void apply(@Nullable Object input) { - if (!sinks.remove(identifier, sink)) { - log.error("Sink for segment[%s] no longer valid, not abandoning.", identifier); + if (!sinks.remove(baseIdentifier, sink)) { + log.error("Sink for segment[%s] no longer valid, not abandoning.", baseIdentifier); return null; } @@ -1439,17 +1498,17 @@ public Void apply(@Nullable Object input) if (removeOnDiskData) { // Remove this segment from the committed list. This must be done from the persist thread. - log.debug("Removing commit metadata for segment[%s].", identifier); + log.debug("Removing commit metadata for segment[%s].", baseIdentifier); try { commitLock.lock(); final Committed oldCommit = readCommit(); if (oldCommit != null) { - writeCommit(oldCommit.without(identifier.toString())); + writeCommit(oldCommit.without(baseIdentifier.toString())); } } catch (Exception e) { log.makeAlert(e, "Failed to update committed segments[%s]", schema.getDataSource()) - .addData("identifier", identifier.toString()) + .addData("identifier", baseIdentifier.toString()) .emit(); throw new RuntimeException(e); } @@ -1458,22 +1517,14 @@ public Void apply(@Nullable Object input) } } - // Unannounce the segment. 
- try { - unannounceAllVersionsOfSegment(sink.getSegment()); - } - catch (Exception e) { - log.makeAlert(e, "Failed to unannounce segment[%s]", schema.getDataSource()) - .addData("identifier", identifier.toString()) - .emit(); - } + unannounceAllVersionsOfSegment(sink.getSegment(), sink); Runnable removeRunnable = () -> { - droppingSinks.remove(identifier); + droppingSinks.remove(baseIdentifier); sinkTimeline.remove( sink.getInterval(), sink.getVersion(), - identifier.getShardSpec().createChunk(sink) + baseIdentifier.getShardSpec().createChunk(sink) ); for (FireHydrant hydrant : sink) { if (cache != null) { @@ -1483,7 +1534,7 @@ public Void apply(@Nullable Object input) } if (removeOnDiskData) { - removeDirectory(computePersistDir(identifier)); + removeDirectory(computePersistDir(baseIdentifier)); } log.info("Dropped segment[%s].", identifier); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 164b81b0c49b..2b5c153d602e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -51,10 +51,12 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -322,10 +324,14 @@ public ListenableFuture registerHandoff(SegmentsAndCo return Futures.immediateFuture(null); } else { - final List waitingSegmentIdList = segmentsAndCommitMetadata.getSegments().stream() - .map( - SegmentIdWithShardSpec::fromDataSegment) - .collect(Collectors.toList()); + final Set 
segmentsToBeHandedOff = new HashSet<>(segmentsAndCommitMetadata.getSegments()); + if (segmentsAndCommitMetadata.getUpgradedSegments() != null) { + segmentsToBeHandedOff.addAll(segmentsAndCommitMetadata.getUpgradedSegments()); + } + final List waitingSegmentIdList = + segmentsToBeHandedOff.stream() + .map(SegmentIdWithShardSpec::fromDataSegment) + .collect(Collectors.toList()); final Object metadata = Preconditions.checkNotNull(segmentsAndCommitMetadata.getCommitMetadata(), "commitMetadata"); if (waitingSegmentIdList.isEmpty()) { @@ -333,7 +339,8 @@ public ListenableFuture registerHandoff(SegmentsAndCo new SegmentsAndCommitMetadata( segmentsAndCommitMetadata.getSegments(), ((AppenderatorDriverMetadata) metadata).getCallerMetadata(), - segmentsAndCommitMetadata.getSegmentSchemaMapping() + segmentsAndCommitMetadata.getSegmentSchemaMapping(), + segmentsAndCommitMetadata.getUpgradedSegments() ) ); } @@ -365,8 +372,7 @@ public ListenableFuture registerHandoff(SegmentsAndCo public void onSuccess(Object result) { if (numRemainingHandoffSegments.decrementAndGet() == 0) { - List segments = segmentsAndCommitMetadata.getSegments(); - log.info("Successfully handed off [%d] segments.", segments.size()); + log.info("Successfully handed off [%d] segments.", segmentsToBeHandedOff.size()); final long handoffTotalTime = System.currentTimeMillis() - handoffStartTime; metrics.reportMaxSegmentHandoffTime(handoffTotalTime); if (handoffTotalTime > HANDOFF_TIME_THRESHOLD) { @@ -375,9 +381,10 @@ public void onSuccess(Object result) } resultFuture.set( new SegmentsAndCommitMetadata( - segments, + segmentsAndCommitMetadata.getSegments(), ((AppenderatorDriverMetadata) metadata).getCallerMetadata(), - segmentsAndCommitMetadata.getSegmentSchemaMapping() + segmentsAndCommitMetadata.getSegmentSchemaMapping(), + segmentsAndCommitMetadata.getUpgradedSegments() ) ); } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index eb8f9358cef8..5a21a4331fe2 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -201,7 +201,7 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo expectedException.expect(ExecutionException.class); expectedException.expectCause(CoreMatchers.instanceOf(ISE.class)); expectedException.expectMessage( - "Fail test while dropping segment[foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_abc123]" + "Fail test while dropping segment" ); driver = new StreamAppenderatorDriver( @@ -221,10 +221,8 @@ public void testFailDuringDrop() throws IOException, InterruptedException, Timeo Assert.assertNull(driver.startJob(null)); - for (int i = 0; i < ROWS.size(); i++) { - committerSupplier.setMetadata(i + 1); - Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk()); - } + committerSupplier.setMetadata(1); + Assert.assertTrue(driver.add(ROWS.get(0), "dummy", committerSupplier, false, true).isOk()); final SegmentsAndCommitMetadata published = driver.publish( StreamAppenderatorDriverTest.makeOkPublisher(), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index 335d1b219fe2..63775e2dc3bb 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -43,6 +43,7 @@ import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import 
org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; @@ -58,6 +59,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -72,6 +74,7 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport { private static final String DATA_SOURCE = "foo"; private static final String VERSION = "abc123"; + private static final String UPGRADED_VERSION = "xyz456"; private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper(); private static final int MAX_ROWS_IN_MEMORY = 100; private static final int MAX_ROWS_PER_SEGMENT = 3; @@ -246,6 +249,44 @@ public void testHandoffTimeout() throws Exception driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); } + @Test + public void testHandoffUpgradedSegments() + throws IOException, InterruptedException, TimeoutException, ExecutionException + { + final TestCommitterSupplier committerSupplier = new TestCommitterSupplier<>(); + + Assert.assertNull(driver.startJob(null)); + + for (int i = 0; i < ROWS.size(); i++) { + committerSupplier.setMetadata(i + 1); + Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk()); + } + + driver.persist(committerSupplier.get()); + + // There are no remaining rows in the driver, and thus the result must be empty + final SegmentsAndCommitMetadata segmentsAndCommitMetadata = driver.publishAndRegisterHandoff( + makeUpgradingPublisher(), + committerSupplier.get(), + ImmutableList.of("dummy") + ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + + Assert.assertNotNull(segmentsAndCommitMetadata.getUpgradedSegments()); + Assert.assertEquals( + segmentsAndCommitMetadata.getSegments().size(), + 
segmentsAndCommitMetadata.getUpgradedSegments().size() + ); + + Set expectedHandedOffSegments = new HashSet<>(); + for (DataSegment segment : segmentsAndCommitMetadata.getSegments()) { + expectedHandedOffSegments.add(segment.toDescriptor()); + } + for (DataSegment segment : segmentsAndCommitMetadata.getUpgradedSegments()) { + expectedHandedOffSegments.add(segment.toDescriptor()); + } + Assert.assertEquals(expectedHandedOffSegments, segmentHandoffNotifierFactory.getHandedOffSegmentDescriptors()); + } + @Test public void testPublishPerRow() throws IOException, InterruptedException, TimeoutException, ExecutionException { @@ -379,6 +420,29 @@ static TransactionalSegmentPublisher makeOkPublisher() SegmentPublishResult.ok(Collections.emptySet()); } + private TransactionalSegmentPublisher makeUpgradingPublisher() + { + return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> { + Set allSegments = new HashSet<>(segmentsToPublish); + int id = 0; + for (DataSegment segment : segmentsToPublish) { + DataSegment upgradedSegment = new DataSegment( + SegmentId.of(DATA_SOURCE, Intervals.ETERNITY, UPGRADED_VERSION, id), + segment.getLoadSpec(), + segment.getDimensions(), + segment.getMetrics(), + new NumberedShardSpec(id, 0), + null, + segment.getBinaryVersion(), + segment.getSize() + ); + id++; + allSegments.add(upgradedSegment); + } + return SegmentPublishResult.ok(allSegments); + }; + } + static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException) { return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> { @@ -459,6 +523,7 @@ static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifier { private boolean handoffEnabled = true; private long handoffDelay; + private final Set handedOffSegmentDescriptors = new HashSet<>(); public void disableHandoff() { @@ -470,6 +535,13 @@ public void setHandoffDelay(long delay) handoffDelay = delay; } + public Set 
getHandedOffSegmentDescriptors() + { + synchronized (handedOffSegmentDescriptors) { + return ImmutableSet.copyOf(handedOffSegmentDescriptors); + } + } + @Override public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource) { @@ -494,6 +566,9 @@ public boolean registerSegmentHandoffCallback( } exec.execute(handOffRunnable); + synchronized (handedOffSegmentDescriptors) { + handedOffSegmentDescriptors.add(descriptor); + } } return true; }