diff --git a/benchmarks/src/main/java/io/druid/benchmark/StupidPoolConcurrencyBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/StupidPoolConcurrencyBenchmark.java index fb0e1b33982e..43ae737495cd 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/StupidPoolConcurrencyBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/StupidPoolConcurrencyBenchmark.java @@ -65,6 +65,7 @@ public static class BenchmarkPool { private final AtomicLong numPools = new AtomicLong(0L); private final StupidPool pool = new StupidPool<>( + "simpleObject pool", new Supplier() { @Override diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java index c50b01ea3653..f59922400c94 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/GroupByBenchmark.java @@ -312,6 +312,7 @@ public void setup() throws IOException } StupidPool bufferPool = new StupidPool<>( + "GroupByBenchmark-computeBufferPool", new OffheapBufferGenerator("compute", 250_000_000), 0, Integer.MAX_VALUE diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java index a09b899eece9..2725d001a9bc 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/TopNBenchmark.java @@ -272,7 +272,12 @@ public void setup() throws IOException } factory = new TopNQueryRunnerFactory( - new StupidPool<>(new OffheapBufferGenerator("compute", 250000000), 0, Integer.MAX_VALUE), + new StupidPool<>( + "TopNBenchmark-compute-bufferPool", + new OffheapBufferGenerator("compute", 250000000), + 0, + Integer.MAX_VALUE + ), new TopNQueryQueryToolChest(new TopNQueryConfig(), QueryBenchmarkUtil.NoopIntervalChunkingQueryRunnerDecorator()), QueryBenchmarkUtil.NOOP_QUERYWATCHER ); diff --git a/common/src/main/java/io/druid/collections/StupidPool.java b/common/src/main/java/io/druid/collections/StupidPool.java index f6cb2cfd8781..939377a3eec1 100644 --- a/common/src/main/java/io/druid/collections/StupidPool.java +++ b/common/src/main/java/io/druid/collections/StupidPool.java @@ -19,14 +19,19 @@ package io.druid.collections; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; +import sun.misc.Cleaner; +import java.lang.ref.WeakReference; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** */ @@ -34,25 +39,35 @@ public class StupidPool { private static final Logger log = new Logger(StupidPool.class); + private final String name; private final Supplier generator; - private final Queue objects = new ConcurrentLinkedQueue<>(); + /** + * StupidPool Implementation Note + * It is assumed that StupidPools are never reclaimed by the GC, either stored in static fields or global singleton + * injector like Guice. Otherwise false positive "Not closed! Object leaked from..." could be reported. To avoid + * this, StupidPool should be made closeable (or implement {@link io.druid.java.util.common.lifecycle.LifecycleStop} + * and registered in the global lifecycle), in this close() method all {@link ObjectResourceHolder}s should be drained + * from the {@code objects} queue, and notifier.disable() called for them. + */ + private final Queue objects = new ConcurrentLinkedQueue<>(); + /** + * {@link ConcurrentLinkedQueue}'s size() is O(n) queue traversal apparently for the sake of being 100% + * wait-free, that is not required by {@code StupidPool}. In {@code poolSize} we account the queue size + * ourselves, to avoid traversal of {@link #objects} in {@link #tryReturnToPool}. + */ + private final AtomicLong poolSize = new AtomicLong(0); + private final AtomicLong leakedObjectsCounter = new AtomicLong(0); //note that this is just the max entries in the cache, pool can still create as many buffers as needed. private final int objectsCacheMaxCount; - public StupidPool( - Supplier generator - ) + public StupidPool(String name, Supplier generator) { - this(generator, 0, Integer.MAX_VALUE); + this(name, generator, 0, Integer.MAX_VALUE); } - public StupidPool( - Supplier generator, - int initCount, - int objectsCacheMaxCount - ) + public StupidPool(String name, Supplier generator, int initCount, int objectsCacheMaxCount) { Preconditions.checkArgument( initCount <= objectsCacheMaxCount, @@ -60,28 +75,117 @@ public StupidPool( initCount, objectsCacheMaxCount ); + this.name = name; this.generator = generator; this.objectsCacheMaxCount = objectsCacheMaxCount; for (int i = 0; i < initCount; i++) { - objects.add(generator.get()); + objects.add(makeObjectWithHandler()); + poolSize.incrementAndGet(); } } + @Override + public String toString() + { + return "StupidPool{" + + "name=" + name + + ", objectsCacheMaxCount=" + objectsCacheMaxCount + + ", poolSize=" + poolSize() + + "}"; + } + public ResourceHolder take() { - final T obj = objects.poll(); - return obj == null ? new ObjectResourceHolder(generator.get()) : new ObjectResourceHolder(obj); + ObjectResourceHolder resourceHolder = objects.poll(); + if (resourceHolder == null) { + return makeObjectWithHandler(); + } else { + poolSize.decrementAndGet(); + return resourceHolder; + } + } + + private ObjectResourceHolder makeObjectWithHandler() + { + T object = generator.get(); + ObjectId objectId = new ObjectId(); + ObjectLeakNotifier notifier = new ObjectLeakNotifier(this); + // Using objectId as referent for Cleaner, because if the object itself (e. g. ByteBuffer) is leaked after taken + // from the pool, and the ResourceHolder is not closed, Cleaner won't notify about the leak. + return new ObjectResourceHolder(object, objectId, Cleaner.create(objectId, notifier), notifier); + } + + @VisibleForTesting + long poolSize() { + return poolSize.get(); + } + + @VisibleForTesting + long leakedObjectsCount() + { + return leakedObjectsCounter.get(); + } + + private void tryReturnToPool(T object, ObjectId objectId, Cleaner cleaner, ObjectLeakNotifier notifier) + { + long currentPoolSize; + do { + currentPoolSize = poolSize.get(); + if (currentPoolSize >= objectsCacheMaxCount) { + notifier.disable(); + // Effectively does nothing, because notifier is disabled above. The purpose of this call is to deregister the + // cleaner from the internal global linked list of all cleaners in the JVM, and let it be reclaimed itself. + cleaner.clean(); + // Important to use the objectId after notifier.disable() (in the logging statement below), otherwise VM may + // already decide that the objectId is unreachable and run Cleaner before notifier.disable(), that would be + // reported as a false-positive "leak". Ideally reachabilityFence(objectId) should be inserted here. + log.debug("cache num entries is exceeding in [%s], objectId [%s]", this, objectId); + return; + } + } while (!poolSize.compareAndSet(currentPoolSize, currentPoolSize + 1)); + if (!objects.offer(new ObjectResourceHolder(object, objectId, cleaner, notifier))) { + impossibleOffsetFailed(object, objectId, cleaner, notifier); + } + } + + /** + * This should be impossible, because {@link ConcurrentLinkedQueue#offer(Object)} event don't have `return false;` in + * it's body in OpenJDK 8. + */ + private void impossibleOffsetFailed(T object, ObjectId objectId, Cleaner cleaner, ObjectLeakNotifier notifier) + { + poolSize.decrementAndGet(); + notifier.disable(); + // Effectively does nothing, because notifier is disabled above. The purpose of this call is to deregister the + // cleaner from the internal global linked list of all cleaners in the JVM, and let it be reclaimed itself. + cleaner.clean(); + log.error( + new ISE("Queue offer failed"), + "Could not offer object [%s] back into the queue in [%s], objectId [%s]", + object, + objectId + ); } private class ObjectResourceHolder implements ResourceHolder { - private AtomicBoolean closed = new AtomicBoolean(false); - private final T object; + private final AtomicReference objectRef; + private ObjectId objectId; + private Cleaner cleaner; + private ObjectLeakNotifier notifier; - public ObjectResourceHolder(final T object) + ObjectResourceHolder( + final T object, + final ObjectId objectId, + final Cleaner cleaner, + final ObjectLeakNotifier notifier + ) { - this.object = object; + this.objectRef = new AtomicReference<>(object); + this.objectId = objectId; + this.cleaner = cleaner; + this.notifier = notifier; } // WARNING: it is entirely possible for a caller to hold onto the object and call ObjectResourceHolder.close, @@ -89,7 +193,8 @@ public ObjectResourceHolder(final T object) @Override public T get() { - if (closed.get()) { + final T object = objectRef.get(); + if (object == null) { throw new ISE("Already Closed!"); } @@ -99,30 +204,69 @@ public T get() @Override public void close() { - if (!closed.compareAndSet(false, true)) { - log.warn(new ISE("Already Closed!"), "Already closed"); - return; - } - if (objects.size() < objectsCacheMaxCount) { - if (!objects.offer(object)) { - log.warn(new ISE("Queue offer failed"), "Could not offer object [%s] back into the queue", object); + final T object = objectRef.get(); + if (object != null && objectRef.compareAndSet(object, null)) { + try { + tryReturnToPool(object, objectId, cleaner, notifier); + } + finally { + // Need to null reference to objectId because if ObjectResourceHolder is closed, but leaked, this reference + // will prevent reporting leaks of ResourceHandlers when this object and objectId are taken from the pool + // again. + objectId = null; + // Nulling cleaner and notifier is not strictly needed, but harmless for sure. + cleaner = null; + notifier = null; } - } else { - log.debug("cache num entries is exceeding max limit [%s]", objectsCacheMaxCount); } } + } + + private static class ObjectLeakNotifier implements Runnable + { + /** + * Don't reference {@link StupidPool} directly to prevent it's leak through the internal global chain of Cleaners. + */ + final WeakReference> poolReference; + final AtomicLong leakedObjectsCounter; + final AtomicBoolean disabled = new AtomicBoolean(false); + + ObjectLeakNotifier(StupidPool pool) + { + poolReference = new WeakReference>(pool); + leakedObjectsCounter = pool.leakedObjectsCounter; + } @Override - protected void finalize() throws Throwable + public void run() { try { - if (!closed.get()) { - log.warn("Not closed! Object was[%s]. Allowing gc to prevent leak.", object); + if (!disabled.getAndSet(true)) { + leakedObjectsCounter.incrementAndGet(); + log.warn("Not closed! Object leaked from %s. Allowing gc to prevent leak.", poolReference.get()); } } - finally { - super.finalize(); + // Exceptions must not be thrown in Cleaner.clean(), which calls this ObjectReclaimer.run() method + catch (Exception e) { + try { + log.error(e, "Exception in ObjectLeakNotifier.run()"); + } + catch (Exception ignore) { + // ignore + } } } + + public void disable() + { + disabled.set(true); + } + } + + /** + * Plays the role of the reference for Cleaner, see comment in {@link #makeObjectWithHandler} + */ + private static class ObjectId + { } } diff --git a/common/src/test/java/io/druid/collections/StupidPoolTest.java b/common/src/test/java/io/druid/collections/StupidPoolTest.java index 686635e4e200..7d3eedd08460 100644 --- a/common/src/test/java/io/druid/collections/StupidPoolTest.java +++ b/common/src/test/java/io/druid/collections/StupidPoolTest.java @@ -20,9 +20,7 @@ package io.druid.collections; import com.google.common.base.Supplier; - import io.druid.java.util.common.ISE; - import org.easymock.EasyMock; import org.hamcrest.core.IsInstanceOf; import org.junit.After; @@ -45,7 +43,7 @@ public void setUp() generator = EasyMock.createMock(Supplier.class); EasyMock.expect(generator.get()).andReturn(defaultString).anyTimes(); EasyMock.replay(generator); - poolOfString = new StupidPool<>(generator); + poolOfString = new StupidPool<>("poolOfString", generator); resourceHolderObj = poolOfString.take(); } @@ -72,10 +70,27 @@ public void testExceptionInResourceHolderGet() throws IOException resourceHolderObj.get(); } - @Test - public void testFinalizeInResourceHolder() + @Test(timeout = 60_000) + public void testResourceHandlerClearedByJVM() throws InterruptedException + { + if (System.getProperty("java.version").startsWith("1.7")) { + // This test is unreliable on Java 7, probably GC is not triggered by System.gc(). It is not a problem because + // this test should ever pass on any version of Java to prove that StupidPool doesn't introduce leaks itself and + // actually cleans the leaked objects. + return; + } + String leakedString = createDanglingObjectHandler(); + // Wait until dangling object string is returned to the pool + for (int i = 0; i < 6000 && poolOfString.leakedObjectsCount() == 0; i++) { + System.gc(); + byte[] garbage = new byte[10_000_000]; + Thread.sleep(10); + } + Assert.assertEquals(leakedString, 1, poolOfString.leakedObjectsCount()); + } + + private String createDanglingObjectHandler() { - resourceHolderObj = null; - System.runFinalization(); + return poolOfString.take().get(); } } diff --git a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java index 4aec53e1ac03..93dca9e094a8 100644 --- a/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java +++ b/extensions-contrib/distinctcount/src/test/java/io/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java @@ -55,6 +55,7 @@ public void testTopNWithDistinctCountAgg() throws Exception { TopNQueryEngine engine = new TopNQueryEngine( new StupidPool( + "TopNQueryEngine-bufferPool", new Supplier() { @Override diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java index 52170e75b1bb..16ebda20b674 100644 --- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java +++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/ApproximateHistogramTopNQueryTest.java @@ -69,6 +69,7 @@ public static Iterable constructorFeeder() throws IOException QueryRunnerTestHelper.makeQueryRunners( new TopNQueryRunnerFactory( new StupidPool( + "TopNQueryRunnerFactory-bufferPool", new Supplier() { @Override diff --git a/processing/src/main/java/io/druid/segment/CompressedPools.java b/processing/src/main/java/io/druid/segment/CompressedPools.java index 1d7c07bbdae8..c2453b920aaf 100644 --- a/processing/src/main/java/io/druid/segment/CompressedPools.java +++ b/processing/src/main/java/io/druid/segment/CompressedPools.java @@ -37,6 +37,7 @@ public class CompressedPools public static final int BUFFER_SIZE = 0x10000; private static final StupidPool bufferRecyclerPool = new StupidPool( + "bufferRecyclerPool", new Supplier() { private final AtomicLong counter = new AtomicLong(0); @@ -56,6 +57,7 @@ public static ResourceHolder getBufferRecycler() } private static final StupidPool outputBytesPool = new StupidPool( + "outputBytesPool", new Supplier() { private final AtomicLong counter = new AtomicLong(0); @@ -75,6 +77,7 @@ public static ResourceHolder getOutputBytes() } private static final StupidPool bigEndByteBufPool = new StupidPool( + "bigEndByteBufPool", new Supplier() { private final AtomicLong counter = new AtomicLong(0); @@ -89,6 +92,7 @@ public ByteBuffer get() ); private static final StupidPool littleEndByteBufPool = new StupidPool( + "littleEndByteBufPool", new Supplier() { private final AtomicLong counter = new AtomicLong(0); diff --git a/processing/src/test/java/io/druid/query/TestQueryRunners.java b/processing/src/test/java/io/druid/query/TestQueryRunners.java index 382a9fd4b39e..a8b92e9cbc08 100644 --- a/processing/src/test/java/io/druid/query/TestQueryRunners.java +++ b/processing/src/test/java/io/druid/query/TestQueryRunners.java @@ -40,6 +40,7 @@ public class TestQueryRunners { public static final StupidPool pool = new StupidPool( + "TestQueryRunners-bufferPool", new Supplier() { @Override diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index a8ef7b454085..fd1dbf351069 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -256,6 +256,7 @@ public static final AggregationTestHelper createTopNQueryAggregationTestHelper( TopNQueryRunnerFactory factory = new TopNQueryRunnerFactory( new StupidPool<>( + "TopNQueryRunnerFactory-bufferPool", new Supplier() { @Override diff --git a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java index 9949041c27f8..16c2803d0e18 100644 --- a/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/groupby/GroupByQueryRunnerTest.java @@ -60,10 +60,10 @@ import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory; import io.druid.query.aggregation.JavaScriptAggregatorFactory; -import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; import io.druid.query.aggregation.PostAggregator; import io.druid.query.aggregation.cardinality.CardinalityAggregatorFactory; +import io.druid.query.aggregation.first.LongFirstAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniqueFinalizingPostAggregator; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import io.druid.query.aggregation.last.LongLastAggregatorFactory; @@ -262,6 +262,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory( { final Supplier configSupplier = Suppliers.ofInstance(config); final StupidPool bufferPool = new StupidPool<>( + "GroupByQueryEngine-bufferPool", new Supplier() { @Override diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java index 6a9391e8135a..41debd5a5a35 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerBenchmark.java @@ -88,6 +88,7 @@ public static void setUp() throws Exception { QueryRunnerFactory factory = new TopNQueryRunnerFactory( new StupidPool( + "TopNQueryRunnerFactory-directBufferPool", new Supplier() { @Override diff --git a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java index ce1a2e4c9c16..e48eb397afae 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNQueryRunnerTest.java @@ -116,6 +116,7 @@ public static Iterable constructorFeeder() throws IOException QueryRunnerTestHelper.makeQueryRunners( new TopNQueryRunnerFactory( new StupidPool( + "TopNQueryRunnerFactory-bufferPool", new Supplier() { @Override diff --git a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java index 2125c9070490..6f49aa2e2e42 100644 --- a/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java +++ b/processing/src/test/java/io/druid/query/topn/TopNUnionQueryTest.java @@ -76,6 +76,7 @@ public static Iterable constructorFeeder() throws IOException QueryRunnerTestHelper.makeUnionQueryRunners( new TopNQueryRunnerFactory( new StupidPool( + "TopNQueryRunnerFactory-bufferPool", new Supplier() { @Override diff --git a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java index 59b48b9fa678..916f264ba19a 100644 --- a/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/data/IncrementalIndexTest.java @@ -136,6 +136,7 @@ public IncrementalIndex createIndex(AggregatorFactory[] factories) return new OffheapIncrementalIndex( 0L, QueryGranularities.NONE, factories, 1000000, new StupidPool( + "OffheapIncrementalIndex-bufferPool", new Supplier() { @Override @@ -168,6 +169,7 @@ public IncrementalIndex createIndex(AggregatorFactory[] factories) return new OffheapIncrementalIndex( 0L, QueryGranularities.NONE, false, factories, 1000000, new StupidPool( + "OffheapIncrementalIndex-bufferPool", new Supplier() { @Override diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index 34b79506d49d..eaaa14da3da7 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -223,6 +223,7 @@ public int getMaxIntermediateRows() } ), new StupidPool( + "GroupByQueryEngine-bufferPool", new Supplier() { @Override @@ -306,6 +307,7 @@ public void testSingleValueTopN() throws IOException TopNQueryEngine engine = new TopNQueryEngine( new StupidPool( + "TopNQueryEngine-bufferPool", new Supplier() { @Override diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index 8574c937288d..b1521448e0bf 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -116,16 +116,22 @@ public IncrementalIndex createIndex() public IncrementalIndex createIndex() { return new OffheapIncrementalIndex( - schema, true, true, sortFacts, 1000000, new StupidPool( - new Supplier() - { - @Override - public ByteBuffer get() - { - return ByteBuffer.allocate(256 * 1024); - } - } - ) + schema, + true, + true, + sortFacts, + 1000000, + new StupidPool( + "OffheapIncrementalIndex-bufferPool", + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(256 * 1024); + } + } + ) ); } } diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java index dca3916d967b..79ede1b4fcdf 100644 --- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java @@ -109,6 +109,7 @@ public StupidPool getIntermediateResultsPool(DruidProcessingConfig c { verifyDirectMemory(config); return new StupidPool<>( + "intermediate processing pool", new OffheapBufferGenerator("intermediate processing", config.intermediateComputeSizeBytes()), config.getNumThreads(), config.poolCacheMaxCount() diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index 7b25ed5cfb20..b6d7c1b5021c 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -178,6 +178,7 @@ public static SpecificSegmentsQuerySegmentWalker createWalker(final File tmpDir, TopNQuery.class, new TopNQueryRunnerFactory( new StupidPool<>( + "TopNQueryRunnerFactory-bufferPool", new Supplier() { @Override