Patch summary (free text before the first "diff --git" header is ignored by git apply / patch):

  * IndexerSQLMetadataStorageCoordinator: adds retrieveUnusedSegmentIdsForExactIntervalAndVersion,
    which selects only the "id" column of unused segments whose dataSource, version, start and end
    are equal to the given values.
  * getUnusedMaxId now calls that method and reads the partition number from each id via
    SegmentId.tryParse (ids that fail to parse are skipped) instead of calling
    retrieveUnusedSegmentsForInterval and filtering the returned DataSegments by interval.
    Its return type, and the local in getTrueAllocatedId, change from SegmentIdWithShardSpec
    to SegmentId.
  * IndexerSQLMetadataStorageCoordinatorTest: adds a test for the new retrieval method.

NOTE(review): line breaks, indentation and generic type arguments in this patch were reconstructed
from a flattened copy. Hunk line counts match the hunk headers, but confirm the whitespace of the
context lines against the target tree before applying (or use git apply --ignore-whitespace).

diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index b9adcd01e140..2b9f328a0977 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -249,6 +249,44 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
     );
   }
 
+  List<String> retrieveUnusedSegmentIdsForExactIntervalAndVersion(
+      String dataSource,
+      Interval interval,
+      String version
+  )
+  {
+    final String sql = "SELECT id FROM %1$s"
+                       + " WHERE used = :used"
+                       + " AND dataSource = :dataSource"
+                       + " AND version = :version"
+                       + " AND start = :start AND %2$send%2$s = :end";
+
+    final List<String> matchingSegments = connector.inReadOnlyTransaction(
+        (handle, status) -> {
+          final Query<Map<String, Object>> query = handle
+              .createQuery(StringUtils.format(
+                  sql,
+                  dbTables.getSegmentsTable(),
+                  connector.getQuoteString()
+              ))
+              .setFetchSize(connector.getStreamingFetchSize())
+              .bind("used", false)
+              .bind("dataSource", dataSource)
+              .bind("version", version)
+              .bind("start", interval.getStart().toString())
+              .bind("end", interval.getEnd().toString());
+
+          try (final ResultIterator<String> iterator = query.map((index, r, ctx) -> r.getString(1)).iterator()) {
+            return ImmutableList.copyOf(iterator);
+          }
+        }
+    );
+
+    log.debug("Found [%,d] unused segments for datasource[%s] for interval[%s] and version[%s].",
+              matchingSegments.size(), dataSource, interval, version);
+    return matchingSegments;
+  }
+
   @Override
   public List<DataSegment> retrieveUnusedSegmentsForInterval(
       String dataSource,
@@ -1881,7 +1919,7 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat
     }
 
     // If yes, try to compute allocated partition num using the max unused segment shard spec
-    SegmentIdWithShardSpec unusedMaxId = getUnusedMaxId(
+    SegmentId unusedMaxId = getUnusedMaxId(
         allocatedId.getDataSource(),
         allocatedId.getInterval(),
         allocatedId.getVersion()
@@ -1893,7 +1931,7 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat
 
     int maxPartitionNum = Math.max(
         allocatedId.getShardSpec().getPartitionNum(),
-        unusedMaxId.getShardSpec().getPartitionNum() + 1
+        unusedMaxId.getPartitionNum() + 1
     );
     return new SegmentIdWithShardSpec(
         allocatedId.getDataSource(),
@@ -1906,25 +1944,25 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat
     );
   }
 
-  private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval interval, String version)
+  private SegmentId getUnusedMaxId(String datasource, Interval interval, String version)
   {
-    List<DataSegment> unusedSegments = retrieveUnusedSegmentsForInterval(
+    List<String> unusedSegmentIds = retrieveUnusedSegmentIdsForExactIntervalAndVersion(
         datasource,
         interval,
-        ImmutableList.of(version),
-        null,
-        null
+        version
     );
 
-    SegmentIdWithShardSpec unusedMaxId = null;
+    SegmentId unusedMaxId = null;
     int maxPartitionNum = -1;
-    for (DataSegment unusedSegment : unusedSegments) {
-      if (unusedSegment.getInterval().equals(interval)) {
-        int partitionNum = unusedSegment.getShardSpec().getPartitionNum();
-        if (maxPartitionNum < partitionNum) {
-          maxPartitionNum = partitionNum;
-          unusedMaxId = SegmentIdWithShardSpec.fromDataSegment(unusedSegment);
-        }
+    for (String id : unusedSegmentIds) {
+      final SegmentId segmentId = SegmentId.tryParse(datasource, id);
+      if (segmentId == null) {
+        continue;
+      }
+      int partitionNum = segmentId.getPartitionNum();
+      if (maxPartitionNum < partitionNum) {
+        maxPartitionNum = partitionNum;
+        unusedMaxId = segmentId;
       }
     }
     return unusedMaxId;
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 91f3279eeb65..20a74e6c026e 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -3275,4 +3275,49 @@ public void testSegmentIdShouldNotBeReallocated() throws IOException
     );
     Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true));
   }
+
+  @Test
+  public void testRetrieveUnusedSegmentsForExactIntervalAndVersion() throws Exception
+  {
+    DataSegment unusedForDifferentVersion = createSegment(
+        Intervals.of("2024/2025"),
+        "v0",
+        new NumberedShardSpec(0, 0)
+    );
+    DataSegment unusedSegmentForExactIntervalAndVersion = createSegment(
+        Intervals.of("2024/2025"),
+        "v1",
+        new NumberedShardSpec(0, 0)
+    );
+    DataSegment unusedSegmentForDifferentInterval = createSegment(
+        Intervals.of("2023/2024"),
+        "v1",
+        new NumberedShardSpec(0, 0)
+    );
+    coordinator.commitSegments(
+        ImmutableSet.of(
+            unusedForDifferentVersion,
+            unusedSegmentForDifferentInterval,
+            unusedSegmentForExactIntervalAndVersion
+        ),
+        null
+    );
+    coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI, Intervals.ETERNITY);
+
+    DataSegment usedSegmentForExactIntervalAndVersion = createSegment(
+        Intervals.of("2024/2025"),
+        "v1",
+        new NumberedShardSpec(1, 0)
+    );
+    coordinator.commitSegments(ImmutableSet.of(usedSegmentForExactIntervalAndVersion), null);
+
+
+    List<String> unusedSegmentIdsForIntervalAndVersion =
+        coordinator.retrieveUnusedSegmentIdsForExactIntervalAndVersion(DS.WIKI, Intervals.of("2024/2025"), "v1");
+    Assert.assertEquals(1, unusedSegmentIdsForIntervalAndVersion.size());
+    Assert.assertEquals(
+        unusedSegmentForExactIntervalAndVersion.getId().toString(),
+        unusedSegmentIdsForIntervalAndVersion.get(0)
+    );
+  }
 }