From b9675c57725f6d48cbe5b4d17b3b3c4f57b3c4fc Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Mon, 9 Sep 2024 22:34:38 +0530 Subject: [PATCH 1/8] Filter tombstone segment from refresh --- .../AbstractSegmentMetadataCache.java | 44 ++++--- .../CoordinatorSegmentMetadataCacheTest.java | 118 ++++++++++------- .../BrokerSegmentMetadataCacheTest.java | 122 ++++++++++-------- 3 files changed, 163 insertions(+), 121 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java index 06ce009c8b2b..1e405104dd59 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java @@ -431,10 +431,7 @@ public Map getSegmentMetadataSnapshot() @Nullable public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, SegmentId segmentId) { - if (!segmentMetadataInfo.containsKey(datasource)) { - return null; - } - return segmentMetadataInfo.get(datasource).get(segmentId); + return segmentMetadataInfo.getOrDefault(datasource, new ConcurrentSkipListMap<>()).get(segmentId); } /** @@ -458,13 +455,6 @@ public int getTotalSegments() @VisibleForTesting public void addSegment(final DruidServerMetadata server, final DataSegment segment) { - // Skip adding tombstone segment to the cache. These segments lack data or column information. - // Additionally, segment metadata queries, which are not yet implemented for tombstone segments - // (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones, - // leading to indefinite refresh attempts for these segments. - if (segment.isTombstone()) { - return; - } // Get lock first so that we won't wait in ConcurrentMap.compute(). 
synchronized (lock) { // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking @@ -537,10 +527,6 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme @VisibleForTesting public void removeSegment(final DataSegment segment) { - // tombstone segments are not present in the cache - if (segment.isTombstone()) { - return; - } // Get lock first so that we won't wait in ConcurrentMap.compute(). synchronized (lock) { log.debug("Segment [%s] is gone.", segment.getId()); @@ -732,18 +718,34 @@ public Set refreshSegmentsForDataSource(final String dataSource, fina log.debug("Refreshing metadata for datasource[%s].", dataSource); + final Set retVal = new HashSet<>(); + + ConcurrentSkipListMap datasourceSegments = segmentMetadataInfo.get(dataSource); + // this datasource no longer exists, skip refresh + if (datasourceSegments == null) { + return retVal; + } + + // Skip refreshing tombstone segments. These segments lack data or column information. + // Additionally, segment metadata queries, which are not yet implemented for tombstone segments + // (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones, + // leading to indefinite refresh attempts for these segments. + Set segmentsWithoutTombstone = + StreamSupport.stream(Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY).spliterator(), false).filter( + segment -> + datasourceSegments.containsKey(segment) && + !datasourceSegments.get(segment).getSegment().isTombstone()).collect(Collectors.toSet() + ); + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource); - emitter.emit(builder.setMetric("metadatacache/refresh/count", segments.size())); + emitter.emit(builder.setMetric("metadatacache/refresh/count", segmentsWithoutTombstone.size())); // Segment id string -> SegmentId object. 
- final Map segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString); + final Map segmentIdMap = Maps.uniqueIndex(segmentsWithoutTombstone, SegmentId::toString); - final Set retVal = new HashSet<>(); - final Sequence sequence = runSegmentMetadataQuery( - Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY) - ); + final Sequence sequence = runSegmentMetadataQuery(segmentsWithoutTombstone); Yielder yielder = Yielders.each(sequence); diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 772a79ae0ad1..63eac95d8bb4 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -32,6 +32,7 @@ import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.InternalQueryConfig; import org.apache.druid.data.input.InputRow; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -2216,74 +2217,93 @@ protected void coldDatasourceSchemaExec() } @Test - public void testTombstoneSegmentIsNotAdded() throws InterruptedException + public void testTombstoneSegmentIsNotRefreshed() throws IOException { - String datasource = "newSegmentAddTest"; - CountDownLatch addSegmentLatch = new CountDownLatch(1); + String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }"; + + TestHelper.makeJsonMapper(); + InternalQueryConfig internalQueryConfig = MAPPER.readValue( + MAPPER.writeValueAsString( + MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class) + ), + InternalQueryConfig.class + ); + + QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class); + 
QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class); CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache( - getQueryLifecycleFactory(walker), + factoryMock, serverView, SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), - new InternalQueryConfig(), + internalQueryConfig, new NoopServiceEmitter(), segmentSchemaCache, backFillQueue, sqlSegmentsMetadataManager, segmentsMetadataManagerConfigSupplier - ) - { - @Override - public void addSegment(final DruidServerMetadata server, final DataSegment segment) - { - super.addSegment(server, segment); - if (datasource.equals(segment.getDataSource())) { - addSegmentLatch.countDown(); - } - } - }; + ); - schema.onLeaderStart(); - schema.awaitInitialization(); + Map queryContext = ImmutableMap.of( + QueryContexts.PRIORITY_KEY, 5, + QueryContexts.BROKER_PARALLEL_MERGE_KEY, false + ); - DataSegment segment = new DataSegment( - datasource, - Intervals.of("2001/2002"), - "1", - Collections.emptyMap(), - Collections.emptyList(), - Collections.emptyList(), - TombstoneShardSpec.INSTANCE, - null, + DataSegment segment = newSegment("test", 0); + DataSegment tombstone = DataSegment.builder() + .dataSource("test") + .interval(Intervals.of("2012-01-01/2012-01-02")) + .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString()) + .shardSpec(new TombstoneShardSpec()) + .loadSpec(Collections.singletonMap( + "type", + DataSegment.TOMBSTONE_LOADSPEC_TYPE + )) + .size(0) + .build(); + + final DruidServer historicalServer = druidServers.stream() + .filter(s -> s.getType().equals(ServerType.HISTORICAL)) + .findAny() + .orElse(null); + + Assert.assertNotNull(historicalServer); + final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata(); + + schema.addSegment(historicalServerMetadata, segment); + schema.addSegment(historicalServerMetadata, tombstone); + + List segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId()); + + // This is the query that we expect this 
method to create. We will be testing that it matches the query generated by the method under test. + SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery( + new TableDataSource(segment.getDataSource()), + new MultipleSpecificSegmentSpec( + segmentIterable.stream() + .filter(id -> !id.equals(tombstone.getId())) + .map(SegmentId::toDescriptor) + .collect(Collectors.toList()) + ), + new AllColumnIncluderator(), + false, + queryContext, + EnumSet.of(SegmentMetadataQuery.AnalysisType.AGGREGATORS), + false, null, - 0 + null ); - Assert.assertEquals(6, schema.getTotalSegments()); + EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); + // This is the mat of the test, making sure that the query created by the method under test matches the expected query, specifically the operator configured context + EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)) + .andReturn(QueryResponse.withEmptyContext(Sequences.empty())); - serverView.addSegment(segment, ServerType.HISTORICAL); - Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(0, addSegmentLatch.getCount()); + EasyMock.replay(factoryMock, lifecycleMock); - Assert.assertEquals(6, schema.getTotalSegments()); - List metadatas = schema - .getSegmentMetadataSnapshot() - .values() - .stream() - .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) - .collect(Collectors.toList()); - Assert.assertEquals(0, metadatas.size()); + schema.refreshSegmentsForDataSource("test", new HashSet<>(segmentIterable)); - serverView.removeSegment(segment, ServerType.HISTORICAL); - Assert.assertEquals(6, schema.getTotalSegments()); - metadatas = schema - .getSegmentMetadataSnapshot() - .values() - .stream() - .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) - .collect(Collectors.toList()); - Assert.assertEquals(0, metadatas.size()); + EasyMock.verify(factoryMock, 
lifecycleMock); } private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index 10504ca49d55..2b17288545d8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -37,6 +37,7 @@ import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.coordinator.NoopCoordinatorClient; import org.apache.druid.data.input.InputRow; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.Sequences; @@ -1139,71 +1140,90 @@ public void testNoDatasourceSchemaWhenNoSegmentMetadata() throws InterruptedExce } @Test - public void testTombstoneSegmentIsNotAdded() throws InterruptedException + public void testTombstoneSegmentIsNotRefreshed() throws IOException { - String datasource = "newSegmentAddTest"; - CountDownLatch addSegmentLatch = new CountDownLatch(1); + String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }"; + + TestHelper.makeJsonMapper(); + InternalQueryConfig internalQueryConfig = MAPPER.readValue( + MAPPER.writeValueAsString( + MAPPER.readValue(brokerInternalQueryConfigJson, InternalQueryConfig.class) + ), + InternalQueryConfig.class + ); + + QueryLifecycleFactory factoryMock = EasyMock.createMock(QueryLifecycleFactory.class); + QueryLifecycle lifecycleMock = EasyMock.createMock(QueryLifecycle.class); + BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache( - CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), + factoryMock, serverView, - BrokerSegmentMetadataCacheConfig.create(), + 
SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), - new InternalQueryConfig(), + internalQueryConfig, new NoopServiceEmitter(), new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager), new NoopCoordinatorClient(), CentralizedDatasourceSchemaConfig.create() - ) - { - @Override - public void addSegment(final DruidServerMetadata server, final DataSegment segment) - { - super.addSegment(server, segment); - if (datasource.equals(segment.getDataSource())) { - addSegmentLatch.countDown(); - } - } - }; + ); - schema.start(); - schema.awaitInitialization(); + Map queryContext = ImmutableMap.of( + QueryContexts.PRIORITY_KEY, 5, + QueryContexts.BROKER_PARALLEL_MERGE_KEY, false + ); - DataSegment segment = new DataSegment( - datasource, - Intervals.of("2001/2002"), - "1", - Collections.emptyMap(), - Collections.emptyList(), - Collections.emptyList(), - TombstoneShardSpec.INSTANCE, - null, + DataSegment segment = newSegment("test", 0); + DataSegment tombstone = DataSegment.builder() + .dataSource("test") + .interval(Intervals.of("2012-01-01/2012-01-02")) + .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString()) + .shardSpec(new TombstoneShardSpec()) + .loadSpec(Collections.singletonMap( + "type", + DataSegment.TOMBSTONE_LOADSPEC_TYPE + )) + .size(0) + .build(); + + final ImmutableDruidServer historicalServer = druidServers.stream() + .filter(s -> s.getType().equals(ServerType.HISTORICAL)) + .findAny() + .orElse(null); + + Assert.assertNotNull(historicalServer); + final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata(); + + schema.addSegment(historicalServerMetadata, segment); + schema.addSegment(historicalServerMetadata, tombstone); + + List segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId()); + + SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery( + new TableDataSource(segment.getDataSource()), + new MultipleSpecificSegmentSpec( + segmentIterable.stream() + .filter(id -> 
!id.equals(tombstone.getId())) + .map(SegmentId::toDescriptor) + .collect(Collectors.toList()) + ), + new AllColumnIncluderator(), + false, + queryContext, + EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), + false, null, - 0 + null ); - Assert.assertEquals(6, schema.getTotalSegments()); + EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); + // This is the mat of the test, making sure that the query created by the method under test matches the expected query, specifically the operator configured context + EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)) + .andReturn(QueryResponse.withEmptyContext(Sequences.empty())); + + EasyMock.replay(factoryMock, lifecycleMock); - serverView.addSegment(segment, ServerType.HISTORICAL); - Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); - Assert.assertEquals(0, addSegmentLatch.getCount()); + schema.refreshSegmentsForDataSource("test", new HashSet<>(segmentIterable)); - Assert.assertEquals(6, schema.getTotalSegments()); - List metadatas = schema - .getSegmentMetadataSnapshot() - .values() - .stream() - .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) - .collect(Collectors.toList()); - Assert.assertEquals(0, metadatas.size()); - - serverView.removeSegment(segment, ServerType.HISTORICAL); - Assert.assertEquals(6, schema.getTotalSegments()); - metadatas = schema - .getSegmentMetadataSnapshot() - .values() - .stream() - .filter(metadata -> datasource.equals(metadata.getSegment().getDataSource())) - .collect(Collectors.toList()); - Assert.assertEquals(0, metadatas.size()); + EasyMock.verify(factoryMock, lifecycleMock); } } From f75d9b6d5b7d2932bfa66d0f2aa78518c4c4ce9a Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Mon, 9 Sep 2024 23:26:33 +0530 Subject: [PATCH 2/8] minor change --- .../metadata/AbstractSegmentMetadataCache.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) 
diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java index 1e405104dd59..ea09fc800159 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java @@ -731,11 +731,12 @@ public Set refreshSegmentsForDataSource(final String dataSource, fina // (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones, // leading to indefinite refresh attempts for these segments. Set segmentsWithoutTombstone = - StreamSupport.stream(Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY).spliterator(), false).filter( - segment -> - datasourceSegments.containsKey(segment) && - !datasourceSegments.get(segment).getSegment().isTombstone()).collect(Collectors.toSet() - ); + StreamSupport.stream(Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY).spliterator(), false) + .filter( + segment -> + datasourceSegments.containsKey(segment) && + !datasourceSegments.get(segment).getSegment().isTombstone()).collect(Collectors.toSet() + ); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource); From 8ada682c1a08c9c82d8f3bdadce9db2ef3c72b40 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Wed, 11 Sep 2024 11:26:55 +0530 Subject: [PATCH 3/8] Review comments --- .../AbstractSegmentMetadataCache.java | 51 ++++++++++--------- .../CoordinatorSegmentMetadataCache.java | 2 +- .../CoordinatorSegmentMetadataCacheTest.java | 10 ++-- .../BrokerSegmentMetadataCacheTest.java | 7 ++- 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java index 
473cd1ed572a..df13001fc7ba 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java @@ -84,6 +84,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; /** @@ -447,18 +448,8 @@ public Iterator iterateSegmentMetadata() @Nullable public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, SegmentId segmentId) { -<<<<<<< HEAD - return segmentMetadataInfo.getOrDefault(datasource, new ConcurrentSkipListMap<>()).get(segmentId); -======= - final ConcurrentSkipListMap dataSourceMap = - segmentMetadataInfo.get(datasource); - - if (dataSourceMap == null) { - return null; - } else { - return dataSourceMap.get(segmentId); - } ->>>>>>> upstream/master + ConcurrentSkipListMap metadata = segmentMetadataInfo.get(datasource); + return metadata == null ? null : metadata.get(segmentId); } /** @@ -729,6 +720,19 @@ private long recomputeIsRealtime(ImmutableSet servers) return historicalServer.isPresent() ? 0 : 1; } + private Stream getTombstoneFilteredStream( + final Set segments, + final ConcurrentSkipListMap datasourceSegments + ) + { + return StreamSupport.stream(Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY).spliterator(), false) + .filter( + segment -> + datasourceSegments.containsKey(segment) && + !datasourceSegments.get(segment).getSegment().isTombstone() + ); + } + /** * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of * segments actually refreshed, which may be a subset of the asked-for set. 
@@ -753,28 +757,29 @@ public Set refreshSegmentsForDataSource(final String dataSource, fina return retVal; } + long count = getTombstoneFilteredStream(segments, datasourceSegments).count(); + + if (count == 0) { + return retVal; + } + // Skip refreshing tombstone segments. These segments lack data or column information. // Additionally, segment metadata queries, which are not yet implemented for tombstone segments // (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones, // leading to indefinite refresh attempts for these segments. - Set segmentsWithoutTombstone = - StreamSupport.stream(Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY).spliterator(), false) - .filter( - segment -> - datasourceSegments.containsKey(segment) && - !datasourceSegments.get(segment).getSegment().isTombstone()).collect(Collectors.toSet() - ); + Iterable segmentsWithoutTombstone = + () -> getTombstoneFilteredStream(segments, datasourceSegments).iterator(); + + logSegmentsToRefresh(dataSource, segmentsWithoutTombstone); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource); - emitter.emit(builder.setMetric("metadatacache/refresh/count", segmentsWithoutTombstone.size())); + emitter.emit(builder.setMetric("metadatacache/refresh/count", count)); // Segment id string -> SegmentId object. final Map segmentIdMap = Maps.uniqueIndex(segmentsWithoutTombstone, SegmentId::toString); - logSegmentsToRefresh(dataSource, segmentsWithoutTombstone); - final Sequence sequence = runSegmentMetadataQuery(segmentsWithoutTombstone); Yielder yielder = Yielders.each(sequence); @@ -820,7 +825,7 @@ public Set refreshSegmentsForDataSource(final String dataSource, fina /** * Log the segment details for a datasource to be refreshed for debugging purpose. 
*/ - void logSegmentsToRefresh(String dataSource, Set ids) + void logSegmentsToRefresh(String dataSource, Iterable ids) { // no-op } diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java index 0ed81f6c1e8c..c9376ecdb665 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java @@ -531,7 +531,7 @@ public void refresh(final Set segmentsToRefresh, final Set da } @Override - void logSegmentsToRefresh(String dataSource, Set ids) + void logSegmentsToRefresh(String dataSource, Iterable ids) { log.info("Logging a sample of 5 segments [%s] to be refreshed for datasource [%s]", Iterables.limit(ids, 5), dataSource); } diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 6a751fed4246..121528036d13 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -2280,7 +2280,6 @@ public void testTombstoneSegmentIsNotRefreshed() throws IOException List segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId()); - // This is the query that we expect this method to create. We will be testing that it matches the query generated by the method under test. 
SegmentMetadataQuery expectedMetadataQuery = new SegmentMetadataQuery( new TableDataSource(segment.getDataSource()), new MultipleSpecificSegmentSpec( @@ -2299,15 +2298,20 @@ public void testTombstoneSegmentIsNotRefreshed() throws IOException ); EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); - // This is the mat of the test, making sure that the query created by the method under test matches the expected query, specifically the operator configured context EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)) - .andReturn(QueryResponse.withEmptyContext(Sequences.empty())); + .andReturn(QueryResponse.withEmptyContext(Sequences.empty())).once(); EasyMock.replay(factoryMock, lifecycleMock); schema.refreshSegmentsForDataSource("test", new HashSet<>(segmentIterable)); EasyMock.verify(factoryMock, lifecycleMock); + + // refresh only the tombstone segment + segmentIterable = ImmutableList.of(tombstone.getId()); + + // make sure this call doesn't trigger refresh, as the mocks expect only a single call to runSimple + schema.refreshSegmentsForDataSource("test", new HashSet<>(segmentIterable)); } private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index 286ebc13b3cf..c0adedcf33ff 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -1216,7 +1216,6 @@ public void testTombstoneSegmentIsNotRefreshed() throws IOException ); EasyMock.expect(factoryMock.factorize()).andReturn(lifecycleMock).once(); - // This is the mat of the test, making sure that the query created by the method under test matches the expected query, 
specifically the operator configured context EasyMock.expect(lifecycleMock.runSimple(expectedMetadataQuery, AllowAllAuthenticator.ALLOW_ALL_RESULT, Access.OK)) .andReturn(QueryResponse.withEmptyContext(Sequences.empty())); @@ -1225,5 +1224,11 @@ public void testTombstoneSegmentIsNotRefreshed() throws IOException schema.refreshSegmentsForDataSource("test", new HashSet<>(segmentIterable)); EasyMock.verify(factoryMock, lifecycleMock); + + // refresh only the tombstone segment + segmentIterable = ImmutableList.of(tombstone.getId()); + + // make sure this call doesn't trigger refresh, as the mocks expect only a single call to runSimple + schema.refreshSegmentsForDataSource("test", new HashSet<>(segmentIterable)); } } From a91d597d3e0147d063a1a63ed0b7efe1ace71087 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Wed, 11 Sep 2024 11:58:38 +0530 Subject: [PATCH 4/8] Revert some change --- .../segment/metadata/AbstractSegmentMetadataCache.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java index df13001fc7ba..19819b331e0a 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java @@ -448,8 +448,14 @@ public Iterator iterateSegmentMetadata() @Nullable public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, SegmentId segmentId) { - ConcurrentSkipListMap metadata = segmentMetadataInfo.get(datasource); - return metadata == null ? 
null : metadata.get(segmentId); + final ConcurrentSkipListMap dataSourceMap = + segmentMetadataInfo.get(datasource); + + if (dataSourceMap == null) { + return null; + } else { + return dataSourceMap.get(segmentId); + } } /** From 8e764a2bea8449ba456ef5f00f228f6c12273278 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Wed, 11 Sep 2024 23:23:50 +0530 Subject: [PATCH 5/8] Revert changes to filter tombstone before refresh --- .../AbstractSegmentMetadataCache.java | 62 +++++++------------ 1 file changed, 21 insertions(+), 41 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java index 19819b331e0a..d918ec5e3f29 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java @@ -84,7 +84,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; import java.util.stream.StreamSupport; /** @@ -479,6 +478,13 @@ public int getTotalSegments() @VisibleForTesting public void addSegment(final DruidServerMetadata server, final DataSegment segment) { + // Skip adding tombstone segment to the cache. These segments lack data or column information. + // Additionally, segment metadata queries, which are not yet implemented for tombstone segments + // (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones, + // leading to indefinite refresh attempts for these segments. + if (segment.isTombstone()) { + return; + } // Get lock first so that we won't wait in ConcurrentMap.compute(). 
synchronized (lock) { // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking @@ -551,6 +557,10 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme @VisibleForTesting public void removeSegment(final DataSegment segment) { + // tombstone segments are not present in the cache + if (segment.isTombstone()) { + return; + } // Get lock first so that we won't wait in ConcurrentMap.compute(). synchronized (lock) { log.debug("Segment [%s] is gone.", segment.getId()); @@ -726,19 +736,6 @@ private long recomputeIsRealtime(ImmutableSet servers) return historicalServer.isPresent() ? 0 : 1; } - private Stream getTombstoneFilteredStream( - final Set segments, - final ConcurrentSkipListMap datasourceSegments - ) - { - return StreamSupport.stream(Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY).spliterator(), false) - .filter( - segment -> - datasourceSegments.containsKey(segment) && - !datasourceSegments.get(segment).getSegment().isTombstone() - ); - } - /** * Attempt to refresh "segmentSignatures" for a set of segments for a particular dataSource. Returns the set of * segments actually refreshed, which may be a subset of the asked-for set. @@ -755,38 +752,21 @@ public Set refreshSegmentsForDataSource(final String dataSource, fina log.debug("Refreshing metadata for datasource[%s].", dataSource); - final Set retVal = new HashSet<>(); - - ConcurrentSkipListMap datasourceSegments = segmentMetadataInfo.get(dataSource); - // this datasource no longer exists, skip refresh - if (datasourceSegments == null) { - return retVal; - } - - long count = getTombstoneFilteredStream(segments, datasourceSegments).count(); - - if (count == 0) { - return retVal; - } - - // Skip refreshing tombstone segments. These segments lack data or column information. 
- // Additionally, segment metadata queries, which are not yet implemented for tombstone segments - // (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones, - // leading to indefinite refresh attempts for these segments. - Iterable segmentsWithoutTombstone = - () -> getTombstoneFilteredStream(segments, datasourceSegments).iterator(); - - logSegmentsToRefresh(dataSource, segmentsWithoutTombstone); - final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource); - emitter.emit(builder.setMetric("metadatacache/refresh/count", count)); + emitter.emit(builder.setMetric("metadatacache/refresh/count", segments.size())); // Segment id string -> SegmentId object. - final Map segmentIdMap = Maps.uniqueIndex(segmentsWithoutTombstone, SegmentId::toString); + final Map segmentIdMap = Maps.uniqueIndex(segments, SegmentId::toString); + + final Set retVal = new HashSet<>(); + + logSegmentsToRefresh(dataSource, segments); - final Sequence sequence = runSegmentMetadataQuery(segmentsWithoutTombstone); + final Sequence sequence = runSegmentMetadataQuery( + Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY) + ); Yielder yielder = Yielders.each(sequence); @@ -831,7 +811,7 @@ public Set refreshSegmentsForDataSource(final String dataSource, fina /** * Log the segment details for a datasource to be refreshed for debugging purpose. 
*/ - void logSegmentsToRefresh(String dataSource, Iterable ids) + void logSegmentsToRefresh(String dataSource, Set ids) { // no-op } From 182bbe8b2ce4ee1592896ecf9ab1f9aabccfbca9 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 12 Sep 2024 01:32:50 +0530 Subject: [PATCH 6/8] Instead of filtering tombstone segment, skip adding them for refresh --- .../AbstractSegmentMetadataCache.java | 24 ++++++++-------- .../CoordinatorSegmentMetadataCache.java | 28 +++++++++++++++---- .../CoordinatorSegmentMetadataCacheTest.java | 24 ++++++++++++---- .../BrokerSegmentMetadataCacheTest.java | 24 ++++++++++++---- 4 files changed, 71 insertions(+), 29 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java index d918ec5e3f29..99d965ec643e 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/AbstractSegmentMetadataCache.java @@ -102,6 +102,13 @@ *

* This class has an abstract method {@link #refresh(Set, Set)} which the child class must override * with the logic to build and cache table schema. + *

+ * Note on handling tombstone segments: + * These segments lack data or column information. + * Additionally, segment metadata queries, which are not yet implemented for tombstone segments + * (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones, + * leading to indefinite refresh attempts for these segments. + * Therefore, these segments are never added to the set of segments being refreshed. * * @param The type of information associated with the data source, which must extend {@link DataSourceInformation}. */ @@ -478,13 +485,6 @@ public int getTotalSegments() @VisibleForTesting public void addSegment(final DruidServerMetadata server, final DataSegment segment) { - // Skip adding tombstone segment to the cache. These segments lack data or column information. - // Additionally, segment metadata queries, which are not yet implemented for tombstone segments - // (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones, - // leading to indefinite refresh attempts for these segments. - if (segment.isTombstone()) { - return; - } // Get lock first so that we won't wait in ConcurrentMap.compute(). 
synchronized (lock) { // someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking @@ -511,7 +511,11 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme segmentMetadata = AvailableSegmentMetadata .builder(segment, isRealtime, ImmutableSet.of(server), null, DEFAULT_NUM_ROWS) // Added without needing a refresh .build(); - markSegmentAsNeedRefresh(segment.getId()); + if (segment.isTombstone()) { + log.debug("Skipping refresh for tombstone segment."); + } else { + markSegmentAsNeedRefresh(segment.getId()); + } if (!server.isSegmentReplicationTarget()) { log.debug("Added new mutable segment [%s].", segment.getId()); markSegmentAsMutable(segment.getId()); @@ -557,10 +561,6 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme @VisibleForTesting public void removeSegment(final DataSegment segment) { - // tombstone segments are not present in the cache - if (segment.isTombstone()) { - return; - } // Get lock first so that we won't wait in ConcurrentMap.compute(). 
synchronized (lock) { log.debug("Segment [%s] is gone.", segment.getId()); diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java index c9376ecdb665..df9c24fea880 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java @@ -375,8 +375,13 @@ public Iterator iterateSegmentMetadata() .build(); } else { // mark it for refresh, however, this case shouldn't arise by design - markSegmentAsNeedRefresh(segmentId); - log.debug("SchemaMetadata for segmentId[%s] is absent.", segmentId); + log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId); + + if (availableSegmentMetadata.getSegment().isTombstone()) { + log.debug("Skipping refresh for tombstone segment."); + } else { + markSegmentAsNeedRefresh(segmentId); + } return availableSegmentMetadata; } } @@ -404,8 +409,13 @@ public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, S .build(); } else { // mark it for refresh, however, this case shouldn't arise by design - markSegmentAsNeedRefresh(segmentId); log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId); + + if (availableSegmentMetadata.getSegment().isTombstone()) { + log.debug("Skipping refresh for tombstone segment."); + } else { + markSegmentAsNeedRefresh(segmentId); + } } return availableSegmentMetadata; } @@ -531,7 +541,7 @@ public void refresh(final Set segmentsToRefresh, final Set da } @Override - void logSegmentsToRefresh(String dataSource, Iterable ids) + void logSegmentsToRefresh(String dataSource, Set ids) { log.info("Logging a sample of 5 segments [%s] to be refreshed for datasource [%s]", Iterables.limit(ids, 5), dataSource); } @@ -686,15 +696,21 @@ public RowSignature buildDataSourceRowSignature(final String dataSource) final 
Map columnTypes = new LinkedHashMap<>(); if (segmentsMap != null && !segmentsMap.isEmpty()) { - for (SegmentId segmentId : segmentsMap.keySet()) { + for (Map.Entry entry : segmentsMap.entrySet()) { + SegmentId segmentId = entry.getKey(); Optional optionalSchema = segmentSchemaCache.getSchemaForSegment(segmentId); if (optionalSchema.isPresent()) { RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature(); mergeRowSignature(columnTypes, rowSignature); } else { // mark it for refresh, however, this case shouldn't arise by design - markSegmentAsNeedRefresh(segmentId); log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId); + + if (entry.getValue().getSegment().isTombstone()) { + log.debug("Skipping refresh for tombstone segment."); + } else { + markSegmentAsNeedRefresh(segmentId); + } } } } else { diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 121528036d13..e897263aa15a 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -2221,7 +2221,7 @@ protected void coldDatasourceSchemaExec() } @Test - public void testTombstoneSegmentIsNotRefreshed() throws IOException + public void testTombstoneSegmentIsNotRefreshed() throws IOException, InterruptedException { String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }"; @@ -2277,6 +2277,7 @@ public void testTombstoneSegmentIsNotRefreshed() throws IOException schema.addSegment(historicalServerMetadata, segment); schema.addSegment(historicalServerMetadata, tombstone); + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); List segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId()); @@ -2303,15 +2304,26 @@ 
public void testTombstoneSegmentIsNotRefreshed() throws IOException EasyMock.replay(factoryMock, lifecycleMock); - schema.refreshSegmentsForDataSource("test", new HashSet<>(segmentIterable)); + schema.refresh(Collections.singleton(segment.getId()), Collections.singleton("test")); + // verify that metadata query is not issued for tombstone segment EasyMock.verify(factoryMock, lifecycleMock); - // refresh only the tombstone segment - segmentIterable = ImmutableList.of(tombstone.getId()); + // Verify that datasource schema building logic doesn't mark the tombstone segment for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); - // make sure this call doesn't trigger refresh, as the mocks expect only a single call to runSimple - schema.refreshSegmentsForDataSource("test", new HashSet<>(segmentIterable)); + AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId()); + Assert.assertNotNull(availableSegmentMetadata); + // fetching metadata for tombstone segment shouldn't mark it for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); + + Set metadatas = new HashSet<>(); + schema.iterateSegmentMetadata().forEachRemaining(metadatas::add); + + Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count()); + + // iterating over entire metadata doesn't cause tombstone to be marked for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); } private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index c0adedcf33ff..b613c602f633 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -1195,6 +1195,7 @@ public void testTombstoneSegmentIsNotRefreshed() throws IOException schema.addSegment(historicalServerMetadata, segment); schema.addSegment(historicalServerMetadata, tombstone); + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); List segmentIterable = ImmutableList.of(segment.getId(), tombstone.getId()); @@ -1221,14 +1222,27 @@ public void testTombstoneSegmentIsNotRefreshed() throws IOException EasyMock.replay(factoryMock, lifecycleMock); - schema.refreshSegmentsForDataSource("test", new HashSet<>(segmentIterable)); + Set segmentsToRefresh = new HashSet<>(); + segmentsToRefresh.add(segment.getId()); + schema.refresh(segmentsToRefresh, Collections.singleton("test")); + // verify that metadata is not issued for tombstone segment EasyMock.verify(factoryMock, lifecycleMock); - // refresh only the tombstone segment - segmentIterable = ImmutableList.of(tombstone.getId()); + // Verify that datasource schema building logic doesn't mark the tombstone segment for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); - // make sure this call doesn't trigger refresh, as the mocks expect only a single call to runSimple - schema.refreshSegmentsForDataSource("test", new HashSet<>(segmentIterable)); + AvailableSegmentMetadata availableSegmentMetadata = schema.getAvailableSegmentMetadata("test", tombstone.getId()); + Assert.assertNotNull(availableSegmentMetadata); + // fetching metadata for tombstone segment shouldn't mark it for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); + + Set metadatas = new HashSet<>(); + schema.iterateSegmentMetadata().forEachRemaining(metadatas::add); + + Assert.assertEquals(1, metadatas.stream().filter(metadata -> metadata.getSegment().isTombstone()).count()); + + // iterating over entire metadata doesn't cause tombstone 
to be marked for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(tombstone.getId())); } } From ddff99284278ce95190b8943eceaf738859e51c9 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 12 Sep 2024 14:20:46 +0530 Subject: [PATCH 7/8] add a method to mark segment for refresh if needed --- .../CoordinatorSegmentMetadataCache.java | 62 +++++++++---------- .../CoordinatorSegmentMetadataCacheTest.java | 21 +++++++ 2 files changed, 52 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java index 26a54e3f271c..24489e60acdc 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java @@ -374,14 +374,7 @@ public Iterator iterateSegmentMetadata() .withNumRows(metadata.get().getNumRows()) .build(); } else { - // mark it for refresh, however, this case shouldn't arise by design - log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId); - - if (availableSegmentMetadata.getSegment().isTombstone()) { - log.debug("Skipping refresh for tombstone segment."); - } else { - markSegmentAsNeedRefresh(segmentId); - } + markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment()); return availableSegmentMetadata; } } @@ -408,14 +401,7 @@ public AvailableSegmentMetadata getAvailableSegmentMetadata(String datasource, S .withNumRows(metadata.get().getNumRows()) .build(); } else { - // mark it for refresh, however, this case shouldn't arise by design - log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId); - - if (availableSegmentMetadata.getSegment().isTombstone()) { - log.debug("Skipping refresh for tombstone segment."); - } else { - markSegmentAsNeedRefresh(segmentId); - } + 
markSegmentForRefreshIfNeeded(availableSegmentMetadata.getSegment()); } return availableSegmentMetadata; } @@ -703,21 +689,7 @@ public RowSignature buildDataSourceRowSignature(final String dataSource) RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature(); mergeRowSignature(columnTypes, rowSignature); } else { - // mark it for refresh, however, this case shouldn't arise by design - log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId); - - if (entry.getValue().getSegment().isTombstone()) { - log.debug("Skipping refresh for tombstone segment."); - } else { - ImmutableDruidDataSource druidDataSource = - sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource()); - - if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) { - // mark it for refresh only if it is used - // however, this case shouldn't arise by design - markSegmentAsNeedRefresh(segmentId); - } - } + markSegmentForRefreshIfNeeded(entry.getValue().getSegment()); } } } else { @@ -892,4 +864,32 @@ Optional mergeOrCreateRowSignature( return Optional.empty(); } } + + /** + * A segment schema can go missing. To ensure smooth functioning, segment is marked for refresh. + * It need not be refreshed in the following scenarios: + * - Tombstone segment, since they do not have any schema. + * - Unused segment which hasn't been yet removed from the cache. + * Any other scenario needs investigation. 
+ */ + private void markSegmentForRefreshIfNeeded(DataSegment segment) + { + SegmentId id = segment.getId(); + + log.debug("SchemaMetadata for segmentId [%s] is absent.", id); + + if (segment.isTombstone()) { + log.debug("Skipping refresh for tombstone segment [%s].", id); + return; + } + + ImmutableDruidDataSource druidDataSource = + sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segment.getDataSource()); + + if (druidDataSource != null && druidDataSource.getSegment(id) != null) { + markSegmentAsNeedRefresh(id); + } else { + log.debug("Skipping refresh for unused segment [%s].", id); + } + } } diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 4435071a6fe2..2141f878f0c5 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -2420,6 +2420,27 @@ public void refresh(Set segmentsToRefresh, Set dataSourcesToR Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId())); Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId())); + + AvailableSegmentMetadata availableSegmentMetadata = + schema.getAvailableSegmentMetadata(dataSource, segments.get(0).getId()); + + Assert.assertNotNull(availableSegmentMetadata); + // fetching metadata for unused segment shouldn't mark it for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId())); + + Set metadatas = new HashSet<>(); + schema.iterateSegmentMetadata().forEachRemaining(metadatas::add); + + Assert.assertEquals( + 1, + metadatas.stream() + .filter( + metadata -> + metadata.getSegment().getId().equals(segments.get(0).getId())).count() + ); + + // iterating over entire metadata doesn't cause unused
segment to be marked for refresh + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(0).getId())); } private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns) From 0a314febcc9ce070870aedbe66df7d77ec927583 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 12 Sep 2024 15:11:02 +0530 Subject: [PATCH 8/8] Fix intellij inspection --- .../segment/metadata/CoordinatorSegmentMetadataCacheTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 2141f878f0c5..22b0890e855e 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -2221,7 +2221,7 @@ protected void coldDatasourceSchemaExec() } @Test - public void testTombstoneSegmentIsNotRefreshed() throws IOException, InterruptedException + public void testTombstoneSegmentIsNotRefreshed() throws IOException { String brokerInternalQueryConfigJson = "{\"context\": { \"priority\": 5} }";