From 1c1dd7f72731a3d76bf3ff526bb064e840fb0f4b Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Wed, 19 Mar 2025 20:10:09 -0700 Subject: [PATCH 1/6] Fix resource leak for GroupBy query merge buffer when match result cache --- .../druid/query/ResultLevelCachingQueryRunner.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index 8cc6348a7c8a..271bf6da570d 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -98,16 +98,17 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) query = query.withOverriddenContext( ImmutableMap.of(QueryResource.HEADER_IF_NONE_MATCH, existingResultSetId)); - Sequence resultFromClient = baseRunner.run( - QueryPlus.wrap(query), - responseContext - ); String newResultSetId = responseContext.getEntityTag(); if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) { log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId()); return deserializeResults(cachedResultSet, strategy, existingResultSetId); } else { + Sequence resultFromClient = baseRunner.run( + QueryPlus.wrap(query), + responseContext + ); + @Nullable ResultLevelCachePopulator resultLevelCachePopulator = createResultLevelCachePopulator( cacheKey, From 13275769d4654450217504d72c863f2ac4013013 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Wed, 19 Mar 2025 21:20:38 -0700 Subject: [PATCH 2/6] Fix resource leak for GroupBy query merge buffer when match result cache --- .../druid/query/ResultLevelCachingQueryRunner.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index 271bf6da570d..437d55fad5b2 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -98,17 +98,17 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) query = query.withOverriddenContext( ImmutableMap.of(QueryResource.HEADER_IF_NONE_MATCH, existingResultSetId)); + Sequence resultFromClient = baseRunner.run( + QueryPlus.wrap(query), + responseContext + ); String newResultSetId = responseContext.getEntityTag(); if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) { log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId()); + resultFromClient.accumulate(null, (accumulated, in) -> accumulated); return deserializeResults(cachedResultSet, strategy, existingResultSetId); } else { - Sequence resultFromClient = baseRunner.run( - QueryPlus.wrap(query), - responseContext - ); - @Nullable ResultLevelCachePopulator resultLevelCachePopulator = createResultLevelCachePopulator( cacheKey, From b38ac1c0a81b711e1776da67c8992a32f3ce7c93 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 20 Mar 2025 00:48:28 -0700 Subject: [PATCH 3/6] Add test --- .../ResultLevelCachingQueryRunnerTest.java | 99 ++++++++++++++++++- 1 file changed, 98 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java b/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java index 3cb4ae528e67..006a17d2bc40 100644 --- a/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java +++ b/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java @@ -19,22 +19,54 @@ package org.apache.druid.query; +import com.google.common.collect.ImmutableList; import org.apache.druid.client.SimpleServerView; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.client.cache.MapCache; +import org.apache.druid.collections.BlockingPool; +import org.apache.druid.collections.DefaultBlockingPool; +import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.PeriodGranularity; +import org.apache.druid.java.util.common.guava.MergeSequence; import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.dimension.DefaultDimensionSpec; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByQueryRunnerTest; +import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.IncrementalIndexSegment; +import org.apache.druid.segment.TestIndex; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; +import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.UUID; + +import static org.apache.druid.query.QueryRunnerTestHelper.SEGMENT_ID; +import static org.junit.Assert.fail; public class ResultLevelCachingQueryRunnerTest extends QueryRunnerBasedOnClusteredClientTestBase { @@ -252,7 +284,7 @@ public void testPopulateCacheWhenQueryThrowExceptionShouldNotCache() ); try { sequence.toList(); - Assert.fail("Expected to throw an exception"); + fail("Expected to throw an exception"); } catch (RuntimeException e) { Assert.assertEquals("Exception for testing", e.getMessage()); @@ -264,6 +296,71 @@ public void testPopulateCacheWhenQueryThrowExceptionShouldNotCache() } } + @Test + public void testUseCacheAndReleaseResourceFromClient() + { + final BlockingPool mergePool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1), 1); + prepareCluster(10); + final Query> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval()); + CacheConfig cacheConfig = newCacheConfig(true, true, DEFAULT_CACHE_ENTRY_MAX_SIZE); + final QueryRunner> baseRunner = cachingClusteredClient.getQueryRunnerForIntervals(query, query.getIntervals()); + RetryQueryRunner> spyRunner = Mockito.spy(new RetryQueryRunner<>( + baseRunner, + cachingClusteredClient::getQueryRunnerForSegments, + new RetryQueryRunnerConfig(), + objectMapper + )); + Mockito.doAnswer((Answer) invocation -> { + List> resoruce = mergePool.takeBatch(1, 1); + if (resoruce.isEmpty()) { + fail("Resource should not be empty"); + } + Sequence> realSequence = (Sequence>) invocation.callRealMethod(); + Closer closer = Closer.create(); + closer.register(() -> resoruce.forEach(ReferenceCountingResourceHolder::close)); + return Sequences.withBaggage(realSequence, closer); + }).when(spyRunner).run(ArgumentMatchers.any(), ArgumentMatchers.any()); + + final ResultLevelCachingQueryRunner> queryRunner1 = new ResultLevelCachingQueryRunner<>( + spyRunner, + conglomerate.getToolChest(query), + query, + objectMapper, + cache, + cacheConfig + ); + + final Sequence> sequence1 = queryRunner1.run( + QueryPlus.wrap(query), + responseContext() + ); + final List> results1 = sequence1.toList(); + Assert.assertEquals(0, cache.getStats().getNumHits()); + Assert.assertEquals(1, cache.getStats().getNumEntries()); + Assert.assertEquals(1, cache.getStats().getNumMisses()); + + + final Sequence> sequence2 = queryRunner1.run( + QueryPlus.wrap(query), + responseContext() + ); + final List> results2 = sequence2.toList(); + Assert.assertEquals(results1, results2); + Assert.assertEquals(1, cache.getStats().getNumHits()); + Assert.assertEquals(1, cache.getStats().getNumEntries()); + Assert.assertEquals(1, cache.getStats().getNumMisses()); + + final Sequence> sequence3 = queryRunner1.run( + QueryPlus.wrap(query), + responseContext() + ); + final List> results3 = sequence2.toList(); + Assert.assertEquals(results1, results3); + Assert.assertEquals(2, cache.getStats().getNumHits()); + Assert.assertEquals(1, cache.getStats().getNumEntries()); + Assert.assertEquals(1, cache.getStats().getNumMisses()); + } + private ResultLevelCachingQueryRunner createQueryRunner( CacheConfig cacheConfig, Query query From 43f666bb2e591d0114bcd559b9da9e4b895d8280 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 20 Mar 2025 00:48:57 -0700 Subject: [PATCH 4/6] Add test --- .../ResultLevelCachingQueryRunnerTest.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java b/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java index 006a17d2bc40..5176753438e4 100644 --- a/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java +++ b/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java @@ -19,7 +19,6 @@ package org.apache.druid.query; -import com.google.common.collect.ImmutableList; import org.apache.druid.client.SimpleServerView; import org.apache.druid.client.cache.Cache; import org.apache.druid.client.cache.CacheConfig; @@ -28,44 +27,24 @@ import org.apache.druid.collections.DefaultBlockingPool; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.concurrent.Execs; -import org.apache.druid.java.util.common.granularity.PeriodGranularity; -import org.apache.druid.java.util.common.guava.MergeSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.io.Closer; -import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.context.ResponseContext; -import org.apache.druid.query.dimension.DefaultDimensionSpec; -import org.apache.druid.query.groupby.GroupByQuery; -import org.apache.druid.query.groupby.GroupByQueryConfig; -import org.apache.druid.query.groupby.GroupByQueryRunnerTest; -import org.apache.druid.query.groupby.GroupByQueryRunnerTestHelper; -import org.apache.druid.query.groupby.ResultRow; -import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.timeseries.TimeseriesResultValue; -import org.apache.druid.segment.IncrementalIndexSegment; -import org.apache.druid.segment.TestIndex; import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; -import org.joda.time.Period; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; import java.util.List; -import java.util.UUID; -import static org.apache.druid.query.QueryRunnerTestHelper.SEGMENT_ID; import static org.junit.Assert.fail; public class ResultLevelCachingQueryRunnerTest extends QueryRunnerBasedOnClusteredClientTestBase From 7a2523b77d9690c7366d3b06a13157f45f843bb1 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 20 Mar 2025 00:51:38 -0700 Subject: [PATCH 5/6] Add comment --- .../org/apache/druid/query/ResultLevelCachingQueryRunner.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java index 437d55fad5b2..7e32e098a13e 100644 --- a/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java +++ b/server/src/main/java/org/apache/druid/query/ResultLevelCachingQueryRunner.java @@ -106,6 +106,7 @@ public Sequence run(QueryPlus queryPlus, ResponseContext responseContext) if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) { log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId()); + // Call accumulate on the sequence to ensure that all Wrapper/Closer/Baggage/etc. get called resultFromClient.accumulate(null, (accumulated, in) -> accumulated); return deserializeResults(cachedResultSet, strategy, existingResultSetId); } else { From f480d515f00b094dfe918d7ce02a05eacf7d08a3 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Thu, 20 Mar 2025 01:43:40 -0700 Subject: [PATCH 6/6] Add test --- .../apache/druid/query/ResultLevelCachingQueryRunnerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java b/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java index 5176753438e4..a0c14239e494 100644 --- a/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java +++ b/server/src/test/java/org/apache/druid/query/ResultLevelCachingQueryRunnerTest.java @@ -333,7 +333,7 @@ public void testUseCacheAndReleaseResourceFromClient() QueryPlus.wrap(query), responseContext() ); - final List> results3 = sequence2.toList(); + final List> results3 = sequence3.toList(); Assert.assertEquals(results1, results3); Assert.assertEquals(2, cache.getStats().getNumHits()); Assert.assertEquals(1, cache.getStats().getNumEntries());