From 9abe3d383877ee3edab7f7ad8f4231354911b674 Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 18 Jun 2024 14:17:34 +0530 Subject: [PATCH 1/4] Optimize unused segment query for segment allocation --- ...TestIndexerMetadataStorageCoordinator.java | 19 ++++++++ .../IndexerMetadataStorageCoordinator.java | 6 +++ .../IndexerSQLMetadataStorageCoordinator.java | 47 +++++++++++++++++-- ...exerSQLMetadataStorageCoordinatorTest.java | 42 +++++++++++++++++ 4 files changed, 110 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 3aceae494c68..3c2cb6537c32 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -117,6 +117,25 @@ public List retrieveUnusedSegmentsForInterval( return retrieveUnusedSegmentsForInterval(dataSource, interval, null, limit, maxUsedStatusLastUpdatedTime); } + @Override + public List retrieveUnusedSegmentsForExactIntervalAndVersion( + String dataSource, + Interval interval, + String version + ) + { + synchronized (unusedSegments) { + return ImmutableList.copyOf( + unusedSegments.stream() + .filter(ds -> !nuked.contains(ds)) + .filter(ds -> dataSource.equals(ds.getDataSource()) + && interval.equals(ds.getInterval()) + && version.equals(ds.getVersion()) + ).iterator() + ); + } + } + @Override public List retrieveUnusedSegmentsForInterval( String dataSource, diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index da6dd9ffd951..724a17818491 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -179,6 +179,12 @@ List retrieveUnusedSegmentsForInterval( @Nullable DateTime maxUsedStatusLastUpdatedTime ); + List retrieveUnusedSegmentsForExactIntervalAndVersion( + String dataSource, + Interval interval, + String version + ); + /** * Mark as unused segments which include ONLY data within the given interval. * diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index b9adcd01e140..88923bb29070 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -249,6 +249,47 @@ public List> retrieveUsedSegmentsAndCreatedDates(Strin ); } + @Override + public List retrieveUnusedSegmentsForExactIntervalAndVersion( + String dataSource, + Interval interval, + String version + ) + { + StringBuilder sb = new StringBuilder(); + sb.append(StringUtils.format("SELECT payload FROM %s", dbTables.getSegmentsTable())); + sb.append(" WHERE used = :used AND dataSource = :dataSource AND version = :version"); + sb.append(StringUtils.format(" AND start = :start AND %1$send%1$s = :end", connector.getQuoteString())); + + final List matchingSegments = connector.inReadOnlyTransaction( + (handle, status) -> { + final Query> sql = handle + .createQuery(StringUtils.format( + sb.toString(), + dbTables.getSegmentsTable() + )) + .setFetchSize(connector.getStreamingFetchSize()) + .bind("used", false) + .bind("dataSource", dataSource) + .bind("version", version) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()); + + try (final ResultIterator iterator = + sql.map( + (index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class) + ).iterator() + ) { + return ImmutableList.copyOf(iterator); + } + } + ); + + log.info("Found [%,d] unused segments for datasource[%s] in interval[%s] and version[%s].", + matchingSegments.size(), dataSource, interval, version); + return matchingSegments; + } + @Override public List retrieveUnusedSegmentsForInterval( String dataSource, @@ -1908,12 +1949,10 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval interval, String version) { - List unusedSegments = retrieveUnusedSegmentsForInterval( + List unusedSegments = retrieveUnusedSegmentsForExactIntervalAndVersion( datasource, interval, - ImmutableList.of(version), - null, - null + version ); SegmentIdWithShardSpec unusedMaxId = null; diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 91f3279eeb65..6df4b28bef48 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -3275,4 +3275,46 @@ public void testSegmentIdShouldNotBeReallocated() throws IOException ); Assert.assertNull(coordinator.retrieveSegmentForId(theId.asSegmentId().toString(), true)); } + + @Test + public void testRetrieveUnusedSegmentsForExactIntervalAndVersion() throws Exception + { + DataSegment unusedForDifferentVersion = createSegment( + Intervals.of("2024/2025"), + "v0", + new NumberedShardSpec(0, 0) + ); + DataSegment unusedSegmentForExactIntervalAndVersion = createSegment( + Intervals.of("2024/2025"), + "v1", + new NumberedShardSpec(0, 0) + ); + DataSegment unusedSegmentForDifferentInterval = createSegment( + Intervals.of("2023/2024"), + "v1", + new NumberedShardSpec(0, 0) + ); + coordinator.commitSegments( + ImmutableSet.of( + unusedForDifferentVersion, + unusedSegmentForDifferentInterval, + unusedSegmentForExactIntervalAndVersion + ), + null + ); + coordinator.markSegmentsAsUnusedWithinInterval(DS.WIKI, Intervals.ETERNITY); + + DataSegment usedSegmentForExactIntervalAndVersion = createSegment( + Intervals.of("2024/2025"), + "v1", + new NumberedShardSpec(1, 0) + ); + coordinator.commitSegments(ImmutableSet.of(usedSegmentForExactIntervalAndVersion), null); + + + List unusedSegmentsForIntervalAndVersion = + coordinator.retrieveUnusedSegmentsForExactIntervalAndVersion(DS.WIKI, Intervals.of("2024/2025"), "v1"); + Assert.assertEquals(1, unusedSegmentsForIntervalAndVersion.size()); + Assert.assertEquals(unusedSegmentForExactIntervalAndVersion, unusedSegmentsForIntervalAndVersion.get(0)); + } } From 550278b8362cc9b5948c4a9e0d78b327f64d41ef Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 18 Jun 2024 15:26:00 +0530 Subject: [PATCH 2/4] Address feedback --- ...TestIndexerMetadataStorageCoordinator.java | 19 ------------------- .../IndexerMetadataStorageCoordinator.java | 6 ------ .../IndexerSQLMetadataStorageCoordinator.java | 19 +++++++++---------- 3 files changed, 9 insertions(+), 35 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 3c2cb6537c32..3aceae494c68 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -117,25 +117,6 @@ public List retrieveUnusedSegmentsForInterval( return retrieveUnusedSegmentsForInterval(dataSource, interval, null, limit, maxUsedStatusLastUpdatedTime); } - @Override - public List retrieveUnusedSegmentsForExactIntervalAndVersion( - String dataSource, - Interval interval, - String version - ) - { - synchronized (unusedSegments) { - return ImmutableList.copyOf( - unusedSegments.stream() - .filter(ds -> !nuked.contains(ds)) - .filter(ds -> dataSource.equals(ds.getDataSource()) - && interval.equals(ds.getInterval()) - && version.equals(ds.getVersion()) - ).iterator() - ); - } - } - @Override public List retrieveUnusedSegmentsForInterval( String dataSource, diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 724a17818491..da6dd9ffd951 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -179,12 +179,6 @@ List retrieveUnusedSegmentsForInterval( @Nullable DateTime maxUsedStatusLastUpdatedTime ); - List retrieveUnusedSegmentsForExactIntervalAndVersion( - String dataSource, - Interval interval, - String version - ); - /** * Mark as unused segments which include ONLY data within the given interval. * diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 88923bb29070..3a4e1fb5e41d 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -249,24 +249,23 @@ public List> retrieveUsedSegmentsAndCreatedDates(Strin ); } - @Override public List retrieveUnusedSegmentsForExactIntervalAndVersion( String dataSource, Interval interval, String version ) { - StringBuilder sb = new StringBuilder(); - sb.append(StringUtils.format("SELECT payload FROM %s", dbTables.getSegmentsTable())); - sb.append(" WHERE used = :used AND dataSource = :dataSource AND version = :version"); - sb.append(StringUtils.format(" AND start = :start AND %1$send%1$s = :end", connector.getQuoteString())); + final String sql = "SELECT payload FROM %1$s" + + " WHERE used = :used AND dataSource = :dataSource AND version = :version" + + " AND start = :start AND %2$send%2$s = :end"; final List matchingSegments = connector.inReadOnlyTransaction( (handle, status) -> { - final Query> sql = handle + final Query> query = handle .createQuery(StringUtils.format( - sb.toString(), - dbTables.getSegmentsTable() + sql, + dbTables.getSegmentsTable(), + connector.getQuoteString() )) .setFetchSize(connector.getStreamingFetchSize()) .bind("used", false) @@ -276,7 +275,7 @@ public List retrieveUnusedSegmentsForExactIntervalAndVersion( .bind("end", interval.getEnd().toString()); try (final ResultIterator iterator = - sql.map( + query.map( (index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class) ).iterator() ) { @@ -285,7 +284,7 @@ public List retrieveUnusedSegmentsForExactIntervalAndVersion( } ); - log.info("Found [%,d] unused segments for datasource[%s] in interval[%s] and version[%s].", + log.info("Found [%,d] unused segments for datasource[%s] for interval[%s] and version[%s].", matchingSegments.size(), dataSource, interval, version); return matchingSegments; } From 3286b74ea35363ab4602049f6b54e25493b52fe7 Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 18 Jun 2024 16:33:56 +0530 Subject: [PATCH 3/4] Retrieve only ids --- .../IndexerSQLMetadataStorageCoordinator.java | 40 +++++++++---------- ...exerSQLMetadataStorageCoordinatorTest.java | 11 +++-- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 3a4e1fb5e41d..ced639b0c313 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -249,17 +249,17 @@ public List> retrieveUsedSegmentsAndCreatedDates(Strin ); } - public List retrieveUnusedSegmentsForExactIntervalAndVersion( + List retrieveUnusedSegmentIdsForExactIntervalAndVersion( String dataSource, Interval interval, String version ) { - final String sql = "SELECT payload FROM %1$s" + final String sql = "SELECT id FROM %1$s" + " WHERE used = :used AND dataSource = :dataSource AND version = :version" + " AND start = :start AND %2$send%2$s = :end"; - final List matchingSegments = connector.inReadOnlyTransaction( + final List matchingSegments = connector.inReadOnlyTransaction( (handle, status) -> { final Query> query = handle .createQuery(StringUtils.format( @@ -274,17 +274,13 @@ public List retrieveUnusedSegmentsForExactIntervalAndVersion( .bind("start", interval.getStart().toString()) .bind("end", interval.getEnd().toString()); - try (final ResultIterator iterator = - query.map( - (index, r, ctx) -> JacksonUtils.readValue(jsonMapper, r.getBytes(1), DataSegment.class) - ).iterator() - ) { + try (final ResultIterator iterator = query.map((index, r, ctx) -> r.getString(1)).iterator()) { return ImmutableList.copyOf(iterator); } } ); - log.info("Found [%,d] unused segments for datasource[%s] for interval[%s] and version[%s].", + log.debug("Found [%,d] unused segments for datasource[%s] for interval[%s] and version[%s].", matchingSegments.size(), dataSource, interval, version); return matchingSegments; } @@ -1921,7 +1917,7 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat } // If yes, try to compute allocated partition num using the max unused segment shard spec - SegmentIdWithShardSpec unusedMaxId = getUnusedMaxId( + SegmentId unusedMaxId = getUnusedMaxId( allocatedId.getDataSource(), allocatedId.getInterval(), allocatedId.getVersion() @@ -1933,7 +1929,7 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat int maxPartitionNum = Math.max( allocatedId.getShardSpec().getPartitionNum(), - unusedMaxId.getShardSpec().getPartitionNum() + 1 + unusedMaxId.getPartitionNum() + 1 ); return new SegmentIdWithShardSpec( allocatedId.getDataSource(), @@ -1946,23 +1942,25 @@ private SegmentIdWithShardSpec getTrueAllocatedId(SegmentIdWithShardSpec allocat ); } - private SegmentIdWithShardSpec getUnusedMaxId(String datasource, Interval interval, String version) + private SegmentId getUnusedMaxId(String datasource, Interval interval, String version) { - List unusedSegments = retrieveUnusedSegmentsForExactIntervalAndVersion( + List unusedSegmentIds = retrieveUnusedSegmentIdsForExactIntervalAndVersion( datasource, interval, version ); - SegmentIdWithShardSpec unusedMaxId = null; + SegmentId unusedMaxId = null; int maxPartitionNum = -1; - for (DataSegment unusedSegment : unusedSegments) { - if (unusedSegment.getInterval().equals(interval)) { - int partitionNum = unusedSegment.getShardSpec().getPartitionNum(); - if (maxPartitionNum < partitionNum) { - maxPartitionNum = partitionNum; - unusedMaxId = SegmentIdWithShardSpec.fromDataSegment(unusedSegment); - } + for (String id : unusedSegmentIds) { + final SegmentId segmentId = SegmentId.tryParse(datasource, id); + if (segmentId == null) { + continue; + } + int partitionNum = segmentId.getPartitionNum(); + if (maxPartitionNum < partitionNum) { + maxPartitionNum = partitionNum; + unusedMaxId = segmentId; } } return unusedMaxId; diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 6df4b28bef48..20a74e6c026e 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -3312,9 +3312,12 @@ public void testRetrieveUnusedSegmentsForExactIntervalAndVersion() throws Except coordinator.commitSegments(ImmutableSet.of(usedSegmentForExactIntervalAndVersion), null); - List unusedSegmentsForIntervalAndVersion = - coordinator.retrieveUnusedSegmentsForExactIntervalAndVersion(DS.WIKI, Intervals.of("2024/2025"), "v1"); - Assert.assertEquals(1, unusedSegmentsForIntervalAndVersion.size()); - Assert.assertEquals(unusedSegmentForExactIntervalAndVersion, unusedSegmentsForIntervalAndVersion.get(0)); + List unusedSegmentIdsForIntervalAndVersion = + coordinator.retrieveUnusedSegmentIdsForExactIntervalAndVersion(DS.WIKI, Intervals.of("2024/2025"), "v1"); + Assert.assertEquals(1, unusedSegmentIdsForIntervalAndVersion.size()); + Assert.assertEquals( + unusedSegmentForExactIntervalAndVersion.getId().toString(), + unusedSegmentIdsForIntervalAndVersion.get(0) + ); } } From 045017e5ca292ddc92dd2f5aef6a077e2e0b8a2b Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 18 Jun 2024 16:35:37 +0530 Subject: [PATCH 4/4] Better formatting --- .../druid/metadata/IndexerSQLMetadataStorageCoordinator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index ced639b0c313..2b9f328a0977 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -256,7 +256,9 @@ List retrieveUnusedSegmentIdsForExactIntervalAndVersion( ) { final String sql = "SELECT id FROM %1$s" - + " WHERE used = :used AND dataSource = :dataSource AND version = :version" + + " WHERE used = :used" + + " AND dataSource = :dataSource" + + " AND version = :version" + " AND start = :start AND %2$send%2$s = :end"; final List matchingSegments = connector.inReadOnlyTransaction(