From 71e9ddec9c5748c577d1e3e5daa44b7c3a184642 Mon Sep 17 00:00:00 2001 From: Amatya Date: Fri, 14 Jun 2024 14:37:06 +0530 Subject: [PATCH 1/3] Fix attempts to publish the same pending segments multiple times --- .../metadata/IndexerSQLMetadataStorageCoordinator.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 2b02f09926b9..414f3f8be94b 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1460,8 +1460,12 @@ int insertPendingSegmentsIntoMetastore( )); final String now = DateTimes.nowUtc().toString(); + final Set<String> processedSegmentIds = new HashSet<>(); for (PendingSegmentRecord pendingSegment : pendingSegments) { final SegmentIdWithShardSpec segmentId = pendingSegment.getId(); + if (processedSegmentIds.contains(segmentId.toString())) { + //continue; + } final Interval interval = segmentId.getInterval(); insertBatch.add() @@ -1479,6 +1483,8 @@ int insertPendingSegmentsIntoMetastore( .bind("payload", jsonMapper.writeValueAsBytes(segmentId)) .bind("task_allocator_id", pendingSegment.getTaskAllocatorId()) .bind("upgraded_from_segment_id", pendingSegment.getUpgradedFromSegmentId()); + + processedSegmentIds.add(segmentId.toString()); } int[] updated = insertBatch.execute(); return Arrays.stream(updated).sum(); From f3982d08cb0655cf3146ff208c7342955ca4b0fc Mon Sep 17 00:00:00 2001 From: Amatya Date: Fri, 14 Jun 2024 14:45:51 +0530 Subject: [PATCH 2/3] Uncomment code --- .../druid/metadata/IndexerSQLMetadataStorageCoordinator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 414f3f8be94b..db0a15266c91 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1464,7 +1464,7 @@ int insertPendingSegmentsIntoMetastore( for (PendingSegmentRecord pendingSegment : pendingSegments) { final SegmentIdWithShardSpec segmentId = pendingSegment.getId(); if (processedSegmentIds.contains(segmentId.toString())) { - //continue; + continue; } final Interval interval = segmentId.getInterval(); From 10dc6f4eee3fd8c9275407eaf54d304eb082db69 Mon Sep 17 00:00:00 2001 From: Amatya Date: Tue, 18 Jun 2024 09:24:43 +0530 Subject: [PATCH 3/3] Add test --- .../IndexerSQLMetadataStorageCoordinator.java | 6 ++-- ...exerSQLMetadataStorageCoordinatorTest.java | 28 +++++++++++++++++++ 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index db0a15266c91..b9adcd01e140 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -1460,10 +1460,10 @@ int insertPendingSegmentsIntoMetastore( )); final String now = DateTimes.nowUtc().toString(); - final Set<String> processedSegmentIds = new HashSet<>(); + final Set<SegmentIdWithShardSpec> processedSegmentIds = new HashSet<>(); for (PendingSegmentRecord pendingSegment : pendingSegments) { final SegmentIdWithShardSpec segmentId = pendingSegment.getId(); - if (processedSegmentIds.contains(segmentId.toString())) { + if (processedSegmentIds.contains(segmentId)) { continue; } final Interval interval = segmentId.getInterval(); @@ -1484,7 +1484,7 @@ int insertPendingSegmentsIntoMetastore( 
.bind("task_allocator_id", pendingSegment.getTaskAllocatorId()) .bind("upgraded_from_segment_id", pendingSegment.getUpgradedFromSegmentId()); - processedSegmentIds.add(segmentId.toString()); + processedSegmentIds.add(segmentId); } int[] updated = insertBatch.execute(); return Arrays.stream(updated).sum(); diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index fc43d7126fee..91f3279eeb65 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -330,6 +330,34 @@ public void testCommitReplaceSegments() ); } + @Test + public void testDuplicatePendingSegmentEntriesAreNotInserted() + { + final PendingSegmentRecord pendingSegment0 = new PendingSegmentRecord( + new SegmentIdWithShardSpec("foo", Intervals.ETERNITY, "version", new NumberedShardSpec(0, 0)), + "sequenceName0", + "sequencePrevId0", + null, + "taskAllocatorId" + ); + final PendingSegmentRecord pendingSegment1 = new PendingSegmentRecord( + new SegmentIdWithShardSpec("foo", Intervals.ETERNITY, "version", new NumberedShardSpec(1, 0)), + "sequenceName1", + "sequencePrevId1", + null, + "taskAllocatorId" + ); + final int actualInserted = derbyConnector.retryWithHandle( + handle -> coordinator.insertPendingSegmentsIntoMetastore( + handle, + ImmutableList.of(pendingSegment0, pendingSegment0, pendingSegment1, pendingSegment1, pendingSegment1), + "foo", + true + ) + ); + Assert.assertEquals(2, actualInserted); + } + @Test public void testSimpleAnnounce() throws IOException {