Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.error.ExceptionMatcher;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskStorageDirTracker;
Expand Down Expand Up @@ -63,6 +65,7 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.MatcherAssert;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
Expand Down Expand Up @@ -1120,7 +1123,7 @@ public void testLockAllocateDayReplaceMonthAllocateAppend()
}

@Test
public void test_concurrentAppend_toIntervalWithUnusedSegments()
public void test_concurrentAppend_toIntervalWithUnusedAppendSegment_createsFreshVersion()
{
// Allocate and commit an APPEND segment
final SegmentIdWithShardSpec pendingSegment
Expand All @@ -1141,8 +1144,10 @@ public void test_concurrentAppend_toIntervalWithUnusedSegments()
// Allocate and commit another APPEND segment
final SegmentIdWithShardSpec pendingSegment2
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);
Assert.assertEquals(SEGMENT_V0, pendingSegment2.getVersion());
Assert.assertEquals(1, pendingSegment2.getShardSpec().getPartitionNum());

// Verify that the new segment gets a different version
Assert.assertEquals(SEGMENT_V0 + "S", pendingSegment2.getVersion());
Assert.assertEquals(0, pendingSegment2.getShardSpec().getPartitionNum());
Comment on lines +1148 to +1150
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would probably also be good to validate the "keeps appending" behavior in the tests as well (i.e. if one is created, marked unused, a new one created, marked unused, created again, marked unused, etc. it should just build mroe and more 'S's)


final DataSegment segmentV02 = asSegment(pendingSegment2);
appendTask.commitAppendSegments(segmentV02);
Expand All @@ -1152,6 +1157,54 @@ public void test_concurrentAppend_toIntervalWithUnusedSegments()
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segmentV02);
}

@Test
public void test_allocateCommitDelete_createsFreshVersion_uptoMaxAllowedRetries()
{
final int maxAllowedAppends = 10;
final int expectedParitionNum = 0;
String expectedVersion = SEGMENT_V0;

// Allocate, commit, delete, repeat
for (int i = 0; i < maxAllowedAppends; ++i, expectedVersion += "S") {
// Allocate a segment and verify its version and partition number
final SegmentIdWithShardSpec pendingSegment
= appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY);

Assert.assertEquals(expectedVersion, pendingSegment.getVersion());
Assert.assertEquals(expectedParitionNum, pendingSegment.getShardSpec().getPartitionNum());

// Commit the segment and verify its version and partition number
final DataSegment segment = asSegment(pendingSegment);
appendTask.commitAppendSegments(segment);

Assert.assertEquals(expectedVersion, segment.getVersion());
Assert.assertEquals(expectedParitionNum, segment.getShardSpec().getPartitionNum());

verifyIntervalHasUsedSegments(FIRST_OF_JAN_23, segment);
verifyIntervalHasVisibleSegments(FIRST_OF_JAN_23, segment);

// Mark the segment as unused
getStorageCoordinator().markAllSegmentsAsUnused(appendTask.getDataSource());
verifyIntervalHasUsedSegments(FIRST_OF_JAN_23);
}

// Verify that the next attempt fails
MatcherAssert.assertThat(
Assert.assertThrows(
ISE.class,
() -> appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(), Granularities.DAY)
),
ExceptionMatcher.of(ISE.class).expectRootCause(
DruidExceptionMatcher.internalServerError().expectMessageIs(
"Could not allocate segment"
+ "[wiki_2023-01-01T00:00:00.000Z_2023-01-02T00:00:00.000Z_1970-01-01T00:00:00.000Z]"
+ " as there are too many clashing unused versions(upto [1970-01-01T00:00:00.000ZSSSSSSSSSS])"
+ " in the interval. Kill the old unused versions to proceed."
)
)
);
}

@Nullable
private DataSegment findSegmentWith(String version, Map<String, Object> loadSpec, Set<DataSegment> segments)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.error;

import com.google.common.base.Throwables;
import org.apache.druid.matchers.DruidMatchers;
import org.hamcrest.Description;
import org.hamcrest.DiagnosingMatcher;
Expand Down Expand Up @@ -75,6 +76,12 @@ public ExceptionMatcher expectCause(Matcher<Throwable> causeMatcher)
return this;
}

public ExceptionMatcher expectRootCause(Matcher<Throwable> causeMatcher)
{
matcherList.add(0, DruidMatchers.fn("rootCause", Throwables::getRootCause, causeMatcher));
return this;
}

@Override
protected boolean matches(Object item, Description mismatchDescription)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.inject.Inject;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InternalServerError;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
Expand Down Expand Up @@ -1419,7 +1420,7 @@ private PendingSegmentRecord createNewPendingSegment(
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
pendingSegmentId = getTrueAllocatedId(transaction, pendingSegmentId);
pendingSegmentId = getUniqueIdForPrimaryAllocation(transaction, pendingSegmentId);
return PendingSegmentRecord.create(
pendingSegmentId,
request.getSequenceName(),
Expand Down Expand Up @@ -1457,7 +1458,7 @@ private PendingSegmentRecord createNewPendingSegment(
)
);
return PendingSegmentRecord.create(
getTrueAllocatedId(transaction, pendingSegmentId),
getUniqueIdForSecondaryAllocation(transaction, pendingSegmentId),
request.getSequenceName(),
request.getPreviousSegmentId(),
null,
Expand Down Expand Up @@ -1562,7 +1563,7 @@ private SegmentIdWithShardSpec createNewPendingSegment(
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
return getTrueAllocatedId(transaction, allocatedId);
return getUniqueIdForPrimaryAllocation(transaction, allocatedId);
} else if (!overallMaxId.getInterval().equals(interval)) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",
Expand Down Expand Up @@ -1590,18 +1591,90 @@ private SegmentIdWithShardSpec createNewPendingSegment(
committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
)
);
return getTrueAllocatedId(transaction, allocatedId);
return getUniqueIdForSecondaryAllocation(transaction, allocatedId);
}
}

/**
* Verifies that the allocated id doesn't already exist in the druid_segments table.
* If yes, try to get the max unallocated id considering the unused segments for the datasource, version and interval
* Otherwise, use the same id.
* @param allocatedId The segment allcoted on the basis of used and pending segments
* @return a segment id that isn't already used by other unused segments
* Returns a unique {@link SegmentIdWithShardSpec} which does not clash with
* any existing unused segment. If an unused segment already exists that matches
* the interval and version of the given {@code allocatedId}, a fresh version
* is created by suffixing one or more {@link PendingSegmentRecord#CONCURRENT_APPEND_VERSION_SUFFIX}.
* Such a conflict can happen only if all the segments in this interval created
* by a prior APPEND task were marked as unused.
* <p>
* This method should be called only when allocating the first segment in an interval.
*/
private SegmentIdWithShardSpec getUniqueIdForPrimaryAllocation(
SegmentMetadataTransaction transaction,
SegmentIdWithShardSpec allocatedId
)
{
// Get all the unused segment versions for this datasource and interval
final Set<String> unusedSegmentVersions = transaction.noCacheSql().retrieveUnusedSegmentVersionsWithInterval(
allocatedId.getDataSource(),
allocatedId.getInterval()
);

final String allocatedVersion = allocatedId.getVersion();
if (!unusedSegmentVersions.contains(allocatedVersion)) {
// Nothing to do, this version is new
return allocatedId;
} else if (!PendingSegmentRecord.DEFAULT_VERSION_FOR_CONCURRENT_APPEND.equals(allocatedVersion)) {
// Version clash should never happen for non-APPEND locks
throw DruidException.defensive(
"Cannot allocate segment[%s] as there are already some unused segments"
+ " for version[%s] in this interval.",
allocatedId, allocatedVersion
);
}

// Iterate until a new non-clashing version is found
boolean foundFreshVersion = false;
StringBuilder candidateVersion = new StringBuilder(allocatedId.getVersion());
for (int i = 0; i < 10; ++i) {
if (unusedSegmentVersions.contains(candidateVersion.toString())) {
candidateVersion.append(PendingSegmentRecord.CONCURRENT_APPEND_VERSION_SUFFIX);
} else {
foundFreshVersion = true;
break;
}
}

if (foundFreshVersion) {
final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec(
allocatedId.getDataSource(),
allocatedId.getInterval(),
candidateVersion.toString(),
allocatedId.getShardSpec()
);
log.info(
"Created new unique pending segment ID[%s] with version[%s] for originally allocated ID[%s].",
uniqueId, candidateVersion.toString(), allocatedId
);

return uniqueId;
} else {
throw InternalServerError.exception(
"Could not allocate segment[%s] as there are too many clashing unused"
+ " versions(upto [%s]) in the interval. Kill the old unused versions to proceed.",
allocatedId, candidateVersion.toString()
);
}
}

/**
* Returns a unique {@link SegmentIdWithShardSpec} which does not clash with
* any existing unused segment. If an unused segment already exists that matches
* the interval, version and partition number of the given {@code allocatedId},
* a higher partition number is used. Such a conflict can happen only if some
* segments of the underlying version have been marked as unused while others
* are still used.
* <p>
* This method should not be called when allocating the first segment in an
* interval.
*/
private SegmentIdWithShardSpec getTrueAllocatedId(
private SegmentIdWithShardSpec getUniqueIdForSecondaryAllocation(
SegmentMetadataTransaction transaction,
SegmentIdWithShardSpec allocatedId
)
Expand Down Expand Up @@ -1631,7 +1704,7 @@ private SegmentIdWithShardSpec getTrueAllocatedId(
allocatedId.getShardSpec().getPartitionNum(),
unusedMaxId.getPartitionNum() + 1
);
return new SegmentIdWithShardSpec(
final SegmentIdWithShardSpec uniqueId = new SegmentIdWithShardSpec(
allocatedId.getDataSource(),
allocatedId.getInterval(),
allocatedId.getVersion(),
Expand All @@ -1640,6 +1713,12 @@ private SegmentIdWithShardSpec getTrueAllocatedId(
allocatedId.getShardSpec().getNumCorePartitions()
)
);
log.info(
"Created new unique pending segment ID[%s] with partition number[%s] for originally allocated ID[%s].",
uniqueId, maxPartitionNum, allocatedId
);

return uniqueId;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@
*/
public class PendingSegmentRecord
{
/**
* Default lock version used by concurrent APPEND tasks.
*/
public static final String DEFAULT_VERSION_FOR_CONCURRENT_APPEND = DateTimes.EPOCH.toString();

/**
* Suffix to use to construct fresh segment versions in the event of a clash.
* The chosen character {@code S} is just for visual ease so that two versions
* are not easily confused for each other.
* {@code 1970-01-01T00:00:00.000Z_1} vs {@code 1970-01-01T00:00:00.000ZS_1}.
*/
public static final String CONCURRENT_APPEND_VERSION_SUFFIX = "S";

private final SegmentIdWithShardSpec id;
private final String sequenceName;
private final String sequencePrevId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,29 @@ public List<DataSegment> retrieveUnusedSegmentsWithExactInterval(
return segments.stream().filter(Objects::nonNull).collect(Collectors.toList());
}

/**
* Retrieves the versions of unused segments which are perfectly aligned with
* the given interval.
*/
public Set<String> retrieveUnusedSegmentVersionsWithInterval(String dataSource, Interval interval)
{
final String sql = StringUtils.format(
"SELECT DISTINCT(version) FROM %1$s"
+ " WHERE dataSource = :dataSource AND used = false"
+ " AND %2$send%2$s = :end AND start = :start",
dbTables.getSegmentsTable(),
connector.getQuoteString()
);
return Set.copyOf(
handle.createQuery(sql)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.mapTo(String.class)
.list()
);
}

/**
* Retrieve the used segment for a given id if it exists in the metadata store and null otherwise
*/
Expand Down
Loading
Loading