From 03404fa49820d50aa7595913f65dc1a486cffb84 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 19 Mar 2025 14:26:19 +0530 Subject: [PATCH 1/8] Retry segment publish task actions without holding locks --- .../common/actions/LocalTaskActionClient.java | 2 + .../common/actions/SegmentPublishAction.java | 85 ++++++++++ .../SegmentTransactionalAppendAction.java | 7 +- .../SegmentTransactionalInsertAction.java | 11 +- .../SegmentTransactionalReplaceAction.java | 12 +- .../common/actions/TaskActionHolder.java | 5 +- .../overlord/http/OverlordResource.java | 43 ++--- .../SegmentTransactionalInsertActionTest.java | 9 +- .../apache/druid/common/config/Configs.java | 24 +++ .../druid/common/config/ConfigsTest.java | 24 +++ .../overlord/SegmentPublishResult.java | 46 +++-- .../IndexerSQLMetadataStorageCoordinator.java | 158 ++++-------------- ...exerSQLMetadataStorageCoordinatorTest.java | 49 +++--- ...orageCoordinatorSchemaPersistenceTest.java | 2 +- 14 files changed, 260 insertions(+), 217 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java index 1d0059335ed1..af1f362a669b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.common.actions; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.jackson.JacksonUtils; @@ -69,6 +70,7 @@ private R performAction(TaskAction taskAction) return result; } catch (Throwable t) { + Throwables.throwIfUnchecked(t); throw new RuntimeException(t); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java new file mode 100644 index 000000000000..f346d814b635 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common.actions; + +import org.apache.druid.indexing.common.task.IndexTaskUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.StringUtils; + +/** + * Task action to publish segments. Contains common code used by insert, replace + * and append actions. + */ +public abstract class SegmentPublishAction implements TaskAction +{ + private static final int QUIET_RETRIES = 3; + private static final int MAX_RETRIES = 5; + + @Override + public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) + { + int attemptCount = 0; + final String taskId = task.getId(); + + // Retry until success or until max retries are exhausted + SegmentPublishResult result = tryPublishSegments(task, toolbox); + while (!result.isSuccess() && result.canRetry() && attemptCount++ < MAX_RETRIES) { + awaitNextRetry(taskId, result, attemptCount); + result = tryPublishSegments(task, toolbox); + } + + IndexTaskUtils.emitSegmentPublishMetrics(result, task, toolbox); + return result; + } + + /** + * Sleeps until the next attempt. + */ + private static void awaitNextRetry(String taskId, SegmentPublishResult lastResult, int attemptCount) + { + try { + RetryUtils.awaitNextRetry( + new ISE(lastResult.getErrorMsg()), + StringUtils.format( + "Segment publish for task[%s] failed due to error[%s]", + taskId, lastResult.getErrorMsg() + ), + attemptCount, + MAX_RETRIES, + attemptCount <= QUIET_RETRIES + ); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + /** + * Commits the segments provided in this task action if the underlying task + * holds the required locks over these segments. + */ + protected abstract SegmentPublishResult tryPublishSegments( + Task task, + TaskActionToolbox toolbox + ); +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index ea3a4fd36d2a..a206691c50c7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Throwables; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLock; @@ -58,7 +59,7 @@ * } * */ -public class SegmentTransactionalAppendAction implements TaskAction +public class SegmentTransactionalAppendAction extends SegmentPublishAction { private final Set segments; @Nullable @@ -136,7 +137,7 @@ public TypeReference getReturnTypeReference() } @Override - public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) + protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox toolbox) { if (!(task instanceof PendingSegmentAllocatingTask)) { throw DruidException.defensive( @@ -199,10 +200,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) ); } catch (Exception e) { + Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } - IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox); return retVal; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index fb988e78e5d4..627ab8e02d29 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -23,10 +23,10 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; -import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskLockHelper; import org.apache.druid.indexing.overlord.CriticalAction; @@ -51,7 +51,7 @@ * Insert segments into metadata storage. The segment versions must all be less than or equal to a lock held by * your task for the segment intervals. */ -public class SegmentTransactionalInsertAction implements TaskAction +public class SegmentTransactionalInsertAction extends SegmentPublishAction { /** * Set of segments that was fully overshadowed by new segments, {@link SegmentTransactionalInsertAction#segments} @@ -172,11 +172,8 @@ public TypeReference getReturnTypeReference() return new TypeReference<>() {}; } - /** - * Performs some sanity checks and publishes the given segments. - */ @Override - public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) + protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox toolbox) { final SegmentPublishResult retVal; @@ -234,10 +231,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) ); } catch (Exception e) { + Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } - IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox); return retVal; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 6546ed80d9d9..f64efa462249 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Optional; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; -import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.SegmentPublishResult; @@ -59,7 +59,7 @@ * The supervisor relays the payloads to all the tasks with the corresponding group_id to serve realtime queries * */ -public class SegmentTransactionalReplaceAction implements TaskAction +public class SegmentTransactionalReplaceAction extends SegmentPublishAction { private static final Logger log = new Logger(SegmentTransactionalReplaceAction.class); @@ -108,11 +108,8 @@ public TypeReference getReturnTypeReference() return new TypeReference<>() {}; } - /** - * Performs some sanity checks and publishes the given segments. - */ @Override - public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) + protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox toolbox) { TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments); @@ -140,11 +137,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) ); } catch (Exception e) { + Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } - IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox); - // Upgrade any overlapping pending segments // Do not perform upgrade in the same transaction as replace commit so that // failure to upgrade pending segments does not affect success of the commit diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionHolder.java index 107a4bc7a404..8738a9044736 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionHolder.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.config.Configs; import org.apache.druid.indexing.common.task.Task; /** @@ -36,8 +37,8 @@ public TaskActionHolder( @JsonProperty("action") TaskAction action ) { - this.task = task; - this.action = action; + this.task = Configs.ensureNotNull(task, "'task' must not be null"); + this.action = Configs.ensureNotNull(action, "'action' must not be null"); } @JsonProperty diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 74656dfdb5f1..7cf52322bca9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -41,7 +41,6 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; -import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionHolder; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.DruidOverlord; @@ -97,7 +96,6 @@ import javax.ws.rs.core.Response.Status; import java.io.InputStream; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -509,30 +507,25 @@ public Response getWorkerConfigHistory( @ResourceFilters(StateResourceFilter.class) public Response doAction(final TaskActionHolder holder) { + final Task task = holder.getTask(); return asLeaderWith( - taskMaster.getTaskActionClient(holder.getTask()), - new Function<>() - { - @Override - public Response apply(TaskActionClient taskActionClient) - { - final Map retMap; - - // It would be great to verify that this worker is actually supposed to be running the task before - // actually doing the action. Some ideas for how that could be done would be using some sort of attempt_id - // or token that gets passed around. - - try { - final Object ret = taskActionClient.submit(holder.getAction()); - retMap = new HashMap<>(); - retMap.put("result", ret); - } - catch (Exception e) { - log.warn(e, "Failed to perform task action"); - return Response.serverError().entity(ImmutableMap.of("error", e.getMessage())).build(); - } - - return Response.ok().entity(retMap).build(); + taskMaster.getTaskActionClient(task), + taskActionClient -> { + try { + final Object result = taskActionClient.submit(holder.getAction()); + return Response.ok().entity(Map.of("result", result)).build(); + } + catch (DruidException e) { + log.noStackTrace().warn( + e, + "Exception while performing action[%s] for task[%s]", + holder.getAction(), task.getId() + ); + return ServletResourceUtils.buildErrorResponseFrom(e); + } + catch (Throwable e) { + log.warn(e, "Failed to perform action[%s] for task[%s]", holder.getAction(), task.getId()); + return Response.serverError().entity(ImmutableMap.of("error", e.getMessage())).build(); } } ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 44ce60b5ceb2..095e9c3b57d2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; @@ -150,11 +149,9 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception ); Assert.assertEquals( - SegmentPublishResult.fail( - InvalidInput.exception( - "The new start metadata state[ObjectMetadata{theObject=[1]}] is" - + " ahead of the last committed end state[null]. Try resetting the supervisor." - ).toString() + SegmentPublishResult.retryableFailure( + "The new start metadata state[ObjectMetadata{theObject=[1]}] is" + + " ahead of the last committed end state[null]. Try resetting the supervisor." ), result ); diff --git a/processing/src/main/java/org/apache/druid/common/config/Configs.java b/processing/src/main/java/org/apache/druid/common/config/Configs.java index 433a2548a4bf..020776d3f952 100644 --- a/processing/src/main/java/org/apache/druid/common/config/Configs.java +++ b/processing/src/main/java/org/apache/druid/common/config/Configs.java @@ -19,11 +19,35 @@ package org.apache.druid.common.config; +import org.apache.druid.error.InvalidInput; + +import javax.annotation.Nullable; + /** * Utility class for common config operations. */ public class Configs { + /** + * Ensures that the given value is not null, otherwise throws an exception. + * + * @param value Value to check for null + * @param message Message to use for the exception, if thrown + * @param args Arguments used to interpolate the exception message, if thrown + * @return The given value unchanged if it is not null + * @throws org.apache.druid.error.DruidException of + * {@link org.apache.druid.error.DruidException.Category#INVALID_INPUT} if the + * input {@code value} is null. + */ + public static T ensureNotNull(T value, String message, Object... args) + { + if (value == null) { + throw InvalidInput.exception(message, args); + } else { + return value; + } + } + /** * Returns the given {@code value} if it is not null, otherwise returns the * {@code defaultValue}. diff --git a/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java b/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java index edfc9a25164b..edbc15a2947a 100644 --- a/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java +++ b/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java @@ -19,6 +19,9 @@ package org.apache.druid.common.config; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; +import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; @@ -40,4 +43,25 @@ public void testValueOrDefault() Assert.assertEquals("def", Configs.valueOrDefault(null, "def")); } + @Test + public void testEnsureNotNull() + { + Assert.assertEquals( + "abc", + Configs.ensureNotNull("abc", "Config should not be null") + ); + } + + @Test + public void testEnsureNotNull_throwsException_ifValueIsNull() + { + MatcherAssert.assertThat( + Assert.assertThrows( + DruidException.class, + () -> Configs.ensureNotNull(null, "Config[%s] should not be null", "abc") + ), + DruidExceptionMatcher.invalidInput().expectMessageIs("Config[abc] should not be null") + ); + } + } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index e4bc1645f710..93198dd1a2d8 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -34,22 +34,14 @@ import java.util.Set; /** - * Result of an operation that attempts to publish segments. Indicates the set of segments actually published - * and whether or not the transaction was a success. - * - * If "success" is false then the segments set will be empty. - * - * It's possible for the segments set to be empty even if "success" is true, since the segments set only - * includes segments actually published as part of the transaction. The requested segments could have been - * published by a different transaction (e.g. in the case of replica sets) and this one would still succeed. + * Result of a segment publish operation. */ public class SegmentPublishResult { + private final boolean canRetry; private final Set segments; private final boolean success; - @Nullable private final String errorMsg; - @Nullable private final List upgradedPendingSegments; public static SegmentPublishResult ok(Set segments) @@ -59,12 +51,17 @@ public static SegmentPublishResult ok(Set segments) public static SegmentPublishResult ok(Set segments, List upgradedPendingSegments) { - return new SegmentPublishResult(segments, true, null, upgradedPendingSegments); + return new SegmentPublishResult(segments, true, false, null, upgradedPendingSegments); + } + + public static SegmentPublishResult fail(String errorMsg, Object... args) + { + return new SegmentPublishResult(Set.of(), false, false, StringUtils.format(errorMsg, args), null); } - public static SegmentPublishResult fail(String errorMsg) + public static SegmentPublishResult retryableFailure(String errorMsg, Object... args) { - return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg); + return new SegmentPublishResult(Set.of(), false, true, StringUtils.format(errorMsg, args), null); } @JsonCreator @@ -74,19 +71,21 @@ private SegmentPublishResult( @JsonProperty("errorMsg") @Nullable String errorMsg ) { - this(segments, success, errorMsg, null); + this(segments, success, false, errorMsg, null); } private SegmentPublishResult( Set segments, boolean success, - @Nullable String errorMsg, + boolean canRetry, + @Nullable String errorMsg, List upgradedPendingSegments ) { this.segments = Preconditions.checkNotNull(segments, "segments"); this.success = success; this.errorMsg = errorMsg; + this.canRetry = canRetry; this.upgradedPendingSegments = upgradedPendingSegments; if (!success) { @@ -98,6 +97,12 @@ private SegmentPublishResult( } } + /** + * Set of segments published successfully. + * + * @return Empty set if the publish operation failed or if all the segments had + * already been published by a different transaction. + */ @JsonProperty public Set getSegments() { @@ -117,6 +122,11 @@ public String getErrorMsg() return errorMsg; } + public boolean canRetry() + { + return canRetry; + } + @Nullable public List getUpgradedPendingSegments() { @@ -134,6 +144,7 @@ public boolean equals(Object o) } SegmentPublishResult that = (SegmentPublishResult) o; return success == that.success && + canRetry == that.canRetry && Objects.equals(segments, that.segments) && Objects.equals(errorMsg, that.errorMsg); } @@ -141,7 +152,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(segments, success, errorMsg); + return Objects.hash(segments, success, errorMsg, canRetry); } @Override @@ -150,6 +161,7 @@ public String toString() return "SegmentPublishResult{" + "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + ", success=" + success + + ", canRetry=" + canRetry + ", errorMsg='" + errorMsg + '\'' + '}'; } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index ce092f8c6967..9a0f91d6afd6 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -91,7 +91,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -306,33 +305,22 @@ public SegmentPublishResult commitSegmentsAndMetadata( final String dataSource = segments.iterator().next().getDataSource(); - final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false); - try { return inReadWriteDatasourceTransaction( dataSource, transaction -> { - // Set definitelyNotUpdated back to false upon retrying. - definitelyNotUpdated.set(false); - + // Try to update datasource metadata first if (startMetadata != null) { - final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle( + final SegmentPublishResult result = updateDataSourceMetadataWithHandle( transaction, dataSource, startMetadata, endMetadata ); - if (result.isFailed()) { - // Metadata was definitely not updated. - transaction.setRollbackOnly(); - definitelyNotUpdated.set(true); - - if (result.canRetry()) { - throw new RetryTransactionException(result.getErrorMsg()); - } else { - throw InvalidInput.exception(result.getErrorMsg()); - } + // Do not proceed if the datasource metadata update failed + if (!result.isSuccess()) { + return result; } } @@ -347,12 +335,7 @@ public SegmentPublishResult commitSegmentsAndMetadata( ); } catch (CallbackFailedException e) { - if (definitelyNotUpdated.get()) { - return SegmentPublishResult.fail(e.getMessage()); - } else { - // Must throw exception if we are not sure if we updated or not. - throw e; - } + throw e; } } @@ -468,45 +451,19 @@ public SegmentPublishResult commitMetadataOnly( throw new IllegalArgumentException("end metadata cannot be null"); } - final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false); - try { return inReadWriteDatasourceTransaction( dataSource, - transaction -> { - // Set definitelyNotUpdated back to false upon retrying. - definitelyNotUpdated.set(false); - - final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle( - transaction, - dataSource, - startMetadata, - endMetadata - ); - - if (result.isFailed()) { - // Metadata was definitely not updated. - transaction.setRollbackOnly(); - definitelyNotUpdated.set(true); - - if (result.canRetry()) { - throw new RetryTransactionException(result.getErrorMsg()); - } else { - throw new RuntimeException(result.getErrorMsg()); - } - } - - return SegmentPublishResult.ok(ImmutableSet.of()); - } + transaction -> updateDataSourceMetadataWithHandle( + transaction, + dataSource, + startMetadata, + endMetadata + ) ); } catch (CallbackFailedException e) { - if (definitelyNotUpdated.get()) { - return SegmentPublishResult.fail(e.getMessage()); - } else { - // Must throw exception if we are not sure if we updated or not. - throw e; - } + throw e; } } @@ -1126,25 +1083,18 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } ); - final AtomicBoolean metadataNotUpdated = new AtomicBoolean(false); try { return inReadWriteDatasourceTransaction( dataSource, transaction -> { - metadataNotUpdated.set(false); - + // Try to update datasource metadata first if (startMetadata != null) { - final DataStoreMetadataUpdateResult metadataUpdateResult + final SegmentPublishResult metadataUpdateResult = updateDataSourceMetadataWithHandle(transaction, dataSource, startMetadata, endMetadata); - if (metadataUpdateResult.isFailed()) { - transaction.setRollbackOnly(); - metadataNotUpdated.set(true); - if (metadataUpdateResult.canRetry()) { - throw new RetryTransactionException(metadataUpdateResult.getErrorMsg()); - } else { - throw new RuntimeException(metadataUpdateResult.getErrorMsg()); - } + // Abort the transaction if datasource metadata update has failed + if (!metadataUpdateResult.isSuccess()) { + return metadataUpdateResult; } } @@ -1172,12 +1122,7 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( ); } catch (CallbackFailedException e) { - if (metadataNotUpdated.get()) { - // Return failed result if metadata was definitely not updated - return SegmentPublishResult.fail(e.getMessage()); - } else { - throw e; - } + throw e; } } @@ -2052,7 +1997,7 @@ private Map getAppendSegmentsCommittedDuringTask( * * @throws RuntimeException if state is unknown after this call */ - protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( + protected SegmentPublishResult updateDataSourceMetadataWithHandle( final SegmentMetadataTransaction transaction, final String dataSource, final DataSourceMetadata startMetadata, @@ -2102,7 +2047,9 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) { // Offsets stored in startMetadata is greater than the last commited metadata. - return DataStoreMetadataUpdateResult.failure( + // This can happen because the previous task is still publishing its segments and can resolve once + // the previous task finishes publishing. + return SegmentPublishResult.retryableFailure( "The new start metadata state[%s] is ahead of the last committed" + " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb @@ -2111,7 +2058,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( if (!startMetadataMatchesExisting) { // Not in the desired start state. - return DataStoreMetadataUpdateResult.failure( + return SegmentPublishResult.fail( "Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.", oldCommitMetadataFromDb, startMetadata ); @@ -2126,7 +2073,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes() ); - final DataStoreMetadataUpdateResult retVal; + final SegmentPublishResult retVal; if (oldCommitMetadataBytesFromDb == null) { // SELECT -> INSERT can fail due to races; callers must be prepared to retry. final int numRows = transaction.getHandle().createStatement( @@ -2143,8 +2090,8 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( .execute(); retVal = numRows == 1 - ? DataStoreMetadataUpdateResult.SUCCESS - : DataStoreMetadataUpdateResult.retryableFailure("Failed to insert metadata for datasource[%s]", dataSource); + ? SegmentPublishResult.ok(Set.of()) + : SegmentPublishResult.retryableFailure("Failed to insert metadata for datasource[%s]", dataSource); } else { // Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE final int numRows = transaction.getHandle().createStatement( @@ -2163,8 +2110,8 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( .execute(); retVal = numRows == 1 - ? DataStoreMetadataUpdateResult.SUCCESS - : DataStoreMetadataUpdateResult.retryableFailure("Failed to update metadata for datasource[%s]", dataSource); + ? SegmentPublishResult.ok(Set.of()) + : SegmentPublishResult.retryableFailure("Failed to update metadata for datasource[%s]", dataSource); } if (retVal.isSuccess()) { @@ -2521,51 +2468,4 @@ private T inReadOnlyDatasourceTransaction( { return transactionFactory.inReadOnlyDatasourceTransaction(dataSource, callback); } - - public static class DataStoreMetadataUpdateResult - { - private final boolean failed; - private final boolean canRetry; - private final String errorMsg; - - public static final DataStoreMetadataUpdateResult SUCCESS = new DataStoreMetadataUpdateResult(false, false, null); - - public static DataStoreMetadataUpdateResult failure(String errorMsgFormat, Object... messageArgs) - { - return new DataStoreMetadataUpdateResult(true, false, errorMsgFormat, messageArgs); - } - - public static DataStoreMetadataUpdateResult retryableFailure(String errorMsgFormat, Object... messageArgs) - { - return new DataStoreMetadataUpdateResult(true, true, errorMsgFormat, messageArgs); - } - - DataStoreMetadataUpdateResult(boolean failed, boolean canRetry, @Nullable String errorMsg, Object... errorFormatArgs) - { - this.failed = failed; - this.canRetry = canRetry; - this.errorMsg = null == errorMsg ? null : StringUtils.format(errorMsg, errorFormatArgs); - } - - public boolean isFailed() - { - return failed; - } - - public boolean isSuccess() - { - return !failed; - } - - public boolean canRetry() - { - return canRetry; - } - - @Nullable - public String getErrorMsg() - { - return errorMsg; - } - } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index b6578fa9a721..0212c8f2d998 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.data.input.StringTuple; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.overlord.SegmentCreateRequest; @@ -186,7 +185,7 @@ public int getMaxRetries() ) { @Override - protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( + protected SegmentPublishResult updateDataSourceMetadataWithHandle( SegmentMetadataTransaction transaction, String dataSource, DataSourceMetadata startMetadata, @@ -780,7 +779,7 @@ public void testTransactionalAnnounceRetryAndSuccess() throws IOException ) { @Override - protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( + protected SegmentPublishResult updateDataSourceMetadataWithHandle( SegmentMetadataTransaction transaction, String dataSource, DataSourceMetadata startMetadata, @@ -789,7 +788,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( { metadataUpdateCounter.getAndIncrement(); if (attemptCounter.getAndIncrement() == 0) { - return DataStoreMetadataUpdateResult.retryableFailure(null); + return SegmentPublishResult.retryableFailure("this failure can be retried"); } else { return super.updateDataSourceMetadataWithHandle(transaction, dataSource, startMetadata, endMetadata); } @@ -803,7 +802,15 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( new ObjectMetadata(ImmutableMap.of("foo", "bar")), new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) ); - Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1); + Assert.assertEquals(SegmentPublishResult.retryableFailure("this failure can be retried"), result1); + + final SegmentPublishResult resultOnRetry = failOnceCoordinator.commitSegmentsAndMetadata( + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) + ); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), resultOnRetry); Assert.assertArrayEquals( mapper.writeValueAsString(defaultSegment).getBytes(StandardCharsets.UTF_8), @@ -825,7 +832,15 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( new ObjectMetadata(ImmutableMap.of("foo", "baz")), new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) ); - Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), result2); + Assert.assertEquals(SegmentPublishResult.retryableFailure("this failure can be retried"), result2); + + final SegmentPublishResult resultOnRetry2 = failOnceCoordinator.commitSegmentsAndMetadata( + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")), + new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) + ); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), resultOnRetry2); Assert.assertArrayEquals( mapper.writeValueAsString(defaultSegment2).getBytes(StandardCharsets.UTF_8), @@ -857,11 +872,10 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) ); Assert.assertEquals( - SegmentPublishResult.fail( - InvalidInput.exception( - "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last committed" - + " end state[null]. Try resetting the supervisor." - ).toString()), + SegmentPublishResult.retryableFailure( + "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last committed" + + " end state[null]. Try resetting the supervisor." + ), result1 ); @@ -888,10 +902,8 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() ); Assert.assertEquals( SegmentPublishResult.fail( - InvalidInput.exception( - "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]" - + " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor." - ).toString() + "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]" + + " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor." ), result2 ); @@ -972,10 +984,9 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() ); Assert.assertEquals( SegmentPublishResult.fail( - InvalidInput.exception( - "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and " - + "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor." - ).toString()), + "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and " + + "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor." + ), result2 ); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java index 9bed46b2c3a9..c72e1a75afee 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java @@ -108,7 +108,7 @@ public void setUp() ) { @Override - protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( + protected SegmentPublishResult updateDataSourceMetadataWithHandle( SegmentMetadataTransaction transaction, String dataSource, DataSourceMetadata startMetadata, From f346537a8f4e871e50b74fa7ebb7f06538415968 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Wed, 19 Mar 2025 15:36:54 +0530 Subject: [PATCH 2/8] Remove unused imports --- .../actions/SegmentTransactionalAppendAction.java | 6 +----- .../actions/SegmentTransactionalInsertAction.java | 10 +++------- .../java/org/apache/druid/common/config/Configs.java | 2 -- 3 files changed, 4 insertions(+), 14 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index a206691c50c7..e12bce3c2d10 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -27,7 +27,6 @@ import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; @@ -183,9 +182,8 @@ protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox t ); } - final SegmentPublishResult retVal; try { - retVal = toolbox.getTaskLockbox().doInCriticalSection( + return toolbox.getTaskLockbox().doInCriticalSection( task, segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() @@ -203,8 +201,6 @@ protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox t Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } - - return retVal; } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 627ab8e02d29..3a159c659313 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -175,22 +175,20 @@ public TypeReference getReturnTypeReference() @Override protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox toolbox) { - final SegmentPublishResult retVal; - if (segments.isEmpty()) { // A stream ingestion task didn't ingest any rows and created no segments (e.g., all records were unparseable), // but still needs to update metadata with the progress that the task made. try { - retVal = toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly( + return toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly( dataSource, startMetadata, endMetadata ); } catch (Exception e) { + Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } - return retVal; } final Set allSegments = new HashSet<>(segments); @@ -209,7 +207,7 @@ protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox t } try { - retVal = toolbox.getTaskLockbox().doInCriticalSection( + return toolbox.getTaskLockbox().doInCriticalSection( task, allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() @@ -234,8 +232,6 @@ protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox t Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } - - return retVal; } private void checkWithSegmentLock() diff --git a/processing/src/main/java/org/apache/druid/common/config/Configs.java b/processing/src/main/java/org/apache/druid/common/config/Configs.java index 020776d3f952..b3966faf739f 100644 --- a/processing/src/main/java/org/apache/druid/common/config/Configs.java +++ b/processing/src/main/java/org/apache/druid/common/config/Configs.java @@ -21,8 +21,6 @@ import org.apache.druid.error.InvalidInput; -import javax.annotation.Nullable; - /** * Utility class for common config operations. */ From e648b0005ed3899ca6e5de221903b057522f1132 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 22 Mar 2025 12:07:11 +0530 Subject: [PATCH 3/8] Add retry in TransactionalSegmentPublisher --- .../common/actions/SegmentPublishAction.java | 2 +- .../common/task/AbstractBatchIndexTask.java | 25 ++++++++ .../druid/indexing/common/task/IndexTask.java | 7 +-- .../parallel/ParallelIndexSupervisorTask.java | 8 +-- .../seekablestream/SequenceMetadata.java | 2 +- .../apache/druid/common/config/Configs.java | 5 +- .../overlord/SegmentPublishResult.java | 22 ++++--- .../TransactionalSegmentPublisher.java | 61 +++++++++++++++++-- 8 files changed, 99 insertions(+), 33 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java index f346d814b635..3ce19858e1e8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java @@ -43,7 +43,7 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) // Retry until success or until max retries are exhausted SegmentPublishResult result = tryPublishSegments(task, toolbox); - while (!result.isSuccess() && result.canRetry() && attemptCount++ < MAX_RETRIES) { + while (!result.isSuccess() && result.isRetryable() && attemptCount++ < MAX_RETRIES) { awaitNextRetry(taskId, result, attemptCount); result = tryPublishSegments(task, toolbox); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 38bc961dc47a..ec9c12a14fe3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -76,6 +76,7 @@ import org.apache.druid.segment.indexing.IngestionSpec; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.segment.transform.CompactionTransformSpec; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; @@ -456,6 +457,30 @@ protected TaskAction buildPublishAction( } } + protected TransactionalSegmentPublisher buildSegmentPublisher(TaskToolbox toolbox) + { + return new TransactionalSegmentPublisher() + { + @Override + public SegmentPublishResult publishAnnotatedSegments( + @Nullable Set segmentsToBeOverwritten, + Set segmentsToPublish, + @Nullable Object commitMetadata, + @Nullable SegmentSchemaMapping schemaMapping + ) throws IOException + { + return toolbox.getTaskActionClient().submit( + buildPublishAction( + segmentsToBeOverwritten, + segmentsToPublish, + schemaMapping, + getTaskLockHelper().getLockTypeToUse() + ) + ); + } + }; + } + protected boolean tryTimeChunkLock(TaskActionClient client, List intervals) throws IOException { // The given intervals are first converted to align with segment granularity. This is because, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index eae1f7caf1e6..5b7ba5e944c2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -62,6 +62,7 @@ import org.apache.druid.indexing.common.task.batch.partition.PartitionAnalysis; import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.input.TaskInputSource; +import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -866,11 +867,7 @@ private TaskStatus generateAndPublishSegments( throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType()); } - final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse(); - final TransactionalSegmentPublisher publisher = - (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) -> toolbox.getTaskActionClient().submit( - buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, map, taskLockType) - ); + final TransactionalSegmentPublisher publisher = buildSegmentPublisher(toolbox); String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null); if (effectiveId == null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 86e31c74a72f..34cc71a3a127 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -45,7 +45,6 @@ import org.apache.druid.indexer.report.IngestionStatsAndErrors; import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexer.report.TaskReport; -import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; @@ -1191,12 +1190,7 @@ private void publishSegments( } } - final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse(); - final TransactionalSegmentPublisher publisher = - (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) -> toolbox.getTaskActionClient().submit( - buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, map, taskLockType) - ); - + final TransactionalSegmentPublisher publisher = buildSegmentPublisher(toolbox); final boolean published = newSegments.isEmpty() || publisher.publishSegments(oldSegments, newSegments, annotateFunction, null, segmentSchemaMapping).isSuccess(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java index 2da858f80ccd..f974a1c6c932 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java @@ -331,7 +331,7 @@ TransactionalSegmentPublisher createPublisher( } private class SequenceMetadataTransactionalSegmentPublisher - implements TransactionalSegmentPublisher + extends TransactionalSegmentPublisher { private final SeekableStreamIndexTaskRunner runner; private final TaskToolbox toolbox; diff --git a/processing/src/main/java/org/apache/druid/common/config/Configs.java b/processing/src/main/java/org/apache/druid/common/config/Configs.java index b3966faf739f..7fb900601d18 100644 --- a/processing/src/main/java/org/apache/druid/common/config/Configs.java +++ b/processing/src/main/java/org/apache/druid/common/config/Configs.java @@ -33,9 +33,8 @@ public class Configs * @param message Message to use for the exception, if thrown * @param args Arguments used to interpolate the exception message, if thrown * @return The given value unchanged if it is not null - * @throws org.apache.druid.error.DruidException of - * {@link org.apache.druid.error.DruidException.Category#INVALID_INPUT} if the - * input {@code value} is null. + * @throws org.apache.druid.error.DruidException of type {@link InvalidInput} + * if the input {@code value} is null. */ public static T ensureNotNull(T value, String message, Object... args) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index 93198dd1a2d8..03b4f4bbb7ef 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -38,15 +38,15 @@ */ public class SegmentPublishResult { - private final boolean canRetry; private final Set segments; private final boolean success; + private final boolean retryable; private final String errorMsg; private final List upgradedPendingSegments; public static SegmentPublishResult ok(Set segments) { - return new SegmentPublishResult(segments, true, null); + return new SegmentPublishResult(segments, true, false, null); } public static SegmentPublishResult ok(Set segments, List upgradedPendingSegments) @@ -68,16 +68,17 @@ public static SegmentPublishResult retryableFailure(String errorMsg, Object... a private SegmentPublishResult( @JsonProperty("segments") Set segments, @JsonProperty("success") boolean success, + @JsonProperty("canRetry") boolean retryable, @JsonProperty("errorMsg") @Nullable String errorMsg ) { - this(segments, success, false, errorMsg, null); + this(segments, success, retryable, errorMsg, null); } private SegmentPublishResult( Set segments, boolean success, - boolean canRetry, + boolean retryable, @Nullable String errorMsg, List upgradedPendingSegments ) @@ -85,7 +86,7 @@ private SegmentPublishResult( this.segments = Preconditions.checkNotNull(segments, "segments"); this.success = success; this.errorMsg = errorMsg; - this.canRetry = canRetry; + this.retryable = retryable; this.upgradedPendingSegments = upgradedPendingSegments; if (!success) { @@ -122,9 +123,10 @@ public String getErrorMsg() return errorMsg; } - public boolean canRetry() + @JsonProperty + public boolean isRetryable() { - return canRetry; + return retryable; } @Nullable @@ -144,7 +146,7 @@ public boolean equals(Object o) } SegmentPublishResult that = (SegmentPublishResult) o; return success == that.success && - canRetry == that.canRetry && + retryable == that.retryable && Objects.equals(segments, that.segments) && Objects.equals(errorMsg, that.errorMsg); } @@ -152,7 +154,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(segments, success, errorMsg, canRetry); + return Objects.hash(segments, success, errorMsg, retryable); } @Override @@ -161,7 +163,7 @@ public String toString() return "SegmentPublishResult{" + "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + ", success=" + success + - ", canRetry=" + canRetry + + ", retryable=" + retryable + ", errorMsg='" + errorMsg + '\'' + '}'; } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java index 390f423fdb5f..eb84b50399ad 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java @@ -20,6 +20,9 @@ package org.apache.druid.segment.realtime.appenderator; import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.timeline.DataSegment; @@ -28,8 +31,11 @@ import java.util.Set; import java.util.function.Function; -public interface TransactionalSegmentPublisher +public abstract class TransactionalSegmentPublisher { + private static final int QUIET_RETRIES = 3; + private static final int MAX_RETRIES = 5; + /** * Publish segments, along with some commit metadata, in a single transaction. * @@ -40,14 +46,19 @@ public interface TransactionalSegmentPublisher * @throws IOException if there was an I/O error when publishing * @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason */ - SegmentPublishResult publishAnnotatedSegments( + public abstract SegmentPublishResult publishAnnotatedSegments( @Nullable Set segmentsToBeOverwritten, Set segmentsToPublish, @Nullable Object commitMetadata, @Nullable SegmentSchemaMapping segmentSchemaMapping ) throws IOException; - default SegmentPublishResult publishSegments( + /** + * Applies the given annotate function on the segments and tries to publish + * them. If the action fails with a retryable failure, it can be retried upto + * {@link #MAX_RETRIES} times. + */ + public final SegmentPublishResult publishSegments( @Nullable Set segmentsToBeOverwritten, Set segmentsToPublish, Function, Set> outputSegmentsAnnotateFunction, @@ -57,20 +68,58 @@ default SegmentPublishResult publishSegments( { final Function, Set> annotateFunction = outputSegmentsAnnotateFunction .andThen(SegmentPublisherHelper::annotateShardSpec); - return publishAnnotatedSegments( + final Set annotatedSegmentsToPublish = annotateFunction.apply(segmentsToPublish); + + int attemptCount = 0; + + // Retry until success or until max retries are exhausted + SegmentPublishResult result = publishAnnotatedSegments( segmentsToBeOverwritten, - annotateFunction.apply(segmentsToPublish), + annotatedSegmentsToPublish, commitMetadata, segmentSchemaMapping ); + while (!result.isSuccess() && result.isRetryable() && attemptCount++ < MAX_RETRIES) { + awaitNextRetry(result, attemptCount); + result = publishAnnotatedSegments( + segmentsToBeOverwritten, + annotatedSegmentsToPublish, + commitMetadata, + segmentSchemaMapping + ); + } + + return result; } /** * @return true if this publisher has action to take when publishing with an empty segment set. * The publisher used by the seekable stream tasks is an example where this is true. */ - default boolean supportsEmptyPublish() + public boolean supportsEmptyPublish() { return false; } + + /** + * Sleeps until the next attempt. + */ + private static void awaitNextRetry(SegmentPublishResult lastResult, int attemptCount) + { + try { + RetryUtils.awaitNextRetry( + new ISE(lastResult.getErrorMsg()), + StringUtils.format( + "Segment publish failed due to error[%s]", + lastResult.getErrorMsg() + ), + attemptCount, + MAX_RETRIES, + attemptCount <= QUIET_RETRIES + ); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } From 6cb009dbd4d0fdbc3ed5fb111f9b1e9ea82c9f08 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 22 Mar 2025 12:45:08 +0530 Subject: [PATCH 4/8] Add retry in TransactionalSegmentPublisher --- .../common/actions/SegmentPublishAction.java | 85 ------------------- .../SegmentTransactionalAppendAction.java | 11 ++- .../SegmentTransactionalInsertAction.java | 16 ++-- .../SegmentTransactionalReplaceAction.java | 6 +- .../druid/indexing/common/task/IndexTask.java | 2 - .../appenderator/BaseAppenderatorDriver.java | 3 +- .../BatchAppenderatorDriverTest.java | 18 +++- .../StreamAppenderatorDriverTest.java | 34 ++++++-- 8 files changed, 70 insertions(+), 105 deletions(-) delete mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java deleted file mode 100644 index 3ce19858e1e8..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentPublishAction.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.actions; - -import org.apache.druid.indexing.common.task.IndexTaskUtils; -import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.SegmentPublishResult; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.RetryUtils; -import org.apache.druid.java.util.common.StringUtils; - -/** - * Task action to publish segments. Contains common code used by insert, replace - * and append actions. - */ -public abstract class SegmentPublishAction implements TaskAction -{ - private static final int QUIET_RETRIES = 3; - private static final int MAX_RETRIES = 5; - - @Override - public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) - { - int attemptCount = 0; - final String taskId = task.getId(); - - // Retry until success or until max retries are exhausted - SegmentPublishResult result = tryPublishSegments(task, toolbox); - while (!result.isSuccess() && result.isRetryable() && attemptCount++ < MAX_RETRIES) { - awaitNextRetry(taskId, result, attemptCount); - result = tryPublishSegments(task, toolbox); - } - - IndexTaskUtils.emitSegmentPublishMetrics(result, task, toolbox); - return result; - } - - /** - * Sleeps until the next attempt. - */ - private static void awaitNextRetry(String taskId, SegmentPublishResult lastResult, int attemptCount) - { - try { - RetryUtils.awaitNextRetry( - new ISE(lastResult.getErrorMsg()), - StringUtils.format( - "Segment publish for task[%s] failed due to error[%s]", - taskId, lastResult.getErrorMsg() - ), - attemptCount, - MAX_RETRIES, - attemptCount <= QUIET_RETRIES - ); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - - /** - * Commits the segments provided in this task action if the underlying task - * holds the required locks over these segments. - */ - protected abstract SegmentPublishResult tryPublishSegments( - Task task, - TaskActionToolbox toolbox - ); -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index e12bce3c2d10..63feb0e4c9c4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -27,6 +27,7 @@ import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.PendingSegmentAllocatingTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; @@ -58,7 +59,7 @@ * } * */ -public class SegmentTransactionalAppendAction extends SegmentPublishAction +public class SegmentTransactionalAppendAction implements TaskAction { private final Set segments; @Nullable @@ -136,7 +137,7 @@ public TypeReference getReturnTypeReference() } @Override - protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox toolbox) + public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) { if (!(task instanceof PendingSegmentAllocatingTask)) { throw DruidException.defensive( @@ -182,8 +183,9 @@ protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox t ); } + final SegmentPublishResult retVal; try { - return toolbox.getTaskLockbox().doInCriticalSection( + retVal = toolbox.getTaskLockbox().doInCriticalSection( task, segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() @@ -201,6 +203,9 @@ protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox t Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } + + IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox); + return retVal; } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 3a159c659313..e06cb9b37849 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; +import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskLockHelper; import org.apache.druid.indexing.overlord.CriticalAction; @@ -51,7 +52,7 @@ * Insert segments into metadata storage. The segment versions must all be less than or equal to a lock held by * your task for the segment intervals. */ -public class SegmentTransactionalInsertAction extends SegmentPublishAction +public class SegmentTransactionalInsertAction implements TaskAction { /** * Set of segments that was fully overshadowed by new segments, {@link SegmentTransactionalInsertAction#segments} @@ -173,22 +174,24 @@ public TypeReference getReturnTypeReference() } @Override - protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox toolbox) + public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) { + final SegmentPublishResult retVal; + if (segments.isEmpty()) { // A stream ingestion task didn't ingest any rows and created no segments (e.g., all records were unparseable), // but still needs to update metadata with the progress that the task made. try { - return toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly( + retVal = toolbox.getIndexerMetadataStorageCoordinator().commitMetadataOnly( dataSource, startMetadata, endMetadata ); } catch (Exception e) { - Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } + return retVal; } final Set allSegments = new HashSet<>(segments); @@ -207,7 +210,7 @@ protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox t } try { - return toolbox.getTaskLockbox().doInCriticalSection( + retVal = toolbox.getTaskLockbox().doInCriticalSection( task, allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() @@ -232,6 +235,9 @@ protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox t Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } + + IndexTaskUtils.emitSegmentPublishMetrics(retVal, task, toolbox); + return retVal; } private void checkWithSegmentLock() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index f64efa462249..a44a55d13b9a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -25,6 +25,7 @@ import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.CriticalAction; import org.apache.druid.indexing.overlord.SegmentPublishResult; @@ -59,7 +60,7 @@ * The supervisor relays the payloads to all the tasks with the corresponding group_id to serve realtime queries * */ -public class SegmentTransactionalReplaceAction extends SegmentPublishAction +public class SegmentTransactionalReplaceAction implements TaskAction { private static final Logger log = new Logger(SegmentTransactionalReplaceAction.class); @@ -109,7 +110,7 @@ public TypeReference getReturnTypeReference() } @Override - protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox toolbox) + public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) { TaskLocks.checkLockCoversSegments(task, toolbox.getTaskLockbox(), segments); @@ -153,6 +154,7 @@ protected SegmentPublishResult tryPublishSegments(Task task, TaskActionToolbox t } } + IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox); return publishResult; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 5b7ba5e944c2..4ea7e9d3deec 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -48,7 +48,6 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SecondaryPartitionType; import org.apache.druid.indexer.report.TaskReport; -import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -62,7 +61,6 @@ import org.apache.druid.indexing.common.task.batch.partition.PartitionAnalysis; import org.apache.druid.indexing.input.DruidInputSource; import org.apache.druid.indexing.input.TaskInputSource; -import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.indexing.overlord.sampler.InputSourceSampler; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index fa7d037c92cf..7ca7fd74c2db 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -719,7 +719,8 @@ ListenableFuture publishInBackground( e -> (e != null && e.getMessage() != null && e.getMessage().contains("Failed to update the metadata Store." + " The new start metadata is ahead of last commited end state.")), - RetryUtils.DEFAULT_MAX_TRIES + // Do not retry here since the TransactionalSegmentPublisher itself performs required retries + 1 ); } catch (Exception e) { diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java index 085958c5f9fb..a071ff7af7d5 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java @@ -29,9 +29,11 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence; import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; +import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; @@ -41,11 +43,13 @@ import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -199,7 +203,19 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp static TransactionalSegmentPublisher makeOkPublisher() { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, schema) -> SegmentPublishResult.ok(ImmutableSet.of()); + return new TransactionalSegmentPublisher() + { + @Override + public SegmentPublishResult publishAnnotatedSegments( + @Nullable Set segmentsToBeOverwritten, + Set segmentsToPublish, + @Nullable Object commitMetadata, + @Nullable SegmentSchemaMapping segmentSchemaMapping + ) + { + return SegmentPublishResult.ok(Set.of()); + } + }; } static class TestSegmentAllocator implements SegmentAllocator diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index 27aadaee574f..4bf3a8dc22b0 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.handoff.SegmentHandoffNotifier; import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory; import org.apache.druid.segment.loading.DataSegmentKiller; @@ -54,9 +55,9 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nullable; import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -68,6 +69,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; public class StreamAppenderatorDriverTest extends EasyMockSupport { @@ -411,13 +413,14 @@ private Set asIdentifiers(Iterable segments static TransactionalSegmentPublisher makeOkPublisher() { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> - SegmentPublishResult.ok(Collections.emptySet()); + return makePublisher( + (segmentsToPublish) -> SegmentPublishResult.ok(Set.of()) + ); } private TransactionalSegmentPublisher makeUpgradingPublisher() { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> { + return makePublisher((segmentsToPublish) -> { Set allSegments = new HashSet<>(segmentsToPublish); int id = 0; for (DataSegment segment : segmentsToPublish) { @@ -435,17 +438,36 @@ private TransactionalSegmentPublisher makeUpgradingPublisher() allSegments.add(upgradedSegment); } return SegmentPublishResult.ok(allSegments); - }; + }); } static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException) { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> { + return makePublisher((segmentsToPublish) -> { final RuntimeException exception = new RuntimeException("test"); if (failWithException) { throw exception; } return SegmentPublishResult.fail(exception.getMessage()); + }); + } + + private static TransactionalSegmentPublisher makePublisher( + Function, SegmentPublishResult> publishFunction + ) + { + return new TransactionalSegmentPublisher() + { + @Override + public SegmentPublishResult publishAnnotatedSegments( + @Nullable Set segmentsToBeOverwritten, + Set segmentsToPublish, + @Nullable Object commitMetadata, + @Nullable SegmentSchemaMapping segmentSchemaMapping + ) + { + return publishFunction.apply(segmentsToPublish); + } }; } From eedd423b4bdd26caf28e774aea7741ab52624173 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 22 Mar 2025 12:50:13 +0530 Subject: [PATCH 5/8] Revert extra changes --- .../common/actions/LocalTaskActionClient.java | 2 - .../SegmentTransactionalAppendAction.java | 2 - .../SegmentTransactionalInsertAction.java | 5 ++- .../SegmentTransactionalReplaceAction.java | 8 ++-- .../common/actions/TaskActionHolder.java | 5 +-- .../overlord/http/OverlordResource.java | 43 +++++++++++-------- .../apache/druid/common/config/Configs.java | 21 --------- 7 files changed, 35 insertions(+), 51 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java index af1f362a669b..1d0059335ed1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.common.actions; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.java.util.common.jackson.JacksonUtils; @@ -70,7 +69,6 @@ private R performAction(TaskAction taskAction) return result; } catch (Throwable t) { - Throwables.throwIfUnchecked(t); throw new RuntimeException(t); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index 63feb0e4c9c4..ea3a4fd36d2a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.base.Throwables; import org.apache.druid.error.DruidException; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLock; @@ -200,7 +199,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) ); } catch (Exception e) { - Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index e06cb9b37849..fb988e78e5d4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; @@ -173,6 +172,9 @@ public TypeReference getReturnTypeReference() return new TypeReference<>() {}; } + /** + * Performs some sanity checks and publishes the given segments. + */ @Override public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) { @@ -232,7 +234,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) ); } catch (Exception e) { - Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index a44a55d13b9a..6546ed80d9d9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Optional; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; @@ -109,6 +108,9 @@ public TypeReference getReturnTypeReference() return new TypeReference<>() {}; } + /** + * Performs some sanity checks and publishes the given segments. + */ @Override public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) { @@ -138,10 +140,11 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) ); } catch (Exception e) { - Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } + IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox); + // Upgrade any overlapping pending segments // Do not perform upgrade in the same transaction as replace commit so that // failure to upgrade pending segments does not affect success of the commit @@ -154,7 +157,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) } } - IndexTaskUtils.emitSegmentPublishMetrics(publishResult, task, toolbox); return publishResult; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionHolder.java index 8738a9044736..107a4bc7a404 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskActionHolder.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.druid.common.config.Configs; import org.apache.druid.indexing.common.task.Task; /** @@ -37,8 +36,8 @@ public TaskActionHolder( @JsonProperty("action") TaskAction action ) { - this.task = Configs.ensureNotNull(task, "'task' must not be null"); - this.action = Configs.ensureNotNull(action, "'action' must not be null"); + this.task = task; + this.action = action; } @JsonProperty diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 7cf52322bca9..74656dfdb5f1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -41,6 +41,7 @@ import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionHolder; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.DruidOverlord; @@ -96,6 +97,7 @@ import javax.ws.rs.core.Response.Status; import java.io.InputStream; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -507,25 +509,30 @@ public Response getWorkerConfigHistory( @ResourceFilters(StateResourceFilter.class) public Response doAction(final TaskActionHolder holder) { - final Task task = holder.getTask(); return asLeaderWith( - taskMaster.getTaskActionClient(task), - taskActionClient -> { - try { - final Object result = taskActionClient.submit(holder.getAction()); - return Response.ok().entity(Map.of("result", result)).build(); - } - catch (DruidException e) { - log.noStackTrace().warn( - e, - "Exception while performing action[%s] for task[%s]", - holder.getAction(), task.getId() - ); - return ServletResourceUtils.buildErrorResponseFrom(e); - } - catch (Throwable e) { - log.warn(e, "Failed to perform action[%s] for task[%s]", holder.getAction(), task.getId()); - return Response.serverError().entity(ImmutableMap.of("error", e.getMessage())).build(); + taskMaster.getTaskActionClient(holder.getTask()), + new Function<>() + { + @Override + public Response apply(TaskActionClient taskActionClient) + { + final Map retMap; + + // It would be great to verify that this worker is actually supposed to be running the task before + // actually doing the action. Some ideas for how that could be done would be using some sort of attempt_id + // or token that gets passed around. + + try { + final Object ret = taskActionClient.submit(holder.getAction()); + retMap = new HashMap<>(); + retMap.put("result", ret); + } + catch (Exception e) { + log.warn(e, "Failed to perform task action"); + return Response.serverError().entity(ImmutableMap.of("error", e.getMessage())).build(); + } + + return Response.ok().entity(retMap).build(); } } ); diff --git a/processing/src/main/java/org/apache/druid/common/config/Configs.java b/processing/src/main/java/org/apache/druid/common/config/Configs.java index 7fb900601d18..433a2548a4bf 100644 --- a/processing/src/main/java/org/apache/druid/common/config/Configs.java +++ b/processing/src/main/java/org/apache/druid/common/config/Configs.java @@ -19,32 +19,11 @@ package org.apache.druid.common.config; -import org.apache.druid.error.InvalidInput; - /** * Utility class for common config operations. */ public class Configs { - /** - * Ensures that the given value is not null, otherwise throws an exception. - * - * @param value Value to check for null - * @param message Message to use for the exception, if thrown - * @param args Arguments used to interpolate the exception message, if thrown - * @return The given value unchanged if it is not null - * @throws org.apache.druid.error.DruidException of type {@link InvalidInput} - * if the input {@code value} is null. - */ - public static T ensureNotNull(T value, String message, Object... args) - { - if (value == null) { - throw InvalidInput.exception(message, args); - } else { - return value; - } - } - /** * Returns the given {@code value} if it is not null, otherwise returns the * {@code defaultValue}. From 212e06d8bea531c379419133f87479899f34f3e8 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 22 Mar 2025 12:54:44 +0530 Subject: [PATCH 6/8] Fix compilation --- .../druid/common/config/ConfigsTest.java | 24 ------------------- .../overlord/SegmentPublishResult.java | 2 +- 2 files changed, 1 insertion(+), 25 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java b/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java index edbc15a2947a..edfc9a25164b 100644 --- a/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java +++ b/processing/src/test/java/org/apache/druid/common/config/ConfigsTest.java @@ -19,9 +19,6 @@ package org.apache.druid.common.config; -import org.apache.druid.error.DruidException; -import org.apache.druid.error.DruidExceptionMatcher; -import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; @@ -43,25 +40,4 @@ public void testValueOrDefault() Assert.assertEquals("def", Configs.valueOrDefault(null, "def")); } - @Test - public void testEnsureNotNull() - { - Assert.assertEquals( - "abc", - Configs.ensureNotNull("abc", "Config should not be null") - ); - } - - @Test - public void testEnsureNotNull_throwsException_ifValueIsNull() - { - MatcherAssert.assertThat( - Assert.assertThrows( - DruidException.class, - () -> Configs.ensureNotNull(null, "Config[%s] should not be null", "abc") - ), - DruidExceptionMatcher.invalidInput().expectMessageIs("Config[abc] should not be null") - ); - } - } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index 03b4f4bbb7ef..04b745c812e5 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -68,7 +68,7 @@ public static SegmentPublishResult retryableFailure(String errorMsg, Object... a private SegmentPublishResult( @JsonProperty("segments") Set segments, @JsonProperty("success") boolean success, - @JsonProperty("canRetry") boolean retryable, + @JsonProperty("retryable") boolean retryable, @JsonProperty("errorMsg") @Nullable String errorMsg ) { From 93b330ea5c6b513331e464a00da96ca12c3e354c Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 22 Mar 2025 13:59:00 +0530 Subject: [PATCH 7/8] Add test for TransactionalSegmentPublisher --- .../TransactionalSegmentPublisherTest.java | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java new file mode 100644 index 000000000000..cafc19695be8 --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.realtime.appenderator; + +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.segment.SegmentSchemaMapping; +import org.apache.druid.timeline.DataSegment; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +public class TransactionalSegmentPublisherTest +{ + @Test(timeout = 60_000L) + public void testPublishSegments_retriesUpto5Times_ifFailureIsRetryable() throws IOException + { + final AtomicInteger attemptCount = new AtomicInteger(0); + final TransactionalSegmentPublisher publisher = createPublisher( + SegmentPublishResult.retryableFailure("this error is retryable"), + attemptCount + ); + + Assert.assertEquals( + SegmentPublishResult.retryableFailure("this error is retryable"), + publisher.publishSegments(null, Set.of(), Function.identity(), null, null) + ); + Assert.assertEquals(6, attemptCount.get()); + } + + @Test + public void testPublishSegments_doesNotRetry_ifFailureIsNotRetryable() throws IOException + { + final AtomicInteger attemptCount = new AtomicInteger(0); + final TransactionalSegmentPublisher publisher = createPublisher( + SegmentPublishResult.fail("this error is not retryable"), + attemptCount + ); + + Assert.assertEquals( + SegmentPublishResult.fail("this error is not retryable"), + publisher.publishSegments(null, Set.of(), Function.identity(), null, null) + ); + Assert.assertEquals(1, attemptCount.get()); + } + + @Test + public void testPublishAnnotatedSegments_doesNotRetry() throws Exception + { + final AtomicInteger attemptCount = new AtomicInteger(0); + final TransactionalSegmentPublisher publisher = createPublisher( + SegmentPublishResult.retryableFailure("this error is retryable"), + attemptCount + ); + + Assert.assertEquals( + SegmentPublishResult.retryableFailure("this error is retryable"), + publisher.publishAnnotatedSegments(null, Set.of(), null, null) + ); + Assert.assertEquals(1, attemptCount.get()); + } + + private TransactionalSegmentPublisher createPublisher( + SegmentPublishResult publishResult, + AtomicInteger attemptCount + ) + { + return new TransactionalSegmentPublisher() + { + @Override + public SegmentPublishResult publishAnnotatedSegments( + @Nullable Set segmentsToBeOverwritten, + Set segmentsToPublish, + @Nullable Object commitMetadata, + @Nullable SegmentSchemaMapping segmentSchemaMapping + ) + { + attemptCount.incrementAndGet(); + return publishResult; + } + }; + } +} \ No newline at end of file From a9e793638c97ef29fff486ce4c9b14eccd1e28d4 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Sat, 22 Mar 2025 15:06:48 +0530 Subject: [PATCH 8/8] Add newline --- .../appenderator/TransactionalSegmentPublisherTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java index cafc19695be8..884b475893d1 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java @@ -101,4 +101,4 @@ public SegmentPublishResult publishAnnotatedSegments( } }; } -} \ No newline at end of file +}