Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,44 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
);
}

List<String> retrieveUnusedSegmentIdsForExactIntervalAndVersion(
String dataSource,
Interval interval,
String version
)
{
final String sql = "SELECT id FROM %1$s"
+ " WHERE used = :used"
+ " AND dataSource = :dataSource"
+ " AND version = :version"
+ " AND start = :start AND %2$send%2$s = :end";

final List<String> matchingSegments = connector.inReadOnlyTransaction(
(handle, status) -> {
final Query<Map<String, Object>> query = handle
.createQuery(StringUtils.format(
sql,
dbTables.getSegmentsTable(),
connector.getQuoteString()
))
.setFetchSize(connector.getStreamingFetchSize())
.bind("used", false)
.bind("dataSource", dataSource)
.bind("version", version)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString());

try (final ResultIterator<String> iterator = query.map((index, r, ctx) -> r.getString(1)).iterator()) {
return ImmutableList.copyOf(iterator);
}
}
);

log.debug("Found [%,d] unused segments for datasource[%s] for interval[%s] and version[%s].",
matchingSegments.size(), dataSource, interval, version);
Comment on lines +285 to +286
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:

Suggested change
log.debug("Found [%,d] unused segments for datasource[%s] for interval[%s] and version[%s].",
matchingSegments.size(), dataSource, interval, version);
log.debug(
"Found [%,d] unused segments for datasource[%s], interval[%s] and version[%s].",
matchingSegments.size(), dataSource, interval, version
);

return matchingSegments;
}

@Override
public List<DataSegment> retrieveUnusedSegmentsForInterval(
String dataSource,
Expand Down Expand Up @@ -1881,7 +1919,7 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat
}

// If yes, try to compute allocated partition num using the max unused segment shard spec
SegmentIdWithShardSpec unusedMaxId = getUnusedMaxId(
SegmentId unusedMaxId = getUnusedMaxId(
allocatedId.getDataSource(),
allocatedId.getInterval(),
allocatedId.getVersion()
Expand All @@ -1893,7 +1931,7 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat

int maxPartitionNum = Math.max(
allocatedId.getShardSpec().getPartitionNum(),
unusedMaxId.getShardSpec().getPartitionNum() + 1
unusedMaxId.getPartitionNum() + 1
);
return new SegmentIdWithShardSpec(
allocatedId.getDataSource(),
Expand All @@ -1906,25 +1944,25 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat
);
}

private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval interval, String version)
private SegmentId getUnusedMaxId(String datasource, Interval interval, String version)
{
List<DataSegment> unusedSegments = retrieveUnusedSegmentsForInterval(
List<String> unusedSegmentIds = retrieveUnusedSegmentIdsForExactIntervalAndVersion(
datasource,
interval,
ImmutableList.of(version),
null,
null
version
);

SegmentIdWithShardSpec unusedMaxId = null;
SegmentId unusedMaxId = null;
int maxPartitionNum = -1;
for (DataSegment unusedSegment : unusedSegments) {
if (unusedSegment.getInterval().equals(interval)) {
int partitionNum = unusedSegment.getShardSpec().getPartitionNum();
if (maxPartitionNum < partitionNum) {
maxPartitionNum = partitionNum;
unusedMaxId = SegmentIdWithShardSpec.fromDataSegment(unusedSegment);
}
for (String id : unusedSegmentIds) {
final SegmentId segmentId = SegmentId.tryParse(datasource, id);
if (segmentId == null) {
continue;
}
int partitionNum = segmentId.getPartitionNum();
if (maxPartitionNum < partitionNum) {
maxPartitionNum = partitionNum;
unusedMaxId = segmentId;
}
}
return unusedMaxId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3275,4 +3275,49 @@ public void testSegmentIdShouldNotBeReallocated() throws IOException
);
Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true));
}

@Test
public void testRetrieveUnusedSegmentsForExactIntervalAndVersion() throws Exception
{
DataSegment unusedForDifferentVersion = createSegment(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DataSegment unusedForDifferentVersion = createSegment(
final DataSegment unusedSegmentMay2024V0 = createSegment(

Intervals.of("2024/2025"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: use an interval which is easier to use in a name. You may even assign this interval value to a field named Interval may2024 so that you can reuse it in multiple places.

Suggested change
Intervals.of("2024/2025"),
Intervals.of("2024-05/P1M"),

"v0",
new NumberedShardSpec(0, 0)
);
DataSegment unusedSegmentForExactIntervalAndVersion = createSegment(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DataSegment unusedSegmentForExactIntervalAndVersion = createSegment(
final DataSegment unusedSegmentMay2024V1 = createSegment(

Intervals.of("2024/2025"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Intervals.of("2024/2025"),
Intervals.of("2024-05/P1M"),

"v1",
new NumberedShardSpec(0, 0)
);
DataSegment unusedSegmentForDifferentInterval = createSegment(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DataSegment unusedSegmentForDifferentInterval = createSegment(
final DataSegment unusedSegmentYear2024V1 = createSegment(

Intervals.of("2023/2024"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than a disjoint interval, a better test would be to verify that a segment in an overlapping (but not identical) interval is not returned.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Intervals.of("2023/2024"),
Intervals.of("2024/P1Y"),

"v1",
new NumberedShardSpec(0, 0)
);
coordinator.commitSegments(
ImmutableSet.of(
unusedForDifferentVersion,
unusedSegmentForDifferentInterval,
unusedSegmentForExactIntervalAndVersion
),
null
);
coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI, Intervals.ETERNITY);

DataSegment usedSegmentForExactIntervalAndVersion = createSegment(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DataSegment usedSegmentForExactIntervalAndVersion = createSegment(
final DataSegment usedSegmentMay2024V1 = createSegment(

Intervals.of("2024/2025"),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Intervals.of("2024/2025"),
Intervals.of("2024-05/P1M"),

"v1",
new NumberedShardSpec(1, 0)
);
coordinator.commitSegments(ImmutableSet.of(usedSegmentForExactIntervalAndVersion), null);


List<String> unusedSegmentIdsForIntervalAndVersion =
coordinator.retrieveUnusedSegmentIdsForExactIntervalAndVersion(DS.WIKI, Intervals.of("2024/2025"), "v1");
Assert.assertEquals(1, unusedSegmentIdsForIntervalAndVersion.size());
Assert.assertEquals(
unusedSegmentForExactIntervalAndVersion.getId().toString(),
unusedSegmentIdsForIntervalAndVersion.get(0)
);
}
}