Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8189b54
Initial support for mark used and unused APIs by versions.
abhishekrb19 Mar 14, 2024
dc869aa
Short circuit only markUnused operation when there are no used segments.
abhishekrb19 Mar 14, 2024
86ce012
Adjust test.
abhishekrb19 Mar 14, 2024
d06ccf5
Merge branch 'fix_mark_used_interval_when_no_segments' into mark_used…
abhishekrb19 Mar 14, 2024
65ab201
javadocs.
abhishekrb19 Mar 14, 2024
c627cbe
* fix
zachjsh Mar 14, 2024
8e7298e
Merge branch 'master' into mark_used_unused_api_versions
abhishekrb19 Mar 15, 2024
f451788
* address review comments
zachjsh Mar 15, 2024
9e29483
Add some tests and cleanup code.
abhishekrb19 Mar 15, 2024
d79a127
Merge branch 'master' into mark_used_unused_api_versions
abhishekrb19 Mar 15, 2024
67e2c7b
* all remove the short-circuit for markUnused api
zachjsh Mar 15, 2024
acda307
Update tests.
abhishekrb19 Mar 15, 2024
45f00fc
Revert "Merge branch 'fix_mark_used_interval_when_no_segments' into m…
abhishekrb19 Mar 15, 2024
2c34335
revert one more change in favor of PR #16127
abhishekrb19 Mar 15, 2024
d848ef1
checkstyle
abhishekrb19 Mar 15, 2024
c44eaa8
adjust nullables and add javadocs.
abhishekrb19 Mar 15, 2024
4a965c9
* add test
zachjsh Mar 15, 2024
22200bd
Merge branch 'pr/16127' into mark_used_unused_api_versions
abhishekrb19 Mar 15, 2024
682e856
Merge branch 'master' into mark_used_unused_api_versions
abhishekrb19 Mar 15, 2024
cbd09cd
Comment fix up.
abhishekrb19 Mar 15, 2024
8cd8200
default method and remove some unnecessary overridden code.
abhishekrb19 Mar 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ public TypeReference<Integer> getReturnTypeReference()
@Override
public Integer perform(Task task, TaskActionToolbox toolbox)
{
int numMarked = toolbox.getIndexerMetadataStorageCoordinator()
.markSegmentsAsUnusedWithinInterval(dataSource, interval);
return numMarked;
return toolbox.getIndexerMetadataStorageCoordinator()
.markSegmentsAsUnusedWithinInterval(dataSource, interval);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,20 @@ public interface SegmentsMetadataManager
*/
int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource);

int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval);
/**
* Marks non-overshadowed unused segments for the given interval as used.
* @return Number of segments updated
*/
default int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval)
{
return markAsUsedNonOvershadowedSegmentsInInterval(dataSource, interval, null);
}

/**
* Marks non-overshadowed unused segments for the given interval and optional list of versions as used.
* @return Number of segments updated
*/
int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, List<String> versions);

/**
* Marks the given segment IDs as "used" only if there are not already overshadowed
Expand Down Expand Up @@ -81,7 +94,22 @@ public interface SegmentsMetadataManager
*/
int markAsUnusedAllSegmentsInDataSource(String dataSource);

int markAsUnusedSegmentsInInterval(String dataSource, Interval interval);
/**
* Marks segments as unused that are fully contained in the specified interval. Segments that are already marked as
* unused are not updated.
* @return Number of segments updated
*/
default int markAsUnusedSegmentsInInterval(String dataSource, Interval interval)
{
return markAsUnusedSegmentsInInterval(dataSource, interval, null);
}

/**
* Marks segments as unused that are fully contained in the specified interval with an optional list of versions.
* Segments that are already marked as unused are not updated.
* @return The number of segments updated
*/
int markAsUnusedSegmentsInInterval(String dataSource, Interval interval, List<String> versions);

int markSegmentsAsUnused(Set<SegmentId> segmentIds);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,21 +654,25 @@ public boolean markSegmentAsUsed(final String segmentId)
@Override
public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String dataSource)
{
return doMarkAsUsedNonOvershadowedSegments(dataSource, null);
return doMarkAsUsedNonOvershadowedSegments(dataSource, null, null);
}

@Override
public int markAsUsedNonOvershadowedSegmentsInInterval(final String dataSource, final Interval interval)
public int markAsUsedNonOvershadowedSegmentsInInterval(
final String dataSource,
final Interval interval,
@Nullable final List<String> versions
)
{
Preconditions.checkNotNull(interval);
return doMarkAsUsedNonOvershadowedSegments(dataSource, interval);
return doMarkAsUsedNonOvershadowedSegments(dataSource, interval, versions);
}

/**
* Implementation for both {@link #markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is null)
* and {@link #markAsUsedNonOvershadowedSegmentsInInterval}.
*/
private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval)
private int doMarkAsUsedNonOvershadowedSegments(
final String dataSourceName,
final @Nullable Interval interval,
final @Nullable List<String> versions
)
{
final List<DataSegment> unusedSegments = new ArrayList<>();
final SegmentTimeline timeline = new SegmentTimeline();
Expand All @@ -682,12 +686,12 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable
interval == null ? Intervals.ONLY_ETERNITY : Collections.singletonList(interval);

try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUsedSegments(dataSourceName, intervals)) {
queryTool.retrieveUsedSegments(dataSourceName, intervals, versions)) {
timeline.addSegments(iterator);
}

try (final CloseableIterator<DataSegment> iterator =
queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null, null, null)) {
queryTool.retrieveUnusedSegments(dataSourceName, intervals, versions, null, null, null, null)) {
while (iterator.hasNext()) {
final DataSegment dataSegment = iterator.next();
timeline.addSegments(Iterators.singletonIterator(dataSegment));
Expand Down Expand Up @@ -796,7 +800,7 @@ private CloseableIterator<DataSegment> retrieveUsedSegmentsOverlappingIntervals(
private int markSegmentsAsUsed(final List<SegmentId> segmentIds)
{
if (segmentIds.isEmpty()) {
log.info("No segments found to update!");
log.info("No segments found to mark as used.");
return 0;
}

Expand Down Expand Up @@ -856,13 +860,18 @@ public int markSegmentsAsUnused(Set<SegmentId> segmentIds)
}

@Override
public int markAsUnusedSegmentsInInterval(String dataSourceName, Interval interval)
public int markAsUnusedSegmentsInInterval(
final String dataSource,
final Interval interval,
@Nullable final List<String> versions
)
{
Preconditions.checkNotNull(interval);
try {
return connector.getDBI().withHandle(
handle ->
SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper)
.markSegmentsUnused(dataSourceName, interval)
.markSegmentsUnused(dataSource, interval, versions)
);
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,24 @@ public CloseableIterator<DataSegment> retrieveUsedSegments(
final String dataSource,
final Collection<Interval> intervals
)
{
return retrieveUsedSegments(dataSource, intervals, null);
}

/**
* Similar to {@link #retrieveUsedSegments}, but with an additional {@code versions} argument. When {@code versions}
* is specified, all used segments in the specified {@code intervals} and {@code versions} are retrieved.
*/
public CloseableIterator<DataSegment> retrieveUsedSegments(
final String dataSource,
final Collection<Interval> intervals,
final List<String> versions
)
{
return retrieveSegments(
dataSource,
intervals,
null,
versions,
IntervalMode.OVERLAPS,
true,
null,
Expand Down Expand Up @@ -314,19 +327,34 @@ public int markSegments(final Collection<SegmentId> segmentIds, final boolean us
/**
* Marks all used segments that are *fully contained by* a particular interval as unused.
*
* @return the number of segments actually modified.
* @return Number of segments updated.
*/
public int markSegmentsUnused(final String dataSource, final Interval interval)
{
return markSegmentsUnused(dataSource, interval, null);
}

/**
* Marks all used segments that are *fully contained by* a particular interval filtered by an optional list of versions
* as unused.
*
* @return Number of segments updated.
*/
public int markSegmentsUnused(final String dataSource, final Interval interval, @Nullable final List<String> versions)
{
if (Intervals.isEternity(interval)) {
return handle
.createStatement(
StringUtils.format(
"UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated "
+ "WHERE dataSource = :dataSource AND used = true",
dbTables.getSegmentsTable()
)
final StringBuilder sb = new StringBuilder();
sb.append(
StringUtils.format(
"UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated "
+ "WHERE dataSource = :dataSource AND used = true",
dbTables.getSegmentsTable()
)
);
appendConditionForVersions(sb, versions);

return handle
.createStatement(sb.toString())
.bind("dataSource", dataSource)
.bind("used", false)
.bind("used_status_last_updated", DateTimes.nowUtc().toString())
Expand All @@ -336,15 +364,19 @@ public int markSegmentsUnused(final String dataSource, final Interval interval)
// Safe to write a WHERE clause with this interval. Note that it is unsafe if the years are different, because
// that means extra characters can sneak in. (Consider a query interval like "2000-01-01/2001-01-01" and a
// segment interval like "20001/20002".)
return handle
.createStatement(
StringUtils.format(
"UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated "
+ "WHERE dataSource = :dataSource AND used = true AND %s",
dbTables.getSegmentsTable(),
IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end")
)
final StringBuilder sb = new StringBuilder();
sb.append(
StringUtils.format(
"UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated "
+ "WHERE dataSource = :dataSource AND used = true AND %s",
dbTables.getSegmentsTable(),
IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end")
)
);
appendConditionForVersions(sb, versions);

return handle
.createStatement(sb.toString())
.bind("dataSource", dataSource)
.bind("used", false)
.bind("start", interval.getStart().toString())
Expand All @@ -358,7 +390,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval)
retrieveSegments(
dataSource,
Collections.singletonList(interval),
null,
versions,
IntervalMode.CONTAINS,
true,
null,
Expand Down Expand Up @@ -680,12 +712,7 @@ private Query<Map<String, Object>> buildSegmentsTableQuery(
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
}

if (!CollectionUtils.isNullOrEmpty(versions)) {
final String versionsStr = versions.stream()
.map(version -> "'" + version + "'")
.collect(Collectors.joining(","));
sb.append(StringUtils.format(" AND version IN (%s)", versionsStr));
}
appendConditionForVersions(sb, versions);

// Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null.
final boolean addMaxUsedLastUpdatedTimeFilter = !used && maxUsedStatusLastUpdatedTime != null;
Expand Down Expand Up @@ -834,6 +861,21 @@ private static int computeNumChangedSegments(List<String> segmentIds, int[] segm
return numChangedSegments;
}

private static void appendConditionForVersions(
final StringBuilder sb,
final List<String> versions
)
{
if (CollectionUtils.isNullOrEmpty(versions)) {
return;
}

final String versionsCsv = versions.stream()
.map(version -> "'" + version + "'")
.collect(Collectors.joining(","));
sb.append(StringUtils.format(" AND version IN (%s)", versionsCsv));
}

enum IntervalMode
{
CONTAINS {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,14 @@ public Response markAsUsedNonOvershadowedSegments(
.build();
} else {
SegmentUpdateOperation operation = () -> {

final Interval interval = payload.getInterval();
final List<String> versions = payload.getVersions();
if (interval != null) {
return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval);
if (versions != null) {
return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval, versions);
} else {
return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval);
}
} else {
final Set<String> segmentIds = payload.getSegmentIds();
if (segmentIds == null || segmentIds.isEmpty()) {
Expand Down Expand Up @@ -266,9 +270,14 @@ public Response markSegmentsAsUnused(
} else {
SegmentUpdateOperation operation = () -> {
final Interval interval = payload.getInterval();
final List<String> versions = payload.getVersions();
final int numUpdatedSegments;
if (interval != null) {
numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval);
if (versions != null) {
numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval, versions);
} else {
numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval);
}
} else {
final Set<SegmentId> segmentIds =
payload.getSegmentIds()
Expand Down Expand Up @@ -995,15 +1004,18 @@ protected static class MarkDataSourceSegmentsPayload
{
private final Interval interval;
private final Set<String> segmentIds;
private final List<String> versions;

@JsonCreator
public MarkDataSourceSegmentsPayload(
@JsonProperty("interval") Interval interval,
@JsonProperty("segmentIds") Set<String> segmentIds
@JsonProperty("segmentIds") Set<String> segmentIds,
@JsonProperty("versions") List<String> versions
)
{
this.interval = interval;
this.segmentIds = segmentIds;
this.versions = versions;
}

@JsonProperty
Expand All @@ -1018,9 +1030,15 @@ public Set<String> getSegmentIds()
return segmentIds;
}

@JsonProperty
public List<String> getVersions()
{
return versions;
}

public boolean isValid()
{
return (interval == null ^ segmentIds == null) && (segmentIds == null || !segmentIds.isEmpty());
return (interval == null ^ segmentIds == null) && (segmentIds == null || !segmentIds.isEmpty()); // fixme
}
}
}
Loading