From 247c4eb7f3f8bdb5534533e4b54e26d373dc0c95 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 4 Nov 2022 12:07:50 +0530 Subject: [PATCH 1/4] Deserialize only when needed --- .../IndexerSQLMetadataStorageCoordinator.java | 65 +++++++++++++++---- 1 file changed, 53 insertions(+), 12 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 10e95074c558..de207f9150e1 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -75,6 +75,7 @@ import javax.validation.constraints.NotNull; import java.io.IOException; import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -229,25 +230,28 @@ private Set getPendingSegmentsForIntervalWithHandle( { final Set identifiers = new HashSet<>(); - final ResultIterator dbSegments = + final ResultIterator dbSegments = handle.createQuery( StringUtils.format( - "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start <= :end and %2$send%2$s >= :start", + "SELECT start, %2$send%2$s, payload FROM %1$s " + + "WHERE dataSource = :dataSource " + + "AND start <= :end and %2$send%2$s >= :start", dbTables.getPendingSegmentsTable(), connector.getQuoteString() ) ) .bind("dataSource", dataSource) .bind("start", interval.getStart().toString()) .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) + .map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r, jsonMapper)) .iterator(); while (dbSegments.hasNext()) { - final byte[] payload = dbSegments.next(); - final SegmentIdWithShardSpec identifier = jsonMapper.readValue(payload, SegmentIdWithShardSpec.class); - - if (interval.overlaps(identifier.getInterval())) { - identifiers.add(identifier); + final PendingSegmentsRecord record = dbSegments.next(); + if (interval.overlaps(record.getInterval())) { + // Deserialize the payload only if this record is eligible + identifiers.add( + jsonMapper.readValue(record.getPayload(), SegmentIdWithShardSpec.class) + ); } } @@ -584,7 +588,7 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( .asBytes() ); - insertToMetastore( + insertPendingSegment( handle, newIdentifier, dataSource, @@ -662,7 +666,7 @@ private SegmentIdWithShardSpec allocatePendingSegment( ); // always insert empty previous sequence id - insertToMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1); + insertPendingSegment(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1); log.info("Allocated pending segment [%s] for sequence[%s] in DB", newIdentifier, sequenceName); @@ -742,7 +746,7 @@ private static class CheckExistingSegmentIdResult } } - private void insertToMetastore( + private void insertPendingSegment( Handle handle, SegmentIdWithShardSpec newIdentifier, String dataSource, @@ -943,7 +947,7 @@ private SegmentIdWithShardSpec createNewSegment( return new SegmentIdWithShardSpec( dataSource, - overallMaxId.getInterval(), + interval, Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"), partialShardSpec.complete( jsonMapper, @@ -1468,4 +1472,41 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set Date: Sat, 5 Nov 2022 10:32:25 +0530 Subject: [PATCH 2/4] Update query to fetch pending segments --- .../druid/metadata/IndexerSQLMetadataStorageCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index de207f9150e1..5c1b2761a5af 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -235,7 +235,7 @@ private Set getPendingSegmentsForIntervalWithHandle( StringUtils.format( "SELECT start, %2$send%2$s, payload FROM %1$s " + "WHERE dataSource = :dataSource " - + "AND start <= :end and %2$send%2$s >= :start", + + "AND start < :end and %2$send%2$s > :start", dbTables.getPendingSegmentsTable(), connector.getQuoteString() ) ) From e7edd5ce65a350ca918f2244e0acbe41a917e02a Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Mon, 7 Nov 2022 11:54:24 +0530 Subject: [PATCH 3/4] Revert unneeded changes --- .../IndexerSQLMetadataStorageCoordinator.java | 58 ++++--------------- 1 file changed, 12 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 5c1b2761a5af..6f2852ea4cf1 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -75,7 +75,6 @@ import javax.validation.constraints.NotNull; import java.io.IOException; import java.sql.ResultSet; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -230,28 +229,31 @@ private Set getPendingSegmentsForIntervalWithHandle( { final Set identifiers = new HashSet<>(); - final ResultIterator dbSegments = + final ResultIterator dbSegments = handle.createQuery( StringUtils.format( + // This query might fail if the year has a different number of digits + // See https://github.com/apache/druid/pull/11582 for a similar issue + // Using long for these timestamps instead of varchar would give correct time comparisons "SELECT start, %2$send%2$s, payload FROM %1$s " + "WHERE dataSource = :dataSource " + "AND start < :end and %2$send%2$s > :start", - dbTables.getPendingSegmentsTable(), connector.getQuoteString() + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() ) ) .bind("dataSource", dataSource) .bind("start", interval.getStart().toString()) .bind("end", interval.getEnd().toString()) - .map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r, jsonMapper)) + .map(ByteArrayMapper.FIRST) .iterator(); while (dbSegments.hasNext()) { - final PendingSegmentsRecord record = dbSegments.next(); - if (interval.overlaps(record.getInterval())) { - // Deserialize the payload only if this record is eligible - identifiers.add( - jsonMapper.readValue(record.getPayload(), SegmentIdWithShardSpec.class) - ); + final byte[] payload = dbSegments.next(); + final SegmentIdWithShardSpec identifier = jsonMapper.readValue(payload, SegmentIdWithShardSpec.class); + + if (interval.overlaps(identifier.getInterval())) { + identifiers.add(identifier); } } @@ -1473,40 +1475,4 @@ public int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set Date: Mon, 7 Nov 2022 12:35:49 +0530 Subject: [PATCH 4/4] Fix query --- .../IndexerSQLMetadataStorageCoordinator.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 6f2852ea4cf1..cc42d77e1b70 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -235,11 +235,8 @@ private Set getPendingSegmentsForIntervalWithHandle( // This query might fail if the year has a different number of digits // See https://github.com/apache/druid/pull/11582 for a similar issue // Using long for these timestamps instead of varchar would give correct time comparisons - "SELECT start, %2$send%2$s, payload FROM %1$s " - + "WHERE dataSource = :dataSource " - + "AND start < :end and %2$send%2$s > :start", - dbTables.getPendingSegmentsTable(), - connector.getQuoteString() + "SELECT payload FROM %1$s WHERE dataSource = :dataSource AND start < :end and %2$send%2$s > :start", + dbTables.getPendingSegmentsTable(), connector.getQuoteString() ) ) .bind("dataSource", dataSource) @@ -590,7 +587,7 @@ private SegmentIdWithShardSpec allocatePendingSegmentWithSegmentLineageCheck( .asBytes() ); - insertPendingSegment( + insertPendingSegmentIntoMetastore( handle, newIdentifier, dataSource, @@ -668,7 +665,7 @@ private SegmentIdWithShardSpec allocatePendingSegment( ); // always insert empty previous sequence id - insertPendingSegment(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1); + insertPendingSegmentIntoMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1); log.info("Allocated pending segment [%s] for sequence[%s] in DB", newIdentifier, sequenceName); @@ -748,7 +745,7 @@ private static class CheckExistingSegmentIdResult } } - private void insertPendingSegment( + private void insertPendingSegmentIntoMetastore( Handle handle, SegmentIdWithShardSpec newIdentifier, String dataSource,