-
Notifications
You must be signed in to change notification settings - Fork 3.8k
Add support for concurrent batch Append and Replace #14407
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d9563f2
35cc335
4d01445
5acf93b
be23936
6a9e6e7
e3e9cf3
2ed61fd
0d9b5e6
38a0071
f5144a0
1413a30
468e4a2
d495c3c
671c01c
311d0ca
5e4876b
a2732ca
8f1e165
83d9484
06cf8d3
5981130
073bc26
12534ff
b271f1f
725265c
41c3cbe
1fabacf
5963be3
9fd156c
71023ca
a1c22a8
7b0e259
258caed
2fd2b9e
55eca90
1ba0e8d
ff71674
4e0587f
359a923
2bb3b79
ab0b400
8bb5a13
ae5e7c4
9c7d5b2
17ab844
e2b04d4
37640b7
5f5c5bf
444cfe4
6309702
f5b7092
58433d0
a88da61
057252e
7df51f2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,140 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.druid.indexing.common.actions; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import com.fasterxml.jackson.core.type.TypeReference; | ||
| import org.apache.druid.indexing.common.task.IndexTaskUtils; | ||
| import org.apache.druid.indexing.common.task.Task; | ||
| import org.apache.druid.indexing.overlord.CriticalAction; | ||
| import org.apache.druid.indexing.overlord.SegmentPublishResult; | ||
| import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; | ||
| import org.apache.druid.metadata.ReplaceTaskLock; | ||
| import org.apache.druid.segment.SegmentUtils; | ||
| import org.apache.druid.timeline.DataSegment; | ||
|
|
||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| /** | ||
| * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by | ||
| * your task for the segment intervals. | ||
| */ | ||
| public class SegmentTransactionalAppendAction implements TaskAction<SegmentPublishResult> | ||
| { | ||
| private final Set<DataSegment> segments; | ||
|
|
||
| public static SegmentTransactionalAppendAction create(Set<DataSegment> segments) | ||
| { | ||
| return new SegmentTransactionalAppendAction(segments); | ||
| } | ||
|
|
||
| @JsonCreator | ||
| private SegmentTransactionalAppendAction( | ||
| @JsonProperty("segments") Set<DataSegment> segments | ||
| ) | ||
| { | ||
| this.segments = segments; | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public Set<DataSegment> getSegments() | ||
| { | ||
| return segments; | ||
| } | ||
|
|
||
| @Override | ||
| public TypeReference<SegmentPublishResult> getReturnTypeReference() | ||
| { | ||
| return new TypeReference<SegmentPublishResult>() | ||
| { | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Performs some sanity checks and publishes the given segments. | ||
| */ | ||
| @Override | ||
| public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) | ||
| { | ||
| TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments); | ||
|
|
||
| final String datasource = task.getDataSource(); | ||
| final Map<DataSegment, ReplaceTaskLock> segmentToReplaceLock | ||
| = TaskLocks.findReplaceLocksCoveringSegments(datasource, toolbox.getTaskLockbox(), segments); | ||
|
|
||
| final SegmentPublishResult retVal; | ||
| try { | ||
| retVal = toolbox.getTaskLockbox().doInCriticalSection( | ||
| task, | ||
| segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), | ||
| CriticalAction.<SegmentPublishResult>builder() | ||
| .onValidLocks( | ||
| () -> toolbox.getIndexerMetadataStorageCoordinator().commitAppendSegments( | ||
| segments, | ||
| segmentToReplaceLock | ||
| ) | ||
| ) | ||
| .onInvalidLocks( | ||
| () -> SegmentPublishResult.fail( | ||
| "Invalid task locks. Maybe they are revoked by a higher priority task." | ||
| + " Please check the overlord log for details." | ||
| ) | ||
|
Comment on lines
+98
to
+101
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wish we could get a better error message than this... Ah well
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should have logged the intervals though. |
||
| ) | ||
| .build() | ||
| ); | ||
| } | ||
| catch (Exception e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
|
|
||
| // Emit metrics | ||
| final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); | ||
| IndexTaskUtils.setTaskDimensions(metricBuilder, task); | ||
|
|
||
| if (retVal.isSuccess()) { | ||
| toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1)); | ||
| for (DataSegment segment : retVal.getSegments()) { | ||
| IndexTaskUtils.setSegmentDimensions(metricBuilder, segment); | ||
| toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize())); | ||
| } | ||
| } else { | ||
| toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1)); | ||
| } | ||
|
|
||
| return retVal; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isAudited() | ||
| { | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() | ||
| { | ||
| return "SegmentTransactionalAppendAction{" + | ||
| "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + | ||
| '}'; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,147 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| package org.apache.druid.indexing.common.actions; | ||
|
|
||
| import com.fasterxml.jackson.annotation.JsonCreator; | ||
| import com.fasterxml.jackson.annotation.JsonProperty; | ||
| import com.fasterxml.jackson.core.type.TypeReference; | ||
| import com.google.common.collect.ImmutableSet; | ||
| import org.apache.druid.indexing.common.task.IndexTaskUtils; | ||
| import org.apache.druid.indexing.common.task.Task; | ||
| import org.apache.druid.indexing.overlord.CriticalAction; | ||
| import org.apache.druid.indexing.overlord.SegmentPublishResult; | ||
| import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; | ||
| import org.apache.druid.metadata.ReplaceTaskLock; | ||
| import org.apache.druid.query.DruidMetrics; | ||
| import org.apache.druid.segment.SegmentUtils; | ||
| import org.apache.druid.timeline.DataSegment; | ||
|
|
||
| import java.util.Set; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| /** | ||
| * Replace segments in metadata storage. The segment versions must all be less than or equal to a lock held by | ||
| * your task for the segment intervals. | ||
|
Comment on lines
+40
to
+41
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this javadoc is not very clear. versions of "what" segments? The ones being replaced? Also what does it mean here by "your" task. Some verbosity here could be helpful. |
||
| */ | ||
| public class SegmentTransactionalReplaceAction implements TaskAction<SegmentPublishResult> | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There's a bunch of stuff that I see and commented on in |
||
| { | ||
| /** | ||
| * Set of segments to be inserted into metadata storage | ||
| */ | ||
| private final Set<DataSegment> segments; | ||
|
|
||
| public static SegmentTransactionalReplaceAction create( | ||
| Set<DataSegment> segmentsToPublish | ||
| ) | ||
| { | ||
| return new SegmentTransactionalReplaceAction(segmentsToPublish); | ||
| } | ||
|
|
||
| @JsonCreator | ||
| private SegmentTransactionalReplaceAction( | ||
| @JsonProperty("segments") Set<DataSegment> segments | ||
| ) | ||
| { | ||
| this.segments = ImmutableSet.copyOf(segments); | ||
| } | ||
|
|
||
| @JsonProperty | ||
| public Set<DataSegment> getSegments() | ||
| { | ||
| return segments; | ||
| } | ||
|
|
||
| @Override | ||
| public TypeReference<SegmentPublishResult> getReturnTypeReference() | ||
| { | ||
| return new TypeReference<SegmentPublishResult>() | ||
| { | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Performs some sanity checks and publishes the given segments. | ||
| */ | ||
| @Override | ||
| public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) | ||
| { | ||
| TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments); | ||
|
|
||
| // Find the active replace locks held only by this task | ||
| final Set<ReplaceTaskLock> replaceLocksForTask | ||
| = toolbox.getTaskLockbox().findReplaceLocksForTask(task); | ||
|
|
||
| final SegmentPublishResult retVal; | ||
| try { | ||
| retVal = toolbox.getTaskLockbox().doInCriticalSection( | ||
| task, | ||
| segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), | ||
| CriticalAction.<SegmentPublishResult>builder() | ||
| .onValidLocks( | ||
| () -> toolbox.getIndexerMetadataStorageCoordinator() | ||
| .commitReplaceSegments(segments, replaceLocksForTask) | ||
| ) | ||
| .onInvalidLocks( | ||
| () -> SegmentPublishResult.fail( | ||
| "Invalid task locks. Maybe they are revoked by a higher priority task." | ||
| + " Please check the overlord log for details." | ||
| ) | ||
| ) | ||
| .build() | ||
| ); | ||
| } | ||
| catch (Exception e) { | ||
| throw new RuntimeException(e); | ||
| } | ||
|
|
||
| // Emit metrics | ||
| final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder(); | ||
| IndexTaskUtils.setTaskDimensions(metricBuilder, task); | ||
|
|
||
| if (retVal.isSuccess()) { | ||
| toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/success", 1)); | ||
|
|
||
| for (DataSegment segment : retVal.getSegments()) { | ||
| final String partitionType = segment.getShardSpec() == null ? null : segment.getShardSpec().getType(); | ||
| metricBuilder.setDimension(DruidMetrics.PARTITIONING_TYPE, partitionType); | ||
| metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString()); | ||
|
abhishekagarwal87 marked this conversation as resolved.
|
||
| toolbox.getEmitter().emit(metricBuilder.setMetric("segment/added/bytes", segment.getSize())); | ||
| } | ||
| } else { | ||
| toolbox.getEmitter().emit(metricBuilder.setMetric("segment/txn/failure", 1)); | ||
| } | ||
|
|
||
| return retVal; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean isAudited() | ||
| { | ||
| return true; | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() | ||
| { | ||
| return "SegmentTransactionalReplaceAction{" + | ||
| "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + | ||
| '}'; | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.