From 258b1293c61d6f44134ed8b208c44737baa4d493 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 24 Jan 2024 23:21:21 -0800 Subject: [PATCH 1/3] Merge hydrant runners flatly for realtime queries. Prior to this patch, we have two layers of mergeRunners for realtime queries: one for each Sink (a logical segment) and one across all Sinks. This is done to keep metrics and results grouped by Sink, given that each FireHydrant within a Sink has its own separate storage adapter. However, it costs extra memory usage due to the extra layer of materialization. This is especially pronounced for groupBy queries, which only use their merge buffers at the top layer of merging. The lower layer of merging materializes ResultRows directly into the heap, which can cause heap exhaustion if there are enough ResultRows. This patch changes to a single layer of merging when bySegment: false, just like Historicals. To accommodate that, segment metrics like query/segment/time are now per-FireHydrant instead of per-Sink. Two layers of merging are retained when bySegment: true. This isn't common, because it's typically only used when segment level caching is enabled on the Broker, which is off by default. 
--- .../common/task/TestAppenderatorsManager.java | 1 - .../realtime/appenderator/Appenderators.java | 4 - .../DefaultRealtimeAppenderatorFactory.java | 1 - .../PeonAppenderatorsManager.java | 1 - .../appenderator/SinkQuerySegmentWalker.java | 279 +++++++++++------- .../UnifiedIndexerAppenderatorsManager.java | 1 - .../realtime/plumber/FlushingPlumber.java | 1 - .../realtime/plumber/RealtimePlumber.java | 4 - .../plumber/RealtimePlumberSchool.java | 1 - .../StreamAppenderatorTester.java | 3 - 10 files changed, 165 insertions(+), 131 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java index be03fafd3570..f54b0ecd2af2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TestAppenderatorsManager.java @@ -89,7 +89,6 @@ public Appenderator createRealtimeAppenderatorForTask( segmentAnnouncer, emitter, queryProcessingPool, - joinableFactory, cache, cacheConfig, cachePopulatorStats, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java index 47cd058b04d8..51115c48baee 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/Appenderators.java @@ -32,8 +32,6 @@ import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.segment.join.JoinableFactory; -import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.loading.DataSegmentPusher; import 
org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; @@ -58,7 +56,6 @@ public static Appenderator createRealtime( DataSegmentAnnouncer segmentAnnouncer, ServiceEmitter emitter, QueryProcessingPool queryProcessingPool, - JoinableFactory joinableFactory, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats, @@ -86,7 +83,6 @@ public static Appenderator createRealtime( emitter, conglomerate, queryProcessingPool, - new JoinableFactoryWrapper(joinableFactory), Preconditions.checkNotNull(cache, "cache"), cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java index 960779fbf162..e64c315484d2 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/DefaultRealtimeAppenderatorFactory.java @@ -115,7 +115,6 @@ public Appenderator build( segmentAnnouncer, emitter, queryProcessingPool, - joinableFactory, cache, cacheConfig, cachePopulatorStats, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java index 070ac62568ad..dba96acc66ad 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/PeonAppenderatorsManager.java @@ -105,7 +105,6 @@ public Appenderator createRealtimeAppenderatorForTask( segmentAnnouncer, emitter, queryProcessingPool, - joinableFactory, cache, cacheConfig, cachePopulatorStats, diff --git 
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 7c81b60feab6..9019d285b74c 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.google.common.collect.Iterables; import org.apache.druid.client.CachingQueryRunner; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; @@ -30,7 +29,6 @@ import org.apache.druid.client.cache.ForegroundCachePopulator; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -53,13 +51,12 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.query.SinkQueryRunners; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; -import org.apache.druid.segment.join.JoinableFactoryWrapper; +import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.plumber.Sink; import 
org.apache.druid.segment.realtime.plumber.SinkSegmentReference; @@ -69,13 +66,19 @@ import org.apache.druid.utils.CloseableUtils; import org.joda.time.Interval; -import java.io.Closeable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.stream.Collectors; /** * Query handler for indexing tasks. @@ -92,7 +95,6 @@ public class SinkQuerySegmentWalker implements QuerySegmentWalker private final ServiceEmitter emitter; private final QueryRunnerFactoryConglomerate conglomerate; private final QueryProcessingPool queryProcessingPool; - private final JoinableFactoryWrapper joinableFactoryWrapper; private final Cache cache; private final CacheConfig cacheConfig; private final CachePopulatorStats cachePopulatorStats; @@ -106,7 +108,6 @@ public SinkQuerySegmentWalker( ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, QueryProcessingPool queryProcessingPool, - JoinableFactoryWrapper joinableFactoryWrapper, Cache cache, CacheConfig cacheConfig, CachePopulatorStats cachePopulatorStats @@ -118,7 +119,6 @@ public SinkQuerySegmentWalker( this.emitter = Preconditions.checkNotNull(emitter, "emitter"); this.conglomerate = Preconditions.checkNotNull(conglomerate, "conglomerate"); this.queryProcessingPool = Preconditions.checkNotNull(queryProcessingPool, "queryProcessingPool"); - this.joinableFactoryWrapper = joinableFactoryWrapper; this.cache = Preconditions.checkNotNull(cache, "cache"); this.cacheConfig = Preconditions.checkNotNull(cacheConfig, "cacheConfig"); this.cachePopulatorStats = Preconditions.checkNotNull(cachePopulatorStats, "cachePopulatorStats"); @@ -186,110 +186,163 @@ public QueryRunner 
getQueryRunnerForSegments(final Query query, final // We compute the join cache key here itself so it doesn't need to be re-computed for every segment final Optional cacheKeyPrefix = Optional.ofNullable(query.getDataSource().getCacheKey()); - Iterable> perSegmentRunners = Iterables.transform( - specs, - newDescriptor -> { - final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor); - final PartitionChunk chunk = sinkTimeline.findChunk( - descriptor.getInterval(), - descriptor.getVersion(), - descriptor.getPartitionNumber() + // We need to report data for each Sink all-or-nothing, which means we need to acquire references for all + // subsegments (FireHydrants) of a segment (Sink) at once. To ensure they are properly released even when a + // query fails or is canceled, we acquire *all* sink references upfront, and release them all when the main + // QueryRunner returned by this method is closed. (We can't do the acquisition and releasing at the level of + // each FireHydrant's runner, since then it wouldn't be properly all-or-nothing on a per-Sink basis.) 
+ final List allSegmentReferences = new ArrayList<>(); + final Map segmentIdMap = new HashMap<>(); + final LinkedHashMap>> allRunners = new LinkedHashMap<>(); + + try { + for (final SegmentDescriptor newDescriptor : specs) { + final SegmentDescriptor descriptor = newIdToBasePendingSegment.getOrDefault(newDescriptor, newDescriptor); + final PartitionChunk chunk = sinkTimeline.findChunk( + descriptor.getInterval(), + descriptor.getVersion(), + descriptor.getPartitionNumber() + ); + + if (chunk == null) { + allRunners.put( + descriptor, + Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor)) + ); + continue; + } + + final Sink theSink = chunk.getObject(); + final SegmentId sinkSegmentId = theSink.getSegment().getId(); + segmentIdMap.put(descriptor, sinkSegmentId); + final List sinkSegmentReferences = + theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment); + + if (sinkSegmentReferences == null) { + // We failed to acquire references for all subsegments. Bail and report the entire sink missing. + allRunners.put( + descriptor, + Collections.singletonList(new ReportTimelineMissingSegmentQueryRunner<>(descriptor)) ); + } else if (sinkSegmentReferences.isEmpty()) { + allRunners.put(descriptor, Collections.singletonList(new NoopQueryRunner<>())); + } else { + allSegmentReferences.addAll(sinkSegmentReferences); - if (chunk == null) { - return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); - } - - final Sink theSink = chunk.getObject(); - final SegmentId sinkSegmentId = theSink.getSegment().getId(); - final List segmentReferences = - theSink.acquireSegmentReferences(segmentMapFn, skipIncrementalSegment); - - if (segmentReferences == null) { - // We failed to acquire references for all subsegments. Bail and report the entire sink missing. 
- return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); - } else if (segmentReferences.isEmpty()) { - return new NoopQueryRunner<>(); - } - - final Closeable releaser = () -> CloseableUtils.closeAll(segmentReferences); - - try { - Iterable> perHydrantRunners = new SinkQueryRunners<>( - Iterables.transform( - segmentReferences, - segmentReference -> { - QueryRunner runner = factory.createRunner(segmentReference.getSegment()); - - // 1) Only use caching if data is immutable - // 2) Hydrants are not the same between replicas, make sure cache is local - if (segmentReference.isImmutable() && cache.isLocal()) { - StorageAdapter storageAdapter = segmentReference.getSegment().asStorageAdapter(); - long segmentMinTime = storageAdapter.getMinTime().getMillis(); - long segmentMaxTime = storageAdapter.getMaxTime().getMillis(); - Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1); - runner = new CachingQueryRunner<>( - makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()), - cacheKeyPrefix, - descriptor, - actualDataInterval, - objectMapper, - cache, - toolChest, - runner, - // Always populate in foreground regardless of config - new ForegroundCachePopulator( - objectMapper, - cachePopulatorStats, - cacheConfig.getMaxEntrySize() - ), - cacheConfig - ); - } - return new Pair<>(segmentReference.getSegment().getDataInterval(), runner); + allRunners.put( + descriptor, + sinkSegmentReferences.stream().map( + segmentReference -> { + QueryRunner runner = new MetricsEmittingQueryRunner<>( + emitter, + factory.getToolchest(), + factory.createRunner(segmentReference.getSegment()), + QueryMetrics::reportSegmentTime, + queryMetrics -> queryMetrics.segment(sinkSegmentId.toString()) + ); + + // 1) Only use caching if data is immutable + // 2) Hydrants are not the same between replicas, make sure cache is local + if (segmentReference.isImmutable() && cache.isLocal()) { + StorageAdapter storageAdapter = 
segmentReference.getSegment().asStorageAdapter(); + long segmentMinTime = storageAdapter.getMinTime().getMillis(); + long segmentMaxTime = storageAdapter.getMaxTime().getMillis(); + Interval actualDataInterval = Intervals.utc(segmentMinTime, segmentMaxTime + 1); + runner = new CachingQueryRunner<>( + makeHydrantCacheIdentifier(sinkSegmentId, segmentReference.getHydrantNumber()), + cacheKeyPrefix, + descriptor, + actualDataInterval, + objectMapper, + cache, + toolChest, + runner, + // Always populate in foreground regardless of config + new ForegroundCachePopulator( + objectMapper, + cachePopulatorStats, + cacheConfig.getMaxEntrySize() + ), + cacheConfig + ); } - ) - ); - return QueryRunnerHelper.makeClosingQueryRunner( - new SpecificSegmentQueryRunner<>( - withPerSinkMetrics( - new BySegmentQueryRunner<>( - sinkSegmentId, - descriptor.getInterval().getStart(), - factory.mergeRunners( - DirectQueryProcessingPool.INSTANCE, - perHydrantRunners - ) - ), + + // Regardless of whether caching is enabled, do reportSegmentAndCacheTime outside the + // *possible* caching. + runner = new MetricsEmittingQueryRunner<>( + emitter, + factory.getToolchest(), + runner, + QueryMetrics::reportSegmentAndCacheTime, + queryMetrics -> queryMetrics.segment(sinkSegmentId.toString()) + ).withWaitMeasuredFromNow(); + + // Emit CPU time metrics. + runner = CPUTimeMetricQueryRunner.safeBuild( + runner, toolChest, - sinkSegmentId, - cpuTimeAccumulator - ), - new SpecificSegmentSpec(descriptor) - ), - releaser - ); - } - catch (Throwable e) { - throw CloseableUtils.closeAndWrapInCatch(e, releaser); - } + emitter, + cpuTimeAccumulator, + false + ); + + // Run with specific segment descriptor. 
+ runner = new SpecificSegmentQueryRunner<>( + runner, + new SpecificSegmentSpec(descriptor) + ); + + return runner; + } + ).collect(Collectors.toList()) + ); } - ); - final QueryRunner mergedRunner = - toolChest.mergeResults( - factory.mergeRunners( - queryProcessingPool, - perSegmentRunners - ) + } + + final QueryRunner mergedRunner; + + if (query.context().isBySegment()) { + // bySegment: merge all hydrants for a Sink first, then merge Sinks. Necessary to keep results for the + // same segment together, but causes additional memory usage due to the extra layer of materialization, + // so we only do this if we need to. + mergedRunner = factory.mergeRunners( + queryProcessingPool, + allRunners.entrySet().stream().map( + entry -> new BySegmentQueryRunner<>( + segmentIdMap.get(entry.getKey()), + entry.getKey().getInterval().getStart(), + factory.mergeRunners( + DirectQueryProcessingPool.INSTANCE, + entry.getValue() + ) + ) + ).collect(Collectors.toList()) + ); + } else { + // Not bySegment: merge all hydrants at the same level, rather than grouped by Sink (segment). + mergedRunner = factory.mergeRunners( + queryProcessingPool, + allRunners.values().stream().flatMap(Collection::stream).collect(Collectors.toList()) ); + } - return CPUTimeMetricQueryRunner.safeBuild( - new FinalizeResultsQueryRunner<>(mergedRunner, toolChest), - toolChest, - emitter, - cpuTimeAccumulator, - true - ); + // 1) Merge results using the toolChest, finalize if necessary. + // 2) Measure CPU time of that operation. + // 3) Release all sink segment references. 
+ return QueryRunnerHelper.makeClosingQueryRunner( + CPUTimeMetricQueryRunner.safeBuild( + new FinalizeResultsQueryRunner<>(toolChest.mergeResults(mergedRunner), toolChest), + toolChest, + emitter, + cpuTimeAccumulator, + true + ), + () -> CloseableUtils.closeAll(allSegmentReferences) + ); + } + catch (Throwable e) { + throw CloseableUtils.closeAndWrapInCatch(e, () -> CloseableUtils.closeAll(allSegmentReferences)); + } } public void registerNewVersionOfPendingSegment( @@ -310,20 +363,18 @@ String getDataSource() } /** - * Decorates a Sink's query runner to emit query/segmentAndCache/time, query/segment/time, query/wait/time once - * each for the whole Sink. Also adds CPU time to cpuTimeAccumulator. + * Decorates a {@link FireHydrant} query runner to emit query/segmentAndCache/time, query/segment/time, and + * query/wait/time. Also adds CPU time to cpuTimeAccumulator. Because each hydrant (persist file or in-memory + * {@link IncrementalIndex}) is submitted to the executor separately, it reports its own metrics, even though + * it isn't a full segment. */ - private QueryRunner withPerSinkMetrics( - final QueryRunner sinkRunner, + private QueryRunner withHydrantMetrics( + final QueryRunner hydrantRunner, final QueryToolChest> queryToolChest, final SegmentId sinkSegmentId, final AtomicLong cpuTimeAccumulator ) { - // Note: reportSegmentAndCacheTime and reportSegmentTime are effectively the same here. They don't split apart - // cache vs. non-cache due to the fact that Sinks may be partially cached and partially uncached. Making this - // better would need to involve another accumulator like the cpuTimeAccumulator that we could share with the - // sinkRunner. 
String sinkSegmentIdString = sinkSegmentId.toString(); return CPUTimeMetricQueryRunner.safeBuild( new MetricsEmittingQueryRunner<>( @@ -332,7 +383,7 @@ private QueryRunner withPerSinkMetrics( new MetricsEmittingQueryRunner<>( emitter, queryToolChest, - sinkRunner, + hydrantRunner, QueryMetrics::reportSegmentTime, queryMetrics -> queryMetrics.segment(sinkSegmentIdString) ), diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index bd829ccfa157..8b5b14823573 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -439,7 +439,6 @@ public DatasourceBundle( serviceEmitter, queryRunnerFactoryConglomerateProvider.get(), queryProcessingPool, - joinableFactoryWrapper, Preconditions.checkNotNull(cache, "cache"), cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java index 2194ff704010..a271c4540c50 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/FlushingPlumber.java @@ -91,7 +91,6 @@ public FlushingPlumber( conglomerate, segmentAnnouncer, queryProcessingPool, - joinableFactory, null, null, null, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index efabdcc16857..0380abf9f223 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ 
b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -64,8 +64,6 @@ import org.apache.druid.segment.incremental.IndexSizeExceededException; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; -import org.apache.druid.segment.join.JoinableFactory; -import org.apache.druid.segment.join.JoinableFactoryWrapper; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.realtime.FireDepartmentMetrics; import org.apache.druid.segment.realtime.FireHydrant; @@ -142,7 +140,6 @@ public RealtimePlumber( QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, QueryProcessingPool queryProcessingPool, - JoinableFactory joinableFactory, DataSegmentPusher dataSegmentPusher, SegmentPublisher segmentPublisher, SegmentHandoffNotifier handoffNotifier, @@ -172,7 +169,6 @@ public RealtimePlumber( emitter, conglomerate, queryProcessingPool, - new JoinableFactoryWrapper(joinableFactory), cache, cacheConfig, cachePopulatorStats diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java index e2ba02cbc0e1..8b19153a9dea 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -112,7 +112,6 @@ public Plumber findPlumber( conglomerate, segmentAnnouncer, queryProcessingPool, - joinableFactory, dataSegmentPusher, segmentPublisher, handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()), diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java index bcf2f8a22161..e81fd9795d83 100644 --- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java @@ -61,7 +61,6 @@ import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.RealtimeTuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; -import org.apache.druid.segment.join.NoopJoinableFactory; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.SegmentLoaderConfig; import org.apache.druid.segment.metadata.CentralizedDatasourceSchemaConfig; @@ -247,7 +246,6 @@ ScanQuery.class, new ScanQueryRunnerFactory( announcer, emitter, new ForwardingQueryProcessingPool(queryExecutor), - NoopJoinableFactory.INSTANCE, MapCache.create(2048), new CacheConfig(), new CachePopulatorStats(), @@ -295,7 +293,6 @@ ScanQuery.class, new ScanQueryRunnerFactory( new NoopDataSegmentAnnouncer(), emitter, new ForwardingQueryProcessingPool(queryExecutor), - NoopJoinableFactory.INSTANCE, MapCache.create(2048), new CacheConfig(), new CachePopulatorStats(), From 931464798e5aa85c31a41216587cf9277cff900c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 24 Jan 2024 23:58:50 -0800 Subject: [PATCH 2/3] Use SinkQueryRunners. 
--- .../appenderator/SinkQuerySegmentWalker.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 9019d285b74c..b41e494345cd 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -29,6 +29,7 @@ import org.apache.druid.client.cache.ForegroundCachePopulator; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.guava.FunctionalIterable; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -51,6 +52,7 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.SinkQueryRunners; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.spec.SpecificSegmentQueryRunner; import org.apache.druid.query.spec.SpecificSegmentSpec; @@ -67,7 +69,6 @@ import org.joda.time.Interval; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -322,7 +323,14 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final // Not bySegment: merge all hydrants at the same level, rather than grouped by Sink (segment). 
mergedRunner = factory.mergeRunners( queryProcessingPool, - allRunners.values().stream().flatMap(Collection::stream).collect(Collectors.toList()) + new SinkQueryRunners<>( + allRunners.entrySet().stream().flatMap( + entry -> + entry.getValue().stream().map( + runner -> + Pair.of(entry.getKey().getInterval(), runner) + ) + ).collect(Collectors.toList())) ); } From 4d9452cd85c400c4f16bcc96c3e5f8fbf4a0a1dd Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 25 Jan 2024 00:41:51 -0800 Subject: [PATCH 3/3] Remove unused method. --- .../appenderator/SinkQuerySegmentWalker.java | 36 ------------------- 1 file changed, 36 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index b41e494345cd..1bf07fa41463 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -58,7 +58,6 @@ import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.StorageAdapter; -import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.realtime.FireHydrant; import org.apache.druid.segment.realtime.plumber.Sink; import org.apache.druid.segment.realtime.plumber.SinkSegmentReference; @@ -370,41 +369,6 @@ String getDataSource() return dataSource; } - /** - * Decorates a {@link FireHydrant} query runner to emit query/segmentAndCache/time, query/segment/time, and - * query/wait/time. Also adds CPU time to cpuTimeAccumulator. Because each hydrant (persist file or in-memory - * {@link IncrementalIndex}) is submitted to the executor separately, it reports its own metrics, even though - * it isn't a full segment. 
- */ - private QueryRunner withHydrantMetrics( - final QueryRunner hydrantRunner, - final QueryToolChest> queryToolChest, - final SegmentId sinkSegmentId, - final AtomicLong cpuTimeAccumulator - ) - { - String sinkSegmentIdString = sinkSegmentId.toString(); - return CPUTimeMetricQueryRunner.safeBuild( - new MetricsEmittingQueryRunner<>( - emitter, - queryToolChest, - new MetricsEmittingQueryRunner<>( - emitter, - queryToolChest, - hydrantRunner, - QueryMetrics::reportSegmentTime, - queryMetrics -> queryMetrics.segment(sinkSegmentIdString) - ), - QueryMetrics::reportSegmentAndCacheTime, - queryMetrics -> queryMetrics.segment(sinkSegmentIdString) - ).withWaitMeasuredFromNow(), - queryToolChest, - emitter, - cpuTimeAccumulator, - false - ); - } - public VersionedIntervalTimeline getSinkTimeline() { return sinkTimeline;