diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index 38bc961dc47a..ec9c12a14fe3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -76,6 +76,7 @@ import org.apache.druid.segment.indexing.IngestionSpec; import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; +import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import org.apache.druid.segment.transform.CompactionTransformSpec; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; @@ -456,6 +457,30 @@ protected TaskAction buildPublishAction( } } + protected TransactionalSegmentPublisher buildSegmentPublisher(TaskToolbox toolbox) + { + return new TransactionalSegmentPublisher() + { + @Override + public SegmentPublishResult publishAnnotatedSegments( + @Nullable Set segmentsToBeOverwritten, + Set segmentsToPublish, + @Nullable Object commitMetadata, + @Nullable SegmentSchemaMapping schemaMapping + ) throws IOException + { + return toolbox.getTaskActionClient().submit( + buildPublishAction( + segmentsToBeOverwritten, + segmentsToPublish, + schemaMapping, + getTaskLockHelper().getLockTypeToUse() + ) + ); + } + }; + } + protected boolean tryTimeChunkLock(TaskActionClient client, List intervals) throws IOException { // The given intervals are first converted to align with segment granularity. This is because, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index eae1f7caf1e6..4ea7e9d3deec 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -48,7 +48,6 @@ import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SecondaryPartitionType; import org.apache.druid.indexer.report.TaskReport; -import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -866,11 +865,7 @@ private TaskStatus generateAndPublishSegments( throw new UOE("[%s] secondary partition type is not supported", partitionsSpec.getType()); } - final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse(); - final TransactionalSegmentPublisher publisher = - (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) -> toolbox.getTaskActionClient().submit( - buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, map, taskLockType) - ); + final TransactionalSegmentPublisher publisher = buildSegmentPublisher(toolbox); String effectiveId = getContextValue(CompactionTask.CTX_KEY_APPENDERATOR_TRACKING_TASK_ID, null); if (effectiveId == null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 86e31c74a72f..34cc71a3a127 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -45,7 +45,6 @@ import org.apache.druid.indexer.report.IngestionStatsAndErrors; import org.apache.druid.indexer.report.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexer.report.TaskReport; -import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.task.AbstractBatchIndexTask; @@ -1191,12 +1190,7 @@ private void publishSegments( } } - final TaskLockType taskLockType = getTaskLockHelper().getLockTypeToUse(); - final TransactionalSegmentPublisher publisher = - (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, map) -> toolbox.getTaskActionClient().submit( - buildPublishAction(segmentsToBeOverwritten, segmentsToPublish, map, taskLockType) - ); - + final TransactionalSegmentPublisher publisher = buildSegmentPublisher(toolbox); final boolean published = newSegments.isEmpty() || publisher.publishSegments(oldSegments, newSegments, annotateFunction, null, segmentSchemaMapping).isSuccess(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java index 2da858f80ccd..f974a1c6c932 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SequenceMetadata.java @@ -331,7 +331,7 @@ TransactionalSegmentPublisher createPublisher( } private class SequenceMetadataTransactionalSegmentPublisher - implements TransactionalSegmentPublisher + extends TransactionalSegmentPublisher { private final SeekableStreamIndexTaskRunner runner; private final TaskToolbox toolbox; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index 44ce60b5ceb2..095e9c3b57d2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; @@ -150,11 +149,9 @@ public void testFailTransactionalUpdateDataSourceMetadata() throws Exception ); Assert.assertEquals( - SegmentPublishResult.fail( - InvalidInput.exception( - "The new start metadata state[ObjectMetadata{theObject=[1]}] is" - + " ahead of the last committed end state[null]. Try resetting the supervisor." - ).toString() + SegmentPublishResult.retryableFailure( + "The new start metadata state[ObjectMetadata{theObject=[1]}] is" + + " ahead of the last committed end state[null]. Try resetting the supervisor." ), result ); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index e4bc1645f710..04b745c812e5 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.metadata.PendingSegmentRecord; import org.apache.druid.segment.SegmentUtils; import org.apache.druid.timeline.DataSegment; @@ -34,59 +34,59 @@ import java.util.Set; /** - * Result of an operation that attempts to publish segments. Indicates the set of segments actually published - * and whether or not the transaction was a success. - * - * If "success" is false then the segments set will be empty. - * - * It's possible for the segments set to be empty even if "success" is true, since the segments set only - * includes segments actually published as part of the transaction. The requested segments could have been - * published by a different transaction (e.g. in the case of replica sets) and this one would still succeed. + * Result of a segment publish operation. */ public class SegmentPublishResult { private final Set segments; private final boolean success; - @Nullable + private final boolean retryable; private final String errorMsg; - @Nullable private final List upgradedPendingSegments; public static SegmentPublishResult ok(Set segments) { - return new SegmentPublishResult(segments, true, null); + return new SegmentPublishResult(segments, true, false, null); } public static SegmentPublishResult ok(Set segments, List upgradedPendingSegments) { - return new SegmentPublishResult(segments, true, null, upgradedPendingSegments); + return new SegmentPublishResult(segments, true, false, null, upgradedPendingSegments); } - public static SegmentPublishResult fail(String errorMsg) + public static SegmentPublishResult fail(String errorMsg, Object... args) { - return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg); + return new SegmentPublishResult(Set.of(), false, false, StringUtils.format(errorMsg, args), null); + } + + public static SegmentPublishResult retryableFailure(String errorMsg, Object... args) + { + return new SegmentPublishResult(Set.of(), false, true, StringUtils.format(errorMsg, args), null); } @JsonCreator private SegmentPublishResult( @JsonProperty("segments") Set segments, @JsonProperty("success") boolean success, + @JsonProperty("retryable") boolean retryable, @JsonProperty("errorMsg") @Nullable String errorMsg ) { - this(segments, success, errorMsg, null); + this(segments, success, retryable, errorMsg, null); } private SegmentPublishResult( Set segments, boolean success, - @Nullable String errorMsg, + boolean retryable, + @Nullable String errorMsg, List upgradedPendingSegments ) { this.segments = Preconditions.checkNotNull(segments, "segments"); this.success = success; this.errorMsg = errorMsg; + this.retryable = retryable; this.upgradedPendingSegments = upgradedPendingSegments; if (!success) { @@ -98,6 +98,12 @@ private SegmentPublishResult( } } + /** + * Set of segments published successfully. + * + * @return Empty set if the publish operation failed or if all the segments had + * already been published by a different transaction. + */ @JsonProperty public Set getSegments() { @@ -117,6 +123,12 @@ public String getErrorMsg() return errorMsg; } + @JsonProperty + public boolean isRetryable() + { + return retryable; + } + @Nullable public List getUpgradedPendingSegments() { @@ -134,6 +146,7 @@ public boolean equals(Object o) } SegmentPublishResult that = (SegmentPublishResult) o; return success == that.success && + retryable == that.retryable && Objects.equals(segments, that.segments) && Objects.equals(errorMsg, that.errorMsg); } @@ -141,7 +154,7 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(segments, success, errorMsg); + return Objects.hash(segments, success, errorMsg, retryable); } @Override @@ -150,6 +163,7 @@ public String toString() return "SegmentPublishResult{" + "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + ", success=" + success + + ", retryable=" + retryable + ", errorMsg='" + errorMsg + '\'' + '}'; } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index ce092f8c6967..9a0f91d6afd6 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -91,7 +91,6 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -306,33 +305,22 @@ public SegmentPublishResult commitSegmentsAndMetadata( final String dataSource = segments.iterator().next().getDataSource(); - final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false); - try { return inReadWriteDatasourceTransaction( dataSource, transaction -> { - // Set definitelyNotUpdated back to false upon retrying. - definitelyNotUpdated.set(false); - + // Try to update datasource metadata first if (startMetadata != null) { - final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle( + final SegmentPublishResult result = updateDataSourceMetadataWithHandle( transaction, dataSource, startMetadata, endMetadata ); - if (result.isFailed()) { - // Metadata was definitely not updated. - transaction.setRollbackOnly(); - definitelyNotUpdated.set(true); - - if (result.canRetry()) { - throw new RetryTransactionException(result.getErrorMsg()); - } else { - throw InvalidInput.exception(result.getErrorMsg()); - } + // Do not proceed if the datasource metadata update failed + if (!result.isSuccess()) { + return result; } } @@ -347,12 +335,7 @@ public SegmentPublishResult commitSegmentsAndMetadata( ); } catch (CallbackFailedException e) { - if (definitelyNotUpdated.get()) { - return SegmentPublishResult.fail(e.getMessage()); - } else { - // Must throw exception if we are not sure if we updated or not. - throw e; - } + throw e; } } @@ -468,45 +451,19 @@ public SegmentPublishResult commitMetadataOnly( throw new IllegalArgumentException("end metadata cannot be null"); } - final AtomicBoolean definitelyNotUpdated = new AtomicBoolean(false); - try { return inReadWriteDatasourceTransaction( dataSource, - transaction -> { - // Set definitelyNotUpdated back to false upon retrying. - definitelyNotUpdated.set(false); - - final DataStoreMetadataUpdateResult result = updateDataSourceMetadataWithHandle( - transaction, - dataSource, - startMetadata, - endMetadata - ); - - if (result.isFailed()) { - // Metadata was definitely not updated. - transaction.setRollbackOnly(); - definitelyNotUpdated.set(true); - - if (result.canRetry()) { - throw new RetryTransactionException(result.getErrorMsg()); - } else { - throw new RuntimeException(result.getErrorMsg()); - } - } - - return SegmentPublishResult.ok(ImmutableSet.of()); - } + transaction -> updateDataSourceMetadataWithHandle( + transaction, + dataSource, + startMetadata, + endMetadata + ) ); } catch (CallbackFailedException e) { - if (definitelyNotUpdated.get()) { - return SegmentPublishResult.fail(e.getMessage()); - } else { - // Must throw exception if we are not sure if we updated or not. - throw e; - } + throw e; } } @@ -1126,25 +1083,18 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( } ); - final AtomicBoolean metadataNotUpdated = new AtomicBoolean(false); try { return inReadWriteDatasourceTransaction( dataSource, transaction -> { - metadataNotUpdated.set(false); - + // Try to update datasource metadata first if (startMetadata != null) { - final DataStoreMetadataUpdateResult metadataUpdateResult + final SegmentPublishResult metadataUpdateResult = updateDataSourceMetadataWithHandle(transaction, dataSource, startMetadata, endMetadata); - if (metadataUpdateResult.isFailed()) { - transaction.setRollbackOnly(); - metadataNotUpdated.set(true); - if (metadataUpdateResult.canRetry()) { - throw new RetryTransactionException(metadataUpdateResult.getErrorMsg()); - } else { - throw new RuntimeException(metadataUpdateResult.getErrorMsg()); - } + // Abort the transaction if datasource metadata update has failed + if (!metadataUpdateResult.isSuccess()) { + return metadataUpdateResult; } } @@ -1172,12 +1122,7 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction( ); } catch (CallbackFailedException e) { - if (metadataNotUpdated.get()) { - // Return failed result if metadata was definitely not updated - return SegmentPublishResult.fail(e.getMessage()); - } else { - throw e; - } + throw e; } } @@ -2052,7 +1997,7 @@ private Map getAppendSegmentsCommittedDuringTask( * * @throws RuntimeException if state is unknown after this call */ - protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( + protected SegmentPublishResult updateDataSourceMetadataWithHandle( final SegmentMetadataTransaction transaction, final String dataSource, final DataSourceMetadata startMetadata, @@ -2102,7 +2047,9 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) { // Offsets stored in startMetadata is greater than the last commited metadata. - return DataStoreMetadataUpdateResult.failure( + // This can happen because the previous task is still publishing its segments and can resolve once + // the previous task finishes publishing. + return SegmentPublishResult.retryableFailure( "The new start metadata state[%s] is ahead of the last committed" + " end state[%s]. Try resetting the supervisor.", startMetadata, oldCommitMetadataFromDb @@ -2111,7 +2058,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( if (!startMetadataMatchesExisting) { // Not in the desired start state. - return DataStoreMetadataUpdateResult.failure( + return SegmentPublishResult.fail( "Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.", oldCommitMetadataFromDb, startMetadata ); @@ -2126,7 +2073,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes() ); - final DataStoreMetadataUpdateResult retVal; + final SegmentPublishResult retVal; if (oldCommitMetadataBytesFromDb == null) { // SELECT -> INSERT can fail due to races; callers must be prepared to retry. final int numRows = transaction.getHandle().createStatement( @@ -2143,8 +2090,8 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( .execute(); retVal = numRows == 1 - ? DataStoreMetadataUpdateResult.SUCCESS - : DataStoreMetadataUpdateResult.retryableFailure("Failed to insert metadata for datasource[%s]", dataSource); + ? SegmentPublishResult.ok(Set.of()) + : SegmentPublishResult.retryableFailure("Failed to insert metadata for datasource[%s]", dataSource); } else { // Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE final int numRows = transaction.getHandle().createStatement( @@ -2163,8 +2110,8 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( .execute(); retVal = numRows == 1 - ? DataStoreMetadataUpdateResult.SUCCESS - : DataStoreMetadataUpdateResult.retryableFailure("Failed to update metadata for datasource[%s]", dataSource); + ? SegmentPublishResult.ok(Set.of()) + : SegmentPublishResult.retryableFailure("Failed to update metadata for datasource[%s]", dataSource); } if (retVal.isSuccess()) { @@ -2521,51 +2468,4 @@ private T inReadOnlyDatasourceTransaction( { return transactionFactory.inReadOnlyDatasourceTransaction(dataSource, callback); } - - public static class DataStoreMetadataUpdateResult - { - private final boolean failed; - private final boolean canRetry; - private final String errorMsg; - - public static final DataStoreMetadataUpdateResult SUCCESS = new DataStoreMetadataUpdateResult(false, false, null); - - public static DataStoreMetadataUpdateResult failure(String errorMsgFormat, Object... messageArgs) - { - return new DataStoreMetadataUpdateResult(true, false, errorMsgFormat, messageArgs); - } - - public static DataStoreMetadataUpdateResult retryableFailure(String errorMsgFormat, Object... messageArgs) - { - return new DataStoreMetadataUpdateResult(true, true, errorMsgFormat, messageArgs); - } - - DataStoreMetadataUpdateResult(boolean failed, boolean canRetry, @Nullable String errorMsg, Object... errorFormatArgs) - { - this.failed = failed; - this.canRetry = canRetry; - this.errorMsg = null == errorMsg ? null : StringUtils.format(errorMsg, errorFormatArgs); - } - - public boolean isFailed() - { - return failed; - } - - public boolean isSuccess() - { - return !failed; - } - - public boolean canRetry() - { - return canRetry; - } - - @Nullable - public String getErrorMsg() - { - return errorMsg; - } - } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index fa7d037c92cf..7ca7fd74c2db 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -719,7 +719,8 @@ ListenableFuture publishInBackground( e -> (e != null && e.getMessage() != null && e.getMessage().contains("Failed to update the metadata Store." + " The new start metadata is ahead of last commited end state.")), - RetryUtils.DEFAULT_MAX_TRIES + // Do not retry here since the TransactionalSegmentPublisher itself performs required retries + 1 ); } catch (Exception e) { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java index 390f423fdb5f..eb84b50399ad 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisher.java @@ -20,6 +20,9 @@ package org.apache.druid.segment.realtime.appenderator; import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.timeline.DataSegment; @@ -28,8 +31,11 @@ import java.util.Set; import java.util.function.Function; -public interface TransactionalSegmentPublisher +public abstract class TransactionalSegmentPublisher { + private static final int QUIET_RETRIES = 3; + private static final int MAX_RETRIES = 5; + /** * Publish segments, along with some commit metadata, in a single transaction. * @@ -40,14 +46,19 @@ public interface TransactionalSegmentPublisher * @throws IOException if there was an I/O error when publishing * @throws RuntimeException if we cannot tell if the segments were published or not, for some other reason */ - SegmentPublishResult publishAnnotatedSegments( + public abstract SegmentPublishResult publishAnnotatedSegments( @Nullable Set segmentsToBeOverwritten, Set segmentsToPublish, @Nullable Object commitMetadata, @Nullable SegmentSchemaMapping segmentSchemaMapping ) throws IOException; - default SegmentPublishResult publishSegments( + /** + * Applies the given annotate function on the segments and tries to publish + * them. If the action fails with a retryable failure, it can be retried upto + * {@link #MAX_RETRIES} times. + */ + public final SegmentPublishResult publishSegments( @Nullable Set segmentsToBeOverwritten, Set segmentsToPublish, Function, Set> outputSegmentsAnnotateFunction, @@ -57,20 +68,58 @@ default SegmentPublishResult publishSegments( { final Function, Set> annotateFunction = outputSegmentsAnnotateFunction .andThen(SegmentPublisherHelper::annotateShardSpec); - return publishAnnotatedSegments( + final Set annotatedSegmentsToPublish = annotateFunction.apply(segmentsToPublish); + + int attemptCount = 0; + + // Retry until success or until max retries are exhausted + SegmentPublishResult result = publishAnnotatedSegments( segmentsToBeOverwritten, - annotateFunction.apply(segmentsToPublish), + annotatedSegmentsToPublish, commitMetadata, segmentSchemaMapping ); + while (!result.isSuccess() && result.isRetryable() && attemptCount++ < MAX_RETRIES) { + awaitNextRetry(result, attemptCount); + result = publishAnnotatedSegments( + segmentsToBeOverwritten, + annotatedSegmentsToPublish, + commitMetadata, + segmentSchemaMapping + ); + } + + return result; } /** * @return true if this publisher has action to take when publishing with an empty segment set. * The publisher used by the seekable stream tasks is an example where this is true. */ - default boolean supportsEmptyPublish() + public boolean supportsEmptyPublish() { return false; } + + /** + * Sleeps until the next attempt. + */ + private static void awaitNextRetry(SegmentPublishResult lastResult, int attemptCount) + { + try { + RetryUtils.awaitNextRetry( + new ISE(lastResult.getErrorMsg()), + StringUtils.format( + "Segment publish failed due to error[%s]", + lastResult.getErrorMsg() + ), + attemptCount, + MAX_RETRIES, + attemptCount <= QUIET_RETRIES + ); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index b6578fa9a721..0212c8f2d998 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import org.apache.druid.data.input.StringTuple; -import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.overlord.SegmentCreateRequest; @@ -186,7 +185,7 @@ public int getMaxRetries() ) { @Override - protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( + protected SegmentPublishResult updateDataSourceMetadataWithHandle( SegmentMetadataTransaction transaction, String dataSource, DataSourceMetadata startMetadata, @@ -780,7 +779,7 @@ public void testTransactionalAnnounceRetryAndSuccess() throws IOException ) { @Override - protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( + protected SegmentPublishResult updateDataSourceMetadataWithHandle( SegmentMetadataTransaction transaction, String dataSource, DataSourceMetadata startMetadata, @@ -789,7 +788,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( { metadataUpdateCounter.getAndIncrement(); if (attemptCounter.getAndIncrement() == 0) { - return DataStoreMetadataUpdateResult.retryableFailure(null); + return SegmentPublishResult.retryableFailure("this failure can be retried"); } else { return super.updateDataSourceMetadataWithHandle(transaction, dataSource, startMetadata, endMetadata); } @@ -803,7 +802,15 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( new ObjectMetadata(ImmutableMap.of("foo", "bar")), new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) ); - Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1); + Assert.assertEquals(SegmentPublishResult.retryableFailure("this failure can be retried"), result1); + + final SegmentPublishResult resultOnRetry = failOnceCoordinator.commitSegmentsAndMetadata( + ImmutableSet.of(defaultSegment), + new ObjectMetadata(null), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) + ); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), resultOnRetry); Assert.assertArrayEquals( mapper.writeValueAsString(defaultSegment).getBytes(StandardCharsets.UTF_8), @@ -825,7 +832,15 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( new ObjectMetadata(ImmutableMap.of("foo", "baz")), new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) ); - Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), result2); + Assert.assertEquals(SegmentPublishResult.retryableFailure("this failure can be retried"), result2); + + final SegmentPublishResult resultOnRetry2 = failOnceCoordinator.commitSegmentsAndMetadata( + ImmutableSet.of(defaultSegment2), + new ObjectMetadata(ImmutableMap.of("foo", "bar")), + new ObjectMetadata(ImmutableMap.of("foo", "baz")), + new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) + ); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), resultOnRetry2); Assert.assertArrayEquals( mapper.writeValueAsString(defaultSegment2).getBytes(StandardCharsets.UTF_8), @@ -857,11 +872,10 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION) ); Assert.assertEquals( - SegmentPublishResult.fail( - InvalidInput.exception( - "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last committed" - + " end state[null]. Try resetting the supervisor." - ).toString()), + SegmentPublishResult.retryableFailure( + "The new start metadata state[ObjectMetadata{theObject={foo=bar}}] is ahead of the last committed" + + " end state[null]. Try resetting the supervisor." + ), result1 ); @@ -888,10 +902,8 @@ public void testTransactionalAnnounceFailDbNotNullWantNull() ); Assert.assertEquals( SegmentPublishResult.fail( - InvalidInput.exception( - "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]" - + " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor." - ).toString() + "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]" + + " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor." ), result2 ); @@ -972,10 +984,9 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() ); Assert.assertEquals( SegmentPublishResult.fail( - InvalidInput.exception( - "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and " - + "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor." - ).toString()), + "Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and " + + "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor." + ), result2 ); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java index 9bed46b2c3a9..c72e1a75afee 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java @@ -108,7 +108,7 @@ public void setUp() ) { @Override - protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( + protected SegmentPublishResult updateDataSourceMetadataWithHandle( SegmentMetadataTransaction transaction, String dataSource, DataSourceMetadata startMetadata, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java index 085958c5f9fb..a071ff7af7d5 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java @@ -29,9 +29,11 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; +import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.loading.DataSegmentKiller; import org.apache.druid.segment.realtime.appenderator.BaseAppenderatorDriver.SegmentsForSequence; import org.apache.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; +import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; @@ -41,11 +43,13 @@ import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -199,7 +203,19 @@ private void checkSegmentStates(int expectedNumSegmentsInState, SegmentState exp static TransactionalSegmentPublisher makeOkPublisher() { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, schema) -> SegmentPublishResult.ok(ImmutableSet.of()); + return new TransactionalSegmentPublisher() + { + @Override + public SegmentPublishResult publishAnnotatedSegments( + @Nullable Set segmentsToBeOverwritten, + Set segmentsToPublish, + @Nullable Object commitMetadata, + @Nullable SegmentSchemaMapping segmentSchemaMapping + ) + { + return SegmentPublishResult.ok(Set.of()); + } + }; } static class TestSegmentAllocator implements SegmentAllocator diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index 27aadaee574f..4bf3a8dc22b0 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -37,6 +37,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.segment.SegmentSchemaMapping; import org.apache.druid.segment.handoff.SegmentHandoffNotifier; import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory; import org.apache.druid.segment.loading.DataSegmentKiller; @@ -54,9 +55,9 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nullable; import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -68,6 +69,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; public class StreamAppenderatorDriverTest extends EasyMockSupport { @@ -411,13 +413,14 @@ private Set asIdentifiers(Iterable segments static TransactionalSegmentPublisher makeOkPublisher() { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> - SegmentPublishResult.ok(Collections.emptySet()); + return makePublisher( + (segmentsToPublish) -> SegmentPublishResult.ok(Set.of()) + ); } private TransactionalSegmentPublisher makeUpgradingPublisher() { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> { + return makePublisher((segmentsToPublish) -> { Set allSegments = new HashSet<>(segmentsToPublish); int id = 0; for (DataSegment segment : segmentsToPublish) { @@ -435,17 +438,36 @@ private TransactionalSegmentPublisher makeUpgradingPublisher() allSegments.add(upgradedSegment); } return SegmentPublishResult.ok(allSegments); - }; + }); } static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException) { - return (segmentsToBeOverwritten, segmentsToPublish, commitMetadata, segmentSchemaMapping) -> { + return makePublisher((segmentsToPublish) -> { final RuntimeException exception = new RuntimeException("test"); if (failWithException) { throw exception; } return SegmentPublishResult.fail(exception.getMessage()); + }); + } + + private static TransactionalSegmentPublisher makePublisher( + Function, SegmentPublishResult> publishFunction + ) + { + return new TransactionalSegmentPublisher() + { + @Override + public SegmentPublishResult publishAnnotatedSegments( + @Nullable Set segmentsToBeOverwritten, + Set segmentsToPublish, + @Nullable Object commitMetadata, + @Nullable SegmentSchemaMapping segmentSchemaMapping + ) + { + return publishFunction.apply(segmentsToPublish); + } }; } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java new file mode 100644 index 000000000000..884b475893d1 --- /dev/null +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/TransactionalSegmentPublisherTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.realtime.appenderator; + +import org.apache.druid.indexing.overlord.SegmentPublishResult; +import org.apache.druid.segment.SegmentSchemaMapping; +import org.apache.druid.timeline.DataSegment; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + +public class TransactionalSegmentPublisherTest +{ + @Test(timeout = 60_000L) + public void testPublishSegments_retriesUpto5Times_ifFailureIsRetryable() throws IOException + { + final AtomicInteger attemptCount = new AtomicInteger(0); + final TransactionalSegmentPublisher publisher = createPublisher( + SegmentPublishResult.retryableFailure("this error is retryable"), + attemptCount + ); + + Assert.assertEquals( + SegmentPublishResult.retryableFailure("this error is retryable"), + publisher.publishSegments(null, Set.of(), Function.identity(), null, null) + ); + Assert.assertEquals(6, attemptCount.get()); + } + + @Test + public void testPublishSegments_doesNotRetry_ifFailureIsNotRetryable() throws IOException + { + final AtomicInteger attemptCount = new AtomicInteger(0); + final TransactionalSegmentPublisher publisher = createPublisher( + SegmentPublishResult.fail("this error is not retryable"), + attemptCount + ); + + Assert.assertEquals( + SegmentPublishResult.fail("this error is not retryable"), + publisher.publishSegments(null, Set.of(), Function.identity(), null, null) + ); + Assert.assertEquals(1, attemptCount.get()); + } + + @Test + public void testPublishAnnotatedSegments_doesNotRetry() throws Exception + { + final AtomicInteger attemptCount = new AtomicInteger(0); + final TransactionalSegmentPublisher publisher = createPublisher( + SegmentPublishResult.retryableFailure("this error is retryable"), + attemptCount + ); + + Assert.assertEquals( + SegmentPublishResult.retryableFailure("this error is retryable"), + publisher.publishAnnotatedSegments(null, Set.of(), null, null) + ); + Assert.assertEquals(1, attemptCount.get()); + } + + private TransactionalSegmentPublisher createPublisher( + SegmentPublishResult publishResult, + AtomicInteger attemptCount + ) + { + return new TransactionalSegmentPublisher() + { + @Override + public SegmentPublishResult publishAnnotatedSegments( + @Nullable Set segmentsToBeOverwritten, + Set segmentsToPublish, + @Nullable Object commitMetadata, + @Nullable SegmentSchemaMapping segmentSchemaMapping + ) + { + attemptCount.incrementAndGet(); + return publishResult; + } + }; + } +}