From 42698a5e75caa2d4e5e1ef2a2bc7a1c1d15ca7de Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 10 Jul 2025 20:45:11 +0530 Subject: [PATCH 1/3] Fix concurrent append to interval with unused segments --- .../ConcurrentReplaceAndAppendTest.java | 6 +- .../IndexerSQLMetadataStorageCoordinator.java | 89 ++++++++++++++++--- .../druid/metadata/PendingSegmentRecord.java | 13 +++ .../metadata/SqlSegmentsMetadataQuery.java | 23 +++++ 4 files changed, 119 insertions(+), 12 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index d052fd654898..0300a397341d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -1141,8 +1141,10 @@ public void test_concurrentAppend_toIntervalWithUnusedSegments() // Allocate and commit another APPEND segment final SegmentIdWithShardSpec pendingSegment2 = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); - Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion()); - Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum()); + + // Verify that the new segment gets a different version + Assert.assertEquals(SEGMENT_V0 + "S", pendingSegment2.getVersion()); + Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum()); final DataSegment segmentV02 = asSegment(pendingSegment2); appendTask.commitAppendSegments(segmentV02); diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index dade16686490..b061a12d79d6 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -33,6 +33,7 @@ import com.google.inject.Inject; import org.apache.druid.common.utils.IdUtils; import org.apache.druid.error.DruidException; +import org.apache.druid.error.InternalServerError; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; @@ -1419,7 +1420,7 @@ private PendingSegmentRecord createNewPendingSegment( version, partialShardSpec.complete(jsonMapper, newPartitionId, 0) ); - pendingSegmentId = getTrueAllocatedId(transaction, pendingSegmentId); + pendingSegmentId = getUniqueIdForPrimaryAllocation(transaction, pendingSegmentId); return PendingSegmentRecord.create( pendingSegmentId, request.getSequenceName(), @@ -1457,7 +1458,7 @@ private PendingSegmentRecord createNewPendingSegment( ) ); return PendingSegmentRecord.create( - getTrueAllocatedId(transaction, pendingSegmentId), + getUniqueIdForSecondaryAllocation(transaction, pendingSegmentId), request.getSequenceName(), request.getPreviousSegmentId(), null, @@ -1562,7 +1563,7 @@ private SegmentIdWithShardSpec createNewPendingSegment( version, partialShardSpec.complete(jsonMapper, newPartitionId, 0) ); - return getTrueAllocatedId(transaction, allocatedId); + return getUniqueIdForPrimaryAllocation(transaction, allocatedId); } else if (!overallMaxId.getInterval().equals(interval)) { log.warn( "Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].", @@ -1590,18 +1591,86 @@ private SegmentIdWithShardSpec createNewPendingSegment( committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions() ) ); - return getTrueAllocatedId(transaction, allocatedId); + return getUniqueIdForSecondaryAllocation(transaction, allocatedId); } } /** - * Verifies that the allocated id doesn't already exist in the druid_segments table. - * If yes, try to get the max unallocated id considering the unused segments for the datasource, version and interval - * Otherwise, use the same id. - * @param allocatedId The segment allcoted on the basis of used and pending segments - * @return a segment id that isn't already used by other unused segments + * Returns a unique {@link SegmentIdWithShardSpec} which does not clash with + * any existing unused segment. If an unused segment already exists that matches + * the interval and version of the given {@code allocatedId}, a fresh version + * is created by suffixing one or more {@link PendingSegmentRecord#CONCURRENT_APPEND_VERSION_SUFFIX}. + * Such a conflict can happen only if all the segments in this interval created + * by a prior APPEND task were marked as unused. + *

+ * This method should be called only when allocating the first segment in an interval. + */ + private SegmentIdWithShardSpec getUniqueIdForPrimaryAllocation( + SegmentMetadataTransaction transaction, + SegmentIdWithShardSpec allocatedId + ) + { + // Get all the unused segment versions for this datasource and interval + final Set unusedSegmentVersions = transaction.noCacheSql().retrieveUnusedSegmentVersionsWithInterval( + allocatedId.getDataSource(), + allocatedId.getInterval() + ); + + final String allocatedVersion = allocatedId.getVersion(); + if (!unusedSegmentVersions.contains(allocatedVersion)) { + // Nothing to do, this version is new + return allocatedId; + } else if (!PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND.equals(allocatedVersion)) { + // Version clash should never happen for non-APPEND locks + throw DruidException.defensive( + "Cannot allocate segment[%s] as there are already some unused segments" + + " for version[%s] in this interval.", + allocatedId, allocatedVersion + ); + } + + // Iterate until a new non-clashing version is found + boolean foundFreshVersion = false; + StringBuilder candidateVersion = new StringBuilder( + allocatedId.getVersion() + PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX + ); + for (int i = 0; i < 10; ++i) { + if (unusedSegmentVersions.contains(candidateVersion.toString())) { + candidateVersion.append(PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX); + } else { + foundFreshVersion = true; + break; + } + } + + if (foundFreshVersion) { + return new SegmentIdWithShardSpec( + allocatedId.getDataSource(), + allocatedId.getInterval(), + candidateVersion.toString(), + allocatedId.getShardSpec() + ); + } else { + throw InternalServerError.exception( + "Could not allocate segment[%s] as there are too many unused" + + " versions(upto [%s]) in the interval. Kill the old unused versions to proceed.", + allocatedId, candidateVersion.toString() + ); + } + } + + /** + * Returns a unique {@link SegmentIdWithShardSpec} which does not clash with + * any existing unused segment. If an unused segment already exists that matches + * the interval, version and partition number of the given {@code allocatedId}, + * a higher partition number is used. Such a conflict can happen only if some + * segments of the underlying version have been marked as unused while others + * are still used. + *

+ * This method should not be called when allocating the first segment in an + * interval. */ - private SegmentIdWithShardSpec getTrueAllocatedId( + private SegmentIdWithShardSpec getUniqueIdForSecondaryAllocation( SegmentMetadataTransaction transaction, SegmentIdWithShardSpec allocatedId ) diff --git a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java index 47de746d917b..b76160ab9fb9 100644 --- a/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java +++ b/server/src/main/java/org/apache/druid/metadata/PendingSegmentRecord.java @@ -52,6 +52,19 @@ */ public class PendingSegmentRecord { + /** + * Default lock version used by concurrent APPEND tasks. + */ + public static final String DEFAULT_VERSION_FOR_CONCURRENT_APPEND = DateTimes.EPOCH.toString(); + + /** + * Suffix to use to construct fresh segment versions in the event of a clash. + * The chosen character {@code S} is just for visual ease so that two versions + * are not easily confused for each other. + * {@code 1970-01-01T00:00:00.000Z_1} vs {@code 1970-01-01T00:00:00.000ZS_1}. + */ + public static final String CONCURRENT_APPEND_VERSION_SUFFIX = "S"; + private final SegmentIdWithShardSpec id; private final String sequenceName; private final String sequencePrevId; diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 2779c16da175..0eba529c7b4d 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -1128,6 +1128,29 @@ public List retrieveUnusedSegmentsWithExactInterval( return segments.stream().filter(Objects::nonNull).collect(Collectors.toList()); } + /** + * Retrieves the versions of unused segments which are perfectly aligned with + * the given interval. + */ + public Set retrieveUnusedSegmentVersionsWithInterval(String dataSource, Interval interval) + { + final String sql = StringUtils.format( + "SELECT DISTINCT(version) FROM %1$s" + + " WHERE dataSource = :dataSource AND used = false" + + " AND %2$send%2$s = :end AND start = :start", + dbTables.getSegmentsTable(), + connector.getQuoteString() + ); + return Set.copyOf( + handle.createQuery(sql) + .bind("dataSource", dataSource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .mapTo(String.class) + .list() + ); + } + /** * Retrieve the used segment for a given id if it exists in the metadata store and null otherwise */ From df611d7324ecc57a705c4e6e4771e60d25ad37e6 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 11 Jul 2025 13:27:56 +0530 Subject: [PATCH 2/3] Add more tests and logs --- .../ConcurrentReplaceAndAppendTest.java | 53 ++++++++++++++++++- .../apache/druid/error/ExceptionMatcher.java | 7 +++ .../IndexerSQLMetadataStorageCoordinator.java | 22 +++++--- 3 files changed, 75 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java index 0300a397341d..be2d351f1c56 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java @@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.error.ExceptionMatcher; import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskStorageDirTracker; @@ -63,6 +65,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.hamcrest.MatcherAssert; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.After; @@ -1120,7 +1123,7 @@ public void testLockAllocateDayReplaceMonthAllocateAppend() } @Test - public void test_concurrentAppend_toIntervalWithUnusedSegments() + public void test_concurrentAppend_toIntervalWithUnusedAppendSegment_createsFreshVersion() { // Allocate and commit an APPEND segment final SegmentIdWithShardSpec pendingSegment @@ -1154,6 +1157,54 @@ public void test_concurrentAppend_toIntervalWithUnusedSegments() verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV02); } + @Test + public void test_allocateCommitDelete_createsFreshVersion_uptoMaxAllowedRetries() + { + final int maxAllowedAppends = 10; + final int expectedParitionNum = 0; + String expectedVersion = SEGMENT_V0; + + // Allocate, commit, delete, repeat + for (int i = 0; i < maxAllowedAppends; ++i, expectedVersion += "S") { + // Allocate a segment and verify its version and partition number + final SegmentIdWithShardSpec pendingSegment + = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY); + + Assert.assertEquals(expectedVersion, pendingSegment.getVersion()); + Assert.assertEquals(expectedParitionNum, pendingSegment.getShardSpec().getPartitionNum()); + + // Commit the segment and verify its version and partition number + final DataSegment segment = asSegment(pendingSegment); + appendTask.commitAppendSegments(segment); + + Assert.assertEquals(expectedVersion, segment.getVersion()); + Assert.assertEquals(expectedParitionNum, segment.getShardSpec().getPartitionNum()); + + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segment); + verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segment); + + // Mark the segment as unused + getStorageCoordinator().markAllSegmentsAsUnused(appendTask.getDataSource()); + verifyIntervalHasUsedSegments(FIRST_OF_JAN_23); + } + + // Verify that the next attempt fails + MatcherAssert.assertThat( + Assert.assertThrows( + ISE.class, + () -> appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY) + ), + ExceptionMatcher.of(ISE.class).expectRootCause( + DruidExceptionMatcher.internalServerError().expectMessageIs( + "Could not allocate segment" + + "[wiki_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_1970-01-01T00:00:00.000Z]" + + " as there are too many clashing unused versions(upto [1970-01-01T00:00:00.000ZSSSSSSSSSS])" + + " in the interval. Kill the old unused versions to proceed." + ) + ) + ); + } + @Nullable private DataSegment findSegmentWith(String version, Map loadSpec, Set segments) { diff --git a/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java b/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java index 31195093254f..5bc16c47f271 100644 --- a/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java +++ b/processing/src/test/java/org/apache/druid/error/ExceptionMatcher.java @@ -19,6 +19,7 @@ package org.apache.druid.error; +import com.google.common.base.Throwables; import org.apache.druid.matchers.DruidMatchers; import org.hamcrest.Description; import org.hamcrest.DiagnosingMatcher; @@ -75,6 +76,12 @@ public ExceptionMatcher expectCause(Matcher causeMatcher) return this; } + public ExceptionMatcher expectRootCause(Matcher causeMatcher) + { + matcherList.add(0, DruidMatchers.fn("rootCause", Throwables::getRootCause, causeMatcher)); + return this; + } + @Override protected boolean matches(Object item, Description mismatchDescription) { diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index b061a12d79d6..a2c5956772f0 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1631,9 +1631,7 @@ private SegmentIdWithShardSpec getUniqueIdForPrimaryAllocation( // Iterate until a new non-clashing version is found boolean foundFreshVersion = false; - StringBuilder candidateVersion = new StringBuilder( - allocatedId.getVersion() + PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX - ); + StringBuilder candidateVersion = new StringBuilder(allocatedId.getVersion()); for (int i = 0; i < 10; ++i) { if (unusedSegmentVersions.contains(candidateVersion.toString())) { candidateVersion.append(PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX); @@ -1644,15 +1642,21 @@ private SegmentIdWithShardSpec getUniqueIdForPrimaryAllocation( } if (foundFreshVersion) { - return new SegmentIdWithShardSpec( + final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec( allocatedId.getDataSource(), allocatedId.getInterval(), candidateVersion.toString(), allocatedId.getShardSpec() ); + log.info( + "Created new unique pending segment ID[%s] with version[%s] for originally allocated ID[%s].", + uniqueId, candidateVersion.toString(), allocatedId + ); + + return uniqueId; } else { throw InternalServerError.exception( - "Could not allocate segment[%s] as there are too many unused" + "Could not allocate segment[%s] as there are too many clashing unused" + " versions(upto [%s]) in the interval. Kill the old unused versions to proceed.", allocatedId, candidateVersion.toString() ); @@ -1700,7 +1704,7 @@ private SegmentIdWithShardSpec getUniqueIdForSecondaryAllocation( allocatedId.getShardSpec().getPartitionNum(), unusedMaxId.getPartitionNum() + 1 ); - return new SegmentIdWithShardSpec( + final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec( allocatedId.getDataSource(), allocatedId.getInterval(), allocatedId.getVersion(), @@ -1709,6 +1713,12 @@ private SegmentIdWithShardSpec getUniqueIdForSecondaryAllocation( allocatedId.getShardSpec().getNumCorePartitions() ) ); + log.info( + "Created new unique pending segment ID[%s] with partition number[%s] for originally allocated ID[%s].", + uniqueId, maxPartitionNum, allocatedId + ); + + return uniqueId; } @Override From 06edf61c92181781120a2080e6e9527dc983a123 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 11 Jul 2025 16:12:26 +0530 Subject: [PATCH 3/3] Add tests for IndexerSQLMetadataStorageCoordinator --- ...exerSQLMetadataStorageCoordinatorTest.java | 168 ++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 11dda468a5d5..c9ee0e97fe78 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -23,7 +23,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import org.apache.druid.common.utils.IdUtils; import org.apache.druid.data.input.StringTuple; +import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.error.ExceptionMatcher; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.ObjectMetadata; @@ -80,6 +82,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.skife.jdbi.v2.exceptions.CallbackFailedException; import java.io.IOException; import java.nio.charset.StandardCharsets; @@ -3123,6 +3126,112 @@ public void testNoPendingSegmentsAndOneUsedSegment() Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString()); } + @Test + public void test_concurrentAppend_toIntervalWithUnusedAppendSegment_createsFreshVersion() + { + final String wiki = TestDataSource.WIKI; + final String appendLockVersion = PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND; + final Interval firstOfJan23 = Intervals.of("2023-01-01/P1D"); + + // Allocate and commit an APPEND segment + final String taskAllocator1 = "taskAlloc1"; + final SegmentIdWithShardSpec pendingSegment + = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocator1); + + Assert.assertNotNull(pendingSegment); + Assert.assertEquals(appendLockVersion, pendingSegment.getVersion()); + Assert.assertEquals(0, pendingSegment.getShardSpec().getPartitionNum()); + + final DataSegment segmentV01 = asSegment(pendingSegment); + coordinator.commitAppendSegments(Set.of(segmentV01), Map.of(), taskAllocator1, null); + + verifyIntervalHasUsedSegments(wiki, firstOfJan23, segmentV01); + verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segmentV01); + + // Mark the segment as unused with a future update time to avoid race conditions + final DateTime markUnusedTime = DateTimes.nowUtc().plusHours(1); + transactionFactory.inReadWriteDatasourceTransaction( + wiki, + t -> t.markAllSegmentsAsUnused(markUnusedTime) + ); + verifyIntervalHasUsedSegments(wiki, firstOfJan23); + + // Allocate and commit another APPEND segment + final String taskAllocator2 = "taskAlloc2"; + final SegmentIdWithShardSpec pendingSegment2 + = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocator2); + + // Verify that the new segment gets a different version + Assert.assertNotNull(pendingSegment2); + Assert.assertEquals(appendLockVersion + "S", pendingSegment2.getVersion()); + Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum()); + + final DataSegment segmentV02 = asSegment(pendingSegment2); + coordinator.commitAppendSegments(Set.of(segmentV02), Map.of(), taskAllocator2, null); + Assert.assertNotEquals(segmentV01, segmentV02); + + verifyIntervalHasUsedSegments(wiki, firstOfJan23, segmentV02); + verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segmentV02); + } + + @Test + public void test_allocateCommitDelete_createsFreshVersion_uptoMaxAllowedRetries() + { + final String wiki = TestDataSource.WIKI; + final Interval firstOfJan23 = Intervals.of("2023-01-01/P1D"); + + final int maxAllowedAppends = 10; + final int expectedParitionNum = 0; + + String expectedVersion = DateTimes.EPOCH.toString(); + + // Allocate, commit, delete, repeat + for (int i = 0; i < maxAllowedAppends; ++i, expectedVersion += "S") { + // Allocate a segment and verify its version and partition number + final String taskAllocatorId = IdUtils.getRandomId(); + final SegmentIdWithShardSpec pendingSegment + = allocatePendingSegmentForAppendTask(wiki, firstOfJan23, taskAllocatorId); + + Assert.assertNotNull(pendingSegment); + Assert.assertEquals(expectedVersion, pendingSegment.getVersion()); + Assert.assertEquals(expectedParitionNum, pendingSegment.getShardSpec().getPartitionNum()); + + // Commit the segment and verify its version and partition number + final DataSegment segment = asSegment(pendingSegment); + coordinator.commitAppendSegments(Set.of(segment), Map.of(), taskAllocatorId, null); + + Assert.assertEquals(expectedVersion, segment.getVersion()); + Assert.assertEquals(expectedParitionNum, segment.getShardSpec().getPartitionNum()); + + verifyIntervalHasUsedSegments(wiki, firstOfJan23, segment); + verifyIntervalHasVisibleSegments(wiki, firstOfJan23, segment); + + // Mark the segment as unused with a future update time to avoid race conditions + final DateTime markUnusedTime = DateTimes.nowUtc().plusHours(1); + transactionFactory.inReadWriteDatasourceTransaction( + wiki, + t -> t.markAllSegmentsAsUnused(markUnusedTime) + ); + verifyIntervalHasUsedSegments(wiki, firstOfJan23); + } + + // Verify that the next attempt fails + MatcherAssert.assertThat( + Assert.assertThrows( + CallbackFailedException.class, + () -> allocatePendingSegmentForAppendTask(wiki, firstOfJan23, IdUtils.getRandomId()) + ), + ExceptionMatcher.of(CallbackFailedException.class).expectRootCause( + DruidExceptionMatcher.internalServerError().expectMessageIs( + "Could not allocate segment" + + "[wiki_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_1970-01-01T00:00:00.000Z]" + + " as there are too many clashing unused versions(upto [1970-01-01T00:00:00.000ZSSSSSSSSSS])" + + " in the interval. Kill the old unused versions to proceed." + ) + ) + ); + } + @Test public void testDeletePendingSegment() throws InterruptedException { @@ -4231,6 +4340,26 @@ private SegmentIdWithShardSpec allocatePendingSegment( ); } + private SegmentIdWithShardSpec allocatePendingSegmentForAppendTask( + String dataSource, + Interval interval, + String taskAllocatorId + ) + { + return coordinator.allocatePendingSegment( + dataSource, + interval, + true, + new SegmentCreateRequest( + IdUtils.getRandomId(), + null, + PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND, + NumberedPartialShardSpec.instance(), + taskAllocatorId + ) + ); + } + private int insertPendingSegments( String dataSource, List pendingSegments, @@ -4247,4 +4376,43 @@ private void insertUsedSegments(Set segments, Map u { insertUsedSegments(segments, upgradedFromSegmentIdMap, derbyConnectorRule, mapper); } + + private static DataSegment asSegment(SegmentIdWithShardSpec pendingSegment) + { + final SegmentId id = pendingSegment.asSegmentId(); + return new DataSegment( + id, + Map.of(id.toString(), id.toString()), + List.of(), + List.of(), + pendingSegment.getShardSpec(), + null, + 0, + 0 + ); + } + + private void verifyIntervalHasUsedSegments( + String dataSource, + Interval interval, + DataSegment... expectedSegments + ) + { + Assert.assertEquals( + Set.of(expectedSegments), + coordinator.retrieveUsedSegmentsForIntervals(dataSource, List.of(interval), Segments.INCLUDING_OVERSHADOWED) + ); + } + + private void verifyIntervalHasVisibleSegments( + String dataSource, + Interval interval, + DataSegment... expectedSegments + ) + { + Assert.assertEquals( + Set.of(expectedSegments), + coordinator.retrieveUsedSegmentsForIntervals(dataSource, List.of(interval), Segments.ONLY_VISIBLE) + ); + } }