diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 95d59856395f..bbff131e8671 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -378,7 +378,6 @@ public String getFormatString()
final GroupingEngine groupingEngine = new GroupingEngine(
druidProcessingConfig,
configSupplier,
- bufferPool,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
@@ -387,7 +386,8 @@ public String getFormatString()
factory = new GroupByQueryRunnerFactory(
groupingEngine,
- new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool)
+ new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool),
+ bufferPool
);
}
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index 24afa1e84772..8e0715e0fe5c 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -362,14 +362,13 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
final GroupingEngine groupingEngine = new GroupingEngine(
processingConfig,
configSupplier,
- bufferPool,
groupByResourcesReservationPool,
mapper,
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
final GroupByQueryQueryToolChest toolChest = new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool);
- return new GroupByQueryRunnerFactory(groupingEngine, toolChest);
+ return new GroupByQueryRunnerFactory(groupingEngine, toolChest, bufferPool);
}
@TearDown(Level.Trial)
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index e7220cc286d9..5ab19b6235f7 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -495,7 +495,6 @@ public String getFormatString()
final GroupingEngine groupingEngine = new GroupingEngine(
druidProcessingConfig,
configSupplier,
- bufferPool,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
@@ -504,7 +503,8 @@ public String getFormatString()
factory = new GroupByQueryRunnerFactory(
groupingEngine,
- new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool)
+ new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool),
+ bufferPool
);
}
diff --git a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
index 9271b9b3e988..c1fed4bc5034 100644
--- a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
+++ b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
@@ -99,8 +99,7 @@ public int getNumThreads()
return 1;
}
},
- () -> config,
- new StupidPool<>("map-virtual-column-groupby-test", () -> ByteBuffer.allocate(1024)),
+ GroupByQueryConfig::new,
groupByResourcesReservationPool,
TestHelper.makeJsonMapper(),
new DefaultObjectMapper(),
@@ -109,7 +108,8 @@ public int getNumThreads()
final GroupByQueryRunnerFactory factory = new GroupByQueryRunnerFactory(
groupingEngine,
- new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool)
+ new GroupByQueryQueryToolChest(groupingEngine, groupByResourcesReservationPool),
+ new StupidPool<>("map-virtual-column-groupby-test", () -> ByteBuffer.allocate(1024))
);
runner = QueryRunnerTestHelper.makeQueryRunner(
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
index 2ab016e10e48..c5131ddd84ec 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
@@ -19,7 +19,6 @@
package org.apache.druid.msq.exec;
-import com.google.common.base.Preconditions;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.kernel.controller.ControllerQueryKernel;
@@ -29,10 +28,10 @@
* Class for determining how much JVM heap to allocate to various purposes for {@link Controller}.
*
* First, look at how much of total JVM heap that is dedicated for MSQ; see
- * {@link MemoryIntrospector#usableMemoryInJvm()}.
+ * {@link MemoryIntrospector#memoryPerTask()}.
*
* Then, we split up that total amount of memory into equally-sized portions per {@link Controller}; see
- * {@link MemoryIntrospector#numQueriesInJvm()}. The number of controllers is based entirely on server configuration,
+ * {@link MemoryIntrospector#numTasksInJvm()}. The number of controllers is based entirely on server configuration,
* which makes the calculation robust to different queries running simultaneously in the same JVM.
*
* Then, we split that up into a chunk used for input channels, and a chunk used for partition statistics.
@@ -70,29 +69,28 @@ public static ControllerMemoryParameters createProductionInstance(
final int maxWorkerCount
)
{
- final long usableMemoryInJvm = memoryIntrospector.usableMemoryInJvm();
- final int numControllersInJvm = memoryIntrospector.numQueriesInJvm();
- Preconditions.checkArgument(usableMemoryInJvm > 0, "Usable memory[%s] must be > 0", usableMemoryInJvm);
- Preconditions.checkArgument(numControllersInJvm > 0, "Number of controllers[%s] must be > 0", numControllersInJvm);
- Preconditions.checkArgument(maxWorkerCount > 0, "Number of workers[%s] must be > 0", maxWorkerCount);
-
- final long memoryPerController = usableMemoryInJvm / numControllersInJvm;
- final long memoryForInputChannels = WorkerMemoryParameters.memoryNeededForInputChannels(maxWorkerCount);
+ final long totalMemory = memoryIntrospector.memoryPerTask();
+ final long memoryForInputChannels =
+ WorkerMemoryParameters.computeProcessorMemoryForInputChannels(
+ maxWorkerCount,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
+ );
final int partitionStatisticsMaxRetainedBytes = (int) Math.min(
- memoryPerController - memoryForInputChannels,
+ totalMemory - memoryForInputChannels,
PARTITION_STATS_MAX_MEMORY
);
if (partitionStatisticsMaxRetainedBytes < PARTITION_STATS_MIN_MEMORY) {
- final long requiredMemory = memoryForInputChannels + PARTITION_STATS_MIN_MEMORY;
+ final long requiredTaskMemory = memoryForInputChannels + PARTITION_STATS_MIN_MEMORY;
throw new MSQException(
new NotEnoughMemoryFault(
- memoryIntrospector.computeJvmMemoryRequiredForUsableMemory(requiredMemory),
+ memoryIntrospector.computeJvmMemoryRequiredForTaskMemory(requiredTaskMemory),
memoryIntrospector.totalMemoryInJvm(),
- usableMemoryInJvm,
- numControllersInJvm,
- memoryIntrospector.numProcessorsInJvm(),
- 0
+ memoryIntrospector.memoryPerTask(),
+ memoryIntrospector.numTasksInJvm(),
+ memoryIntrospector.numProcessingThreads(),
+ maxWorkerCount,
+ 1
)
);
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
index bb782cb67d9a..fd2107762777 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java
@@ -24,7 +24,7 @@ public class Limits
/**
* Maximum number of columns that can appear in a frame signature.
*
- * Somewhat less than {@link WorkerMemoryParameters#STANDARD_FRAME_SIZE} divided by typical minimum column size:
+ * Somewhat less than {@link WorkerMemoryParameters#DEFAULT_FRAME_SIZE} divided by typical minimum column size:
* {@link org.apache.druid.frame.allocation.AppendableMemory#DEFAULT_INITIAL_ALLOCATION_SIZE}.
*/
public static final int MAX_FRAME_COLUMNS = 2000;
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MemoryIntrospector.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MemoryIntrospector.java
index 337e36d14efa..76fcb33005a0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MemoryIntrospector.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MemoryIntrospector.java
@@ -19,10 +19,8 @@
package org.apache.druid.msq.exec;
-import org.apache.druid.msq.kernel.WorkOrder;
-
/**
- * Introspector used to generate {@link ControllerMemoryParameters}.
+ * Introspector used to generate {@link WorkerMemoryParameters} and {@link ControllerMemoryParameters}.
*/
public interface MemoryIntrospector
{
@@ -32,34 +30,23 @@ public interface MemoryIntrospector
long totalMemoryInJvm();
/**
- * Amount of memory usable for the multi-stage query engine in the entire JVM.
- *
- * This may be an expensive operation. For example, the production implementation {@link MemoryIntrospectorImpl}
- * estimates size of all lookups as part of computing this value.
+ * Amount of memory allotted to each {@link Worker} or {@link Controller}.
*/
- long usableMemoryInJvm();
+ long memoryPerTask();
/**
- * Amount of total JVM memory required for a particular amount of usable memory to be available.
- *
- * This may be an expensive operation. For example, the production implementation {@link MemoryIntrospectorImpl}
- * estimates size of all lookups as part of computing this value.
+ * Computes the amount of total JVM memory that would be required for a particular memory allotment per task, i.e.,
+ * a particular return value from {@link #memoryPerTask()}.
*/
- long computeJvmMemoryRequiredForUsableMemory(long usableMemory);
+ long computeJvmMemoryRequiredForTaskMemory(long memoryPerTask);
/**
- * Maximum number of queries that run simultaneously in this JVM.
- *
- * On workers, this is the maximum number of {@link Worker} that run simultaneously in this JVM. See
- * {@link WorkerMemoryParameters} for how memory is divided among and within {@link WorkOrder} run by a worker.
- *
- * On controllers, this is the maximum number of {@link Controller} that run simultaneously. See
- * {@link ControllerMemoryParameters} for how memory is used by controllers.
+ * Maximum number of tasks ({@link Worker} or {@link Controller}) that run simultaneously in this JVM.
*/
- int numQueriesInJvm();
+ int numTasksInJvm();
/**
- * Maximum number of processing threads that can be used at once in this JVM.
+ * Maximum number of processing threads that can be used at once by each {@link Worker} or {@link Controller}.
*/
- int numProcessorsInJvm();
+ int numProcessingThreads();
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MemoryIntrospectorImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MemoryIntrospectorImpl.java
index f7cd501ed8fd..93d0b9de2713 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MemoryIntrospectorImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/MemoryIntrospectorImpl.java
@@ -20,12 +20,14 @@
package org.apache.druid.msq.exec;
import com.google.common.collect.ImmutableList;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
+import javax.annotation.Nullable;
import java.util.List;
/**
@@ -34,37 +36,47 @@
public class MemoryIntrospectorImpl implements MemoryIntrospector
{
private static final Logger log = new Logger(MemoryIntrospectorImpl.class);
+ private static final long LOOKUP_FOOTPRINT_INIT = Long.MIN_VALUE;
- private final LookupExtractorFactoryContainerProvider lookupProvider;
private final long totalMemoryInJvm;
- private final int numQueriesInJvm;
- private final int numProcessorsInJvm;
private final double usableMemoryFraction;
+ private final int numTasksInJvm;
+ private final int numProcessingThreads;
+
+ /**
+ * Lookup footprint per task, set the first time {@link #memoryPerTask()} is called.
+ */
+ private volatile long lookupFootprint = LOOKUP_FOOTPRINT_INIT;
+
+ @Nullable
+ private final LookupExtractorFactoryContainerProvider lookupProvider;
/**
* Create an introspector.
*
- * @param lookupProvider provider of lookups; we use this to subtract lookup size from total JVM memory when
- * computing usable memory
* @param totalMemoryInJvm maximum JVM heap memory
* @param usableMemoryFraction fraction of JVM memory, after subtracting lookup overhead, that we consider usable
- * for multi-stage queries
- * @param numQueriesInJvm maximum number of {@link Controller} or {@link Worker} that may run concurrently
- * @param numProcessorsInJvm size of processing thread pool, typically {@link DruidProcessingConfig#getNumThreads()}
+ * for {@link Controller} or {@link Worker}
+ * @param numTasksInJvm maximum number of {@link Controller} or {@link Worker} that may run concurrently
+ * @param numProcessingThreads size of processing thread pool, typically {@link DruidProcessingConfig#getNumThreads()}
+ * @param lookupProvider provider of lookups; we use this to subtract lookup size from total JVM memory when
+ * computing usable memory. Ignored if null. This is used once the first time
+ * {@link #memoryPerTask()} is called, then the footprint is cached. As such, it provides
+ * a point-in-time view only.
*/
public MemoryIntrospectorImpl(
- final LookupExtractorFactoryContainerProvider lookupProvider,
final long totalMemoryInJvm,
final double usableMemoryFraction,
- final int numQueriesInJvm,
- final int numProcessorsInJvm
+ final int numTasksInJvm,
+ final int numProcessingThreads,
+ @Nullable final LookupExtractorFactoryContainerProvider lookupProvider
)
{
- this.lookupProvider = lookupProvider;
this.totalMemoryInJvm = totalMemoryInJvm;
- this.numQueriesInJvm = numQueriesInJvm;
- this.numProcessorsInJvm = numProcessorsInJvm;
this.usableMemoryFraction = usableMemoryFraction;
+ this.numTasksInJvm = numTasksInJvm;
+ this.numProcessingThreads = numProcessingThreads;
+ this.lookupProvider = lookupProvider;
}
@Override
@@ -74,33 +86,52 @@ public long totalMemoryInJvm()
}
@Override
- public long usableMemoryInJvm()
+ public long memoryPerTask()
{
- final long totalMemory = totalMemoryInJvm();
- final long totalLookupFootprint = computeTotalLookupFootprint(true);
return Math.max(
0,
- (long) ((totalMemory - totalLookupFootprint) * usableMemoryFraction)
+ (long) ((totalMemoryInJvm - getTotalLookupFootprint()) * usableMemoryFraction) / numTasksInJvm
);
}
@Override
- public long computeJvmMemoryRequiredForUsableMemory(long usableMemory)
+ public long computeJvmMemoryRequiredForTaskMemory(long memoryPerTask)
{
- final long totalLookupFootprint = computeTotalLookupFootprint(false);
- return (long) Math.ceil(usableMemory / usableMemoryFraction + totalLookupFootprint);
+ if (memoryPerTask <= 0) {
+ throw new IAE("Invalid memoryPerTask[%d], expected a positive number", memoryPerTask);
+ }
+
+ return (long) Math.ceil(memoryPerTask * numTasksInJvm / usableMemoryFraction) + getTotalLookupFootprint();
}
@Override
- public int numQueriesInJvm()
+ public int numTasksInJvm()
{
- return numQueriesInJvm;
+ return numTasksInJvm;
}
@Override
- public int numProcessorsInJvm()
+ public int numProcessingThreads()
{
- return numProcessorsInJvm;
+ return numProcessingThreads;
+ }
+
+ /**
+ * Get a possibly-cached value of {@link #computeTotalLookupFootprint()}. The underlying computation method is
+ * called just once, meaning this is not a good way to track the size of lookups over time. This is done to keep
+ * memory calculations as consistent as possible.
+ */
+ private long getTotalLookupFootprint()
+ {
+ if (lookupFootprint == LOOKUP_FOOTPRINT_INIT) {
+ synchronized (this) {
+ if (lookupFootprint == LOOKUP_FOOTPRINT_INIT) {
+ lookupFootprint = computeTotalLookupFootprint();
+ }
+ }
+ }
+
+ return lookupFootprint;
}
/**
@@ -108,11 +139,13 @@ public int numProcessorsInJvm()
*
* Correctness of this approach depends on lookups being loaded *before* calling this method. Luckily, this is the
* typical mode of operation, since by default druid.lookup.enableLookupSyncOnStartup = true.
- *
- * @param logFootprint whether footprint should be logged
*/
- private long computeTotalLookupFootprint(final boolean logFootprint)
+ private long computeTotalLookupFootprint()
{
+ if (lookupProvider == null) {
+ return 0;
+ }
+
final List lookupNames = ImmutableList.copyOf(lookupProvider.getAllLookupNames());
long lookupFootprint = 0;
@@ -131,10 +164,7 @@ private long computeTotalLookupFootprint(final boolean logFootprint)
}
}
- if (logFootprint) {
- log.info("Lookup footprint: lookup count[%d], total bytes[%,d].", lookupNames.size(), lookupFootprint);
- }
-
+ log.info("Lookup footprint: lookup count[%d], total bytes[%,d].", lookupNames.size(), lookupFootprint);
return lookupFootprint;
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffers.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffers.java
new file mode 100644
index 000000000000..b12f23be8519
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffers.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.collections.NonBlockingPool;
+import org.apache.druid.collections.QueueNonBlockingPool;
+import org.apache.druid.frame.processor.Bouncer;
+import org.apache.druid.msq.kernel.FrameContext;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Holds a processing buffer pool, and a {@link Bouncer} used to limit concurrent access to the buffer pool.
+ * Thread-safe. Used by {@link RunWorkOrder} by way of {@link FrameContext#processingBuffers()}.
+ */
+public class ProcessingBuffers
+{
+  private final NonBlockingPool<ByteBuffer> bufferPool;
+  private final Bouncer bouncer;
+
+  public ProcessingBuffers(final NonBlockingPool<ByteBuffer> bufferPool, final Bouncer bouncer)
+  {
+    this.bufferPool = bufferPool;
+    this.bouncer = bouncer;
+  }
+
+  public static ProcessingBuffers fromCollection(final Collection<ByteBuffer> bufferPool)
+  {
+    final BlockingQueue<ByteBuffer> queue = new ArrayBlockingQueue<>(bufferPool.size()); // capacity must be >= 1
+    queue.addAll(bufferPool);
+    return new ProcessingBuffers(new QueueNonBlockingPool<>(queue), new Bouncer(queue.size()));
+  }
+
+  public NonBlockingPool<ByteBuffer> getBufferPool()
+  {
+    return bufferPool;
+  }
+
+  public Bouncer getBouncer()
+  {
+    return bouncer;
+  }
+}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersProvider.java
new file mode 100644
index 000000000000..fb77d1c30783
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersProvider.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.msq.kernel.FrameProcessorFactory;
+import org.apache.druid.msq.kernel.QueryDefinition;
+
+/**
+ * Provides processing buffers for {@link org.apache.druid.msq.kernel.WorkOrder}. Thread-safe, shared by all
+ * {@link Worker} in a particular JVM.
+ */
+public interface ProcessingBuffersProvider
+{
+  /**
+   * Acquire buffers for a {@link Worker}, using the given pool size.
+   */
+  ResourceHolder<ProcessingBuffersSet> acquire(int poolSize);
+
+  /**
+   * Acquire buffers for a {@link Worker}, using a pool size equal to the minimum of
+   * {@link WorkerContext#maxConcurrentStages()} and the number of stages in the query where
+   * {@link FrameProcessorFactory#usesProcessingBuffers()}. (These are both caps on the number of concurrent
+   * stages that will need processing buffers at once.)
+   */
+  default ResourceHolder<ProcessingBuffersSet> acquire(
+      final QueryDefinition queryDef,
+      final int maxConcurrentStages
+  )
+  {
+    final int poolSize = Math.min(
+        maxConcurrentStages,
+        (int) queryDef.getStageDefinitions()
+                      .stream()
+                      .filter(stageDef -> stageDef.getProcessorFactory().usesProcessingBuffers())
+                      .count()
+    );
+
+    return acquire(poolSize);
+  }
+}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersSet.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersSet.java
new file mode 100644
index 000000000000..7f81a9c4a9c1
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ProcessingBuffersSet.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.exec;
+
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.msq.kernel.StageDefinition;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.stream.Collectors;
+
+/**
+ * Holds a set of {@link ProcessingBuffers} for a {@link Worker}. Acquired from {@link ProcessingBuffersProvider}.
+ */
+public class ProcessingBuffersSet
+{
+  public static final ProcessingBuffersSet EMPTY = new ProcessingBuffersSet(Collections.emptyList());
+
+  private final BlockingQueue<ProcessingBuffers> pool;
+
+  public ProcessingBuffersSet(Collection<ProcessingBuffers> buffers)
+  {
+    this.pool = new ArrayBlockingQueue<>(buffers.isEmpty() ? 1 : buffers.size()); // capacity must be >= 1
+    this.pool.addAll(buffers);
+  }
+
+  /**
+   * Equivalent to calling {@link ProcessingBuffers#fromCollection} on each collection in the overall collection,
+   * then creating an instance.
+   */
+  public static <T extends Collection<ByteBuffer>> ProcessingBuffersSet fromCollection(final Collection<T> processingBuffers)
+  {
+    return new ProcessingBuffersSet(
+        processingBuffers.stream()
+                         .map(ProcessingBuffers::fromCollection)
+                         .collect(Collectors.toList())
+    );
+  }
+
+  @Nullable
+  public ResourceHolder<ProcessingBuffers> acquireForStage(final StageDefinition stageDef)
+  {
+    if (!stageDef.getProcessorFactory().usesProcessingBuffers()) {
+      return null;
+    }
+
+    final ProcessingBuffers buffers = pool.poll();
+
+    if (buffers == null) {
+      // Never happens, because the pool acquired from ProcessingBuffersProvider must be big enough for all
+      // concurrent processing buffer needs. (In other words: if this does happen, it's a bug.)
+      throw DruidException.defensive("Processing buffers not available");
+    }
+
+    return new ResourceHolder<ProcessingBuffers>()
+    {
+      @Override
+      public ProcessingBuffers get()
+      {
+        return buffers;
+      }
+
+      @Override
+      public void close()
+      {
+        pool.add(buffers); // hand the buffers back so a later stage can acquire them
+      }
+    };
+  }
+}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index a4d6a2180bde..4d028147af02 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -242,7 +242,7 @@ private void makeInputSliceReader()
workOrder.getQueryDefinition(),
InputSlices.allReadablePartitions(workOrder.getInputs()),
inputChannelFactory,
- () -> ArenaMemoryAllocator.createOnHeap(frameContext.memoryParameters().getStandardFrameSize()),
+ () -> ArenaMemoryAllocator.createOnHeap(frameContext.memoryParameters().getFrameSize()),
exec,
cancellationId,
counterTracker,
@@ -270,18 +270,8 @@ private void makeWorkOutputChannelFactory()
final OutputChannelFactory baseOutputChannelFactory;
if (workOrder.getStageDefinition().doesShuffle()) {
- // Writing to a consumer in the same JVM (which will be set up later on in this method). Use the large frame
- // size if we're writing to a SuperSorter, since we'll generate fewer temp files if we use larger frames.
- // Otherwise, use the standard frame size.
- final int frameSize;
-
- if (workOrder.getStageDefinition().getShuffleSpec().kind().isSort()) {
- frameSize = frameContext.memoryParameters().getLargeFrameSize();
- } else {
- frameSize = frameContext.memoryParameters().getStandardFrameSize();
- }
-
- baseOutputChannelFactory = new BlockingQueueOutputChannelFactory(frameSize);
+ // Writing to a consumer in the same JVM (which will be set up later on in this method).
+ baseOutputChannelFactory = new BlockingQueueOutputChannelFactory(frameContext.memoryParameters().getFrameSize());
} else {
// Writing stage output.
baseOutputChannelFactory = makeStageOutputChannelFactory();
@@ -353,7 +343,7 @@ private workResultFuture = exec.runAllFully(
counterTracker.trackCpu(processorManager, CpuCounters.LABEL_MAIN),
maxOutstandingProcessors,
- frameContext.processorBouncer(),
+ processorFactory.usesProcessingBuffers() ? frameContext.processingBuffers().getBouncer() : Bouncer.unlimited(),
cancellationId
);
@@ -394,13 +384,13 @@ private void makeAndRunShuffleProcessors()
if (shuffleSpec.partitionCount() == 1) {
// Single partition; no need to write temporary files.
hashOutputChannelFactory =
- new BlockingQueueOutputChannelFactory(frameContext.memoryParameters().getStandardFrameSize());
+ new BlockingQueueOutputChannelFactory(frameContext.memoryParameters().getFrameSize());
} else {
// Multi-partition; write temporary files and then sort each one file-by-file.
hashOutputChannelFactory =
new FileOutputChannelFactory(
frameContext.tempDir("hash-parts"),
- frameContext.memoryParameters().getStandardFrameSize(),
+ frameContext.memoryParameters().getFrameSize(),
null
);
}
@@ -490,7 +480,7 @@ private void writeDurableStorageSuccessFile()
final DurableStorageOutputChannelFactory durableStorageOutputChannelFactory =
makeDurableStorageOutputChannelFactory(
frameContext.tempDir("durable"),
- frameContext.memoryParameters().getStandardFrameSize(),
+ frameContext.memoryParameters().getFrameSize(),
workOrder.getOutputChannelMode() == OutputChannelMode.DURABLE_STORAGE_QUERY_RESULTS
);
@@ -510,7 +500,7 @@ private OutputChannelFactory makeStageOutputChannelFactory()
{
// Use the standard frame size, since we assume this size when computing how much is needed to merge output
// files from different workers.
- final int frameSize = frameContext.memoryParameters().getStandardFrameSize();
+ final int frameSize = frameContext.memoryParameters().getFrameSize();
final OutputChannelMode outputChannelMode = workOrder.getOutputChannelMode();
switch (outputChannelMode) {
@@ -542,7 +532,7 @@ private OutputChannelFactory makeStageOutputChannelFactory()
private OutputChannelFactory makeSuperSorterIntermediateOutputChannelFactory(final File tmpDir)
{
- final int frameSize = frameContext.memoryParameters().getLargeFrameSize();
+ final int frameSize = frameContext.memoryParameters().getFrameSize();
final File fileChannelDirectory =
new File(tmpDir, StringUtils.format("intermediate_output_stage_%06d", workOrder.getStageNumber()));
final FileOutputChannelFactory fileOutputChannelFactory =
@@ -736,8 +726,8 @@ public FrameProcessor decorate(FrameProcessor processor)
},
outputChannelFactory,
makeSuperSorterIntermediateOutputChannelFactory(sorterTmpDir),
- memoryParameters.getSuperSorterMaxActiveProcessors(),
- memoryParameters.getSuperSorterMaxChannelsPerProcessor(),
+ memoryParameters.getSuperSorterConcurrentProcessors(),
+ memoryParameters.getSuperSorterMaxChannelsPerMerger(),
stageDefinition.getShuffleSpec().limitHint(),
cancellationId,
counterTracker.sortProgress(),
@@ -774,7 +764,7 @@ public void hashPartition(final OutputChannelFactory outputChannelFactory)
workOrder.getStageDefinition().getFrameReader(),
workOrder.getStageDefinition().getClusterBy().getColumns().size(),
FrameWriters.makeRowBasedFrameWriterFactory(
- new ArenaMemoryAllocatorFactory(frameContext.memoryParameters().getStandardFrameSize()),
+ new ArenaMemoryAllocatorFactory(frameContext.memoryParameters().getFrameSize()),
workOrder.getStageDefinition().getSignature(),
workOrder.getStageDefinition().getSortKey(),
removeNullBytes
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
index 666115d774cf..96c7ad20697d 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerContext.java
@@ -25,7 +25,6 @@
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.kernel.FrameProcessorFactory;
-import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.server.DruidNode;
@@ -78,14 +77,15 @@ public interface WorkerContext
WorkerClient makeWorkerClient();
/**
- * Directory for temporary outputs.
+ * Directory for temporary outputs, used as a base for {@link FrameContext#tempDir()}. This directory is not
+ * necessarily fully owned by the worker.
*/
File tempDir();
/**
* Create a context with useful objects required by {@link FrameProcessorFactory#makeProcessors}.
*/
- FrameContext frameContext(QueryDefinition queryDef, int stageNumber, OutputChannelMode outputChannelMode);
+ FrameContext frameContext(WorkOrder workOrder);
/**
* Number of available processing threads.
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 92664feeabbb..1dc00946da41 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -376,13 +376,7 @@ private void handleNewWorkOrder(
? StringUtils.format(", payload[%s]", context.jsonMapper().writeValueAsString(workOrder)) : "")
);
- final FrameContext frameContext = kernelHolder.processorCloser.register(
- context.frameContext(
- workOrder.getQueryDefinition(),
- stageDefinition.getStageNumber(),
- workOrder.getOutputChannelMode()
- )
- );
+ final FrameContext frameContext = kernelHolder.processorCloser.register(context.frameContext(workOrder));
kernelHolder.processorCloser.register(() -> {
try {
workerExec.cancel(cancellationId);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
index aeaae030e613..2884efe1f0b0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
@@ -19,92 +19,66 @@
package org.apache.druid.msq.exec;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
import com.google.common.primitives.Ints;
-import com.google.inject.Injector;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
-import org.apache.druid.frame.processor.Bouncer;
-import org.apache.druid.indexing.worker.config.WorkerConfig;
-import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.processor.SuperSorter;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.NotEnoughMemoryFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
-import org.apache.druid.msq.input.InputSpecs;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.indexing.processor.KeyStatisticsCollectionProcessor;
+import org.apache.druid.msq.indexing.processor.SegmentGeneratorFrameProcessorFactory;
+import org.apache.druid.msq.input.InputSlice;
+import org.apache.druid.msq.input.InputSlices;
+import org.apache.druid.msq.input.stage.ReadablePartition;
+import org.apache.druid.msq.input.stage.StageInputSlice;
+import org.apache.druid.msq.kernel.GlobalSortMaxCountShuffleSpec;
+import org.apache.druid.msq.kernel.ShuffleSpec;
import org.apache.druid.msq.kernel.StageDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
-import org.apache.druid.query.lookup.LookupExtractor;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainer;
-import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
-import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import javax.annotation.Nullable;
+import java.util.List;
import java.util.Objects;
/**
- * Class for determining how much JVM heap to allocate to various purposes.
+ * Class for determining how much JVM heap to allocate to various purposes for executing a {@link WorkOrder}.
*
- * First, we take a chunk out of the total JVM heap that is dedicated for MSQ; see {@link #computeUsableMemoryInJvm}.
+ * First, we split each worker's memory allotment, given by {@link MemoryIntrospector#memoryPerTask()}, into
+ * equally-sized "bundles", one for each {@link WorkOrder} that may be running simultaneously within the
+ * {@link Worker}.
*
- * Then, we carve out some space for each worker that may be running in our JVM; see {@link #memoryPerWorker}.
+ * Within each bundle, we carve out memory required for buffering broadcast data
+ * (see {@link #computeBroadcastBufferMemory}) and for concurrently-running processors
+ * (see {@link #computeProcessorMemory}).
*
- * Then, we split the rest into "bundles" of equal size; see {@link #memoryPerBundle}. The number of bundles is based
- * entirely on server configuration; this makes the calculation robust to different queries running simultaneously in
- * the same JVM.
- *
- * Within each bundle, we split up memory in two different ways: one assuming it'll be used for a
- * {@link org.apache.druid.frame.processor.SuperSorter}, and one assuming it'll be used for a regular
- * processor. Callers can then use whichever set of allocations makes sense. (We assume no single bundle
- * will be used for both purposes.)
+ * The remainder is called "bundle free memory", a pool of memory that can be used for {@link SuperSorter} or
+ * {@link SegmentGeneratorFrameProcessorFactory}. The amounts overlap, because the same {@link WorkOrder} never
+ * does both.
*/
public class WorkerMemoryParameters
{
- private static final Logger log = new Logger(WorkerMemoryParameters.class);
-
/**
- * Percent of memory that we allocate to bundles. It is less than 100% because we need to leave some space
- * left over for miscellaneous other stuff, and to ensure that GC pressure does not get too high.
+ * Default size for frames.
*/
- static final double USABLE_MEMORY_FRACTION = 0.75;
+ public static final int DEFAULT_FRAME_SIZE = 1_000_000;
/**
- * Percent of each bundle's memory that we allocate to appenderators. It is less than 100% because appenderators
- * unfortunately have a variety of unaccounted-for memory usage.
- */
- static final double APPENDERATOR_MEMORY_FRACTION = 0.67;
-
- /**
- * Size for "standard frames", which are used for most purposes, except inputs to super-sorters.
- *
- * In particular, frames that travel between workers are always the minimum size. This is helpful because it makes
- * it easier to compute the amount of memory needed to merge input streams.
+ * Amount of extra memory available for each processing thread, beyond what is needed for input and output
+ * channels. This memory is used for miscellaneous purposes within the various {@link FrameProcessor} implementations.
*/
- private static final int STANDARD_FRAME_SIZE = 1_000_000;
+ private static final long EXTRA_MEMORY_PER_PROCESSOR = 25_000_000;
/**
- * Size for "large frames", which are used for inputs and inner channels in to super-sorters.
- *
- * This is helpful because it minimizes the number of temporary files needed during super-sorting.
- */
- private static final int LARGE_FRAME_SIZE = 8_000_000;
-
- /**
- * Minimum amount of bundle memory available for processing (i.e., total bundle size minus the amount
- * needed for input channels). This memory is guaranteed to be available for things like segment generation
- * and broadcast data.
- */
- public static final long PROCESSING_MINIMUM_BYTES = 25_000_000;
-
- /**
- * Maximum amount of parallelism for the super-sorter. Higher amounts of concurrency tend to be wasteful.
- */
- private static final int MAX_SUPER_SORTER_PROCESSORS = 4;
-
- /**
- * Each super-sorter must have at least 1 processor with 2 input frames and 1 output frame. That's 3 total.
+ * Fraction of each bundle's free memory that we allocate to appenderators. It is less than 1.0 because appenderators
+ * unfortunately have a variety of unaccounted-for memory usage.
*/
- private static final int MIN_SUPER_SORTER_FRAMES = 3;
+ private static final double APPENDERATOR_BUNDLE_FREE_MEMORY_FRACTION = 0.67;
/**
* (Very) rough estimate of the on-heap overhead of reading a column.
@@ -112,256 +86,214 @@ public class WorkerMemoryParameters
private static final int APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN = 3_000;
/**
- * Maximum percent of *total* available memory (not each bundle), i.e. {@link #USABLE_MEMORY_FRACTION}, that we'll
- * ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl} across all workers.
+ * Maximum fraction of each bundle's free memory that will be used for maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}.
*/
- private static final double PARTITION_STATS_MEMORY_MAX_FRACTION = 0.1;
+ private static final double PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION = 0.1;
/**
- * Maximum number of bytes we'll ever use for maxRetainedBytes of {@link ClusterByStatisticsCollectorImpl} for
- * a single worker. Acts as a limit on the value computed based on {@link #PARTITION_STATS_MEMORY_MAX_FRACTION}.
+ * Maximum number of bytes from each bundle's free memory that we'll ever use for maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}. Limits the value computed based on
+ * {@link #PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION}.
*/
- private static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000;
+ private static final long PARTITION_STATS_MAX_MEMORY_PER_BUNDLE = 300_000_000;
/**
- * Threshold in bytes below which we assume that the worker is "small". While calculating the memory requirements for
- * a small worker, we try to be as conservatives with the estimates and the extra temporary space required by the
- * frames, since that can add up quickly and cause OOM.
+ * Minimum number of bytes from each bundle's free memory that we'll use for maxRetainedBytes of
+ * {@link ClusterByStatisticsCollectorImpl}.
*/
- private static final long SMALL_WORKER_CAPACITY_THRESHOLD_BYTES = 256_000_000;
+ private static final long PARTITION_STATS_MIN_MEMORY_PER_BUNDLE = 10_000_000;
/**
- * Fraction of free memory per bundle that can be used by {@link BroadcastJoinSegmentMapFnProcessor} to store broadcast
- * data on-heap. This is used to limit the total size of input frames, which we expect to expand on-heap. Expansion
- * can potentially be somewhat over 2x: for example, strings are UTF-8 in frames, but are UTF-16 on-heap, which is
- * a 2x expansion, and object and index overhead must be considered on top of that. So we use a value somewhat
- * lower than 0.5.
+ * Fraction of each bundle's total memory that can be used to buffer broadcast inputs. This is used by
+ * {@link BroadcastJoinSegmentMapFnProcessor} to limit how much joinable data is stored on-heap. This is carved
+ * directly out of the total bundle memory, which makes its size more predictable and stable: it only depends on
+ * the total JVM memory, the number of tasks per JVM, and the value of maxConcurrentStages for the query. This
+ * stability is important, because if the broadcast buffer fills up, the query fails. So any time its size changes,
+ * we risk queries failing that would formerly have succeeded.
*/
- static final double BROADCAST_JOIN_MEMORY_FRACTION = 0.3;
+ private static final double BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION = 0.2;
/**
- * Fraction of free memory per bundle that can be used by
- * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor} to buffer frames in its trackers.
+ * Multiplier to apply to {@link #BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION} when determining how much free bundle
+ * memory is left over. This fudge factor exists because {@link BroadcastJoinSegmentMapFnProcessor} applies data
+ * size limits based on frame size, which we expect to expand somewhat in memory due to indexing structures in
+ * {@link org.apache.druid.segment.join.table.FrameBasedIndexedTable}.
*/
- static final double SORT_MERGE_JOIN_MEMORY_FRACTION = 0.9;
+ private static final double BROADCAST_BUFFER_OVERHEAD_RATIO = 1.5;
/**
- * In case {@link NotEnoughMemoryFault} is thrown, a fixed estimation overhead is added when estimating total memory required for the process.
+ * Amount of memory that can be used by
+ * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor} to buffer frames in its trackers.
*/
- private static final long BUFFER_BYTES_FOR_ESTIMATION = 1000;
+ private static final long SORT_MERGE_JOIN_MEMORY_PER_PROCESSOR = (long) (EXTRA_MEMORY_PER_PROCESSOR * 0.9);
- private final long processorBundleMemory;
- private final int superSorterMaxActiveProcessors;
- private final int superSorterMaxChannelsPerProcessor;
+ private final long bundleFreeMemory;
+ private final int frameSize;
+ private final int superSorterConcurrentProcessors;
+ private final int superSorterMaxChannelsPerMerger;
private final int partitionStatisticsMaxRetainedBytes;
-
- WorkerMemoryParameters(
- final long processorBundleMemory,
- final int superSorterMaxActiveProcessors,
- final int superSorterMaxChannelsPerProcessor,
- final int partitionStatisticsMaxRetainedBytes
+ private final long broadcastBufferMemory;
+
+ public WorkerMemoryParameters(
+ final long bundleFreeMemory,
+ final int frameSize,
+ final int superSorterConcurrentProcessors,
+ final int superSorterMaxChannelsPerMerger,
+ final int partitionStatisticsMaxRetainedBytes,
+ final long broadcastBufferMemory
)
{
- this.processorBundleMemory = processorBundleMemory;
- this.superSorterMaxActiveProcessors = superSorterMaxActiveProcessors;
- this.superSorterMaxChannelsPerProcessor = superSorterMaxChannelsPerProcessor;
+ this.bundleFreeMemory = bundleFreeMemory;
+ this.frameSize = frameSize;
+ this.superSorterConcurrentProcessors = superSorterConcurrentProcessors;
+ this.superSorterMaxChannelsPerMerger = superSorterMaxChannelsPerMerger;
this.partitionStatisticsMaxRetainedBytes = partitionStatisticsMaxRetainedBytes;
+ this.broadcastBufferMemory = broadcastBufferMemory;
}
/**
- * Create a production instance for {@link org.apache.druid.msq.indexing.MSQWorkerTask}.
+ * Create a production instance for a given {@link WorkOrder}.
*/
- public static WorkerMemoryParameters createProductionInstanceForWorker(
- final Injector injector,
- final QueryDefinition queryDef,
- final int stageNumber,
+ public static WorkerMemoryParameters createProductionInstance(
+ final WorkOrder workOrder,
+ final MemoryIntrospector memoryIntrospector,
final int maxConcurrentStages
)
{
- final StageDefinition stageDef = queryDef.getStageDefinition(stageNumber);
- final IntSet inputStageNumbers = InputSpecs.getStageNumbers(stageDef.getInputSpecs());
- final int numInputWorkers =
- inputStageNumbers.intStream()
- .map(inputStageNumber -> queryDef.getStageDefinition(inputStageNumber).getMaxWorkerCount())
- .sum();
- long totalLookupFootprint = computeTotalLookupFootprint(injector);
-
- final int numHashOutputPartitions;
- if (stageDef.doesShuffle() && stageDef.getShuffleSpec().kind().isHash()) {
- numHashOutputPartitions = stageDef.getShuffleSpec().partitionCount();
- } else {
- numHashOutputPartitions = 0;
- }
-
+ final StageDefinition stageDef = workOrder.getStageDefinition();
return createInstance(
- Runtime.getRuntime().maxMemory(),
- computeNumWorkersInJvm(injector),
- computeNumProcessorsInJvm(injector),
+ memoryIntrospector,
+ DEFAULT_FRAME_SIZE,
+ workOrder.getInputs(),
+ stageDef.getBroadcastInputNumbers(),
+ stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
maxConcurrentStages,
- numInputWorkers,
- numHashOutputPartitions,
- totalLookupFootprint
+ computeFramesPerOutputChannel(workOrder.getOutputChannelMode())
);
}
/**
- * Returns an object specifying memory-usage parameters.
+ * Returns an object specifying memory-usage parameters for a {@link WorkOrder} running inside a {@link Worker}.
*
* Throws a {@link MSQException} with an appropriate fault if the provided combination of parameters cannot
* yield a workable memory situation.
*
- * @param maxMemoryInJvm memory available in the entire JVM. This will be divided amongst processors.
- * @param numWorkersInJvm number of workers that can run concurrently in this JVM. Generally equal to
- * the task capacity.
- * @param numProcessingThreadsInJvm size of the processing thread pool in the JVM.
- * @param maxConcurrentStages maximum number of concurrent stages per worker.
- * @param numInputWorkers total number of workers across all input stages.
- * @param numHashOutputPartitions total number of output partitions, if using hash partitioning; zero if not using
- * hash partitioning.
- * @param totalLookupFootprint estimated size of the lookups loaded by the process.
+ * @param memoryIntrospector memory introspector
+ * @param frameSize frame size
+ * @param inputSlices from {@link WorkOrder#getInputs()}
+ * @param broadcastInputNumbers from {@link StageDefinition#getBroadcastInputNumbers()}
+ * @param shuffleSpec from {@link StageDefinition#getShuffleSpec()}, or null if the stage does not shuffle
+ * @param maxConcurrentStages figure from {@link WorkerContext#maxConcurrentStages()}
+ * @param numFramesPerOutputChannel figure from {@link #computeFramesPerOutputChannel(OutputChannelMode)}
+ *
+ * @throws MSQException with {@link TooManyWorkersFault} or {@link NotEnoughMemoryFault} if not enough memory
+ * is available to generate a usable instance
*/
public static WorkerMemoryParameters createInstance(
- final long maxMemoryInJvm,
- final int numWorkersInJvm,
- final int numProcessingThreadsInJvm,
+ final MemoryIntrospector memoryIntrospector,
+ final int frameSize,
+ final List inputSlices,
+ final IntSet broadcastInputNumbers,
+ @Nullable final ShuffleSpec shuffleSpec,
final int maxConcurrentStages,
- final int numInputWorkers,
- final int numHashOutputPartitions,
- final long totalLookupFootprint
+ final int numFramesPerOutputChannel
)
{
- Preconditions.checkArgument(maxMemoryInJvm > 0, "Max memory passed: [%s] should be > 0", maxMemoryInJvm);
- Preconditions.checkArgument(numWorkersInJvm > 0, "Number of workers: [%s] in jvm should be > 0", numWorkersInJvm);
- Preconditions.checkArgument(
- numProcessingThreadsInJvm > 0,
- "Number of processing threads [%s] should be > 0",
- numProcessingThreadsInJvm
+ final long bundleMemory = computeBundleMemory(memoryIntrospector.memoryPerTask(), maxConcurrentStages);
+ final long processorMemory = computeProcessorMemory(
+ computeMaxSimultaneousInputChannelsPerProcessor(inputSlices, broadcastInputNumbers),
+ frameSize
);
- Preconditions.checkArgument(numInputWorkers >= 0, "Number of input workers: [%s] should be >=0", numInputWorkers);
- Preconditions.checkArgument(
- totalLookupFootprint >= 0,
- "Lookup memory footprint: [%s] should be >= 0",
- totalLookupFootprint
- );
- final long usableMemoryInJvm = computeUsableMemoryInJvm(maxMemoryInJvm, totalLookupFootprint);
- final long workerMemory = memoryPerWorker(usableMemoryInJvm, numWorkersInJvm);
- final long bundleMemory =
- memoryPerBundle(usableMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm) / maxConcurrentStages;
- final long bundleMemoryForInputChannels = memoryNeededForInputChannels(numInputWorkers);
- final long bundleMemoryForHashPartitioning = memoryNeededForHashPartitioning(numHashOutputPartitions);
- final long bundleMemoryForProcessing =
- bundleMemory - bundleMemoryForInputChannels - bundleMemoryForHashPartitioning;
-
- if (bundleMemoryForProcessing < PROCESSING_MINIMUM_BYTES) {
- final int maxWorkers = computeMaxWorkers(
- usableMemoryInJvm,
- numWorkersInJvm,
- numProcessingThreadsInJvm,
- maxConcurrentStages,
- numHashOutputPartitions
- );
-
- if (maxWorkers > 0) {
- throw new MSQException(new TooManyWorkersFault(numInputWorkers, Math.min(Limits.MAX_WORKERS, maxWorkers)));
- } else {
- // Not enough memory for even one worker. More of a NotEnoughMemory situation than a TooManyWorkers situation.
- throw new MSQException(
- new NotEnoughMemoryFault(
- calculateSuggestedMinMemoryFromUsableMemory(
- estimateUsableMemory(
- numWorkersInJvm,
- numProcessingThreadsInJvm,
- PROCESSING_MINIMUM_BYTES + BUFFER_BYTES_FOR_ESTIMATION + bundleMemoryForInputChannels,
- maxConcurrentStages
- ), totalLookupFootprint),
- maxMemoryInJvm,
- usableMemoryInJvm,
- numWorkersInJvm,
- numProcessingThreadsInJvm,
- maxConcurrentStages
- )
- );
- }
- }
-
- // Compute memory breakdown for super-sorting bundles.
- final int maxNumFramesForSuperSorter = Ints.checkedCast(bundleMemory / WorkerMemoryParameters.LARGE_FRAME_SIZE);
-
- if (maxNumFramesForSuperSorter < MIN_SUPER_SORTER_FRAMES) {
+ final boolean hasBroadcastInputs = !broadcastInputNumbers.isEmpty();
+ final long broadcastBufferMemory =
+ hasBroadcastInputs ? computeBroadcastBufferMemoryIncludingOverhead(bundleMemory) : 0;
+ final int numProcessingThreads = memoryIntrospector.numProcessingThreads();
+ final int maxSimultaneousWorkProcessors = Math.min(numProcessingThreads, computeNumInputPartitions(inputSlices));
+ final long bundleFreeMemory =
+ bundleMemory - maxSimultaneousWorkProcessors * processorMemory - broadcastBufferMemory;
+
+ final long minimumBundleFreeMemory = computeMinimumBundleFreeMemory(frameSize, numFramesPerOutputChannel);
+ if (bundleFreeMemory < minimumBundleFreeMemory) {
+ final long requiredTaskMemory = bundleMemory - bundleFreeMemory + minimumBundleFreeMemory;
throw new MSQException(
new NotEnoughMemoryFault(
- calculateSuggestedMinMemoryFromUsableMemory(
- estimateUsableMemory(
- numWorkersInJvm,
- (MIN_SUPER_SORTER_FRAMES + BUFFER_BYTES_FOR_ESTIMATION) * LARGE_FRAME_SIZE,
- maxConcurrentStages
- ),
- totalLookupFootprint
- ),
- maxMemoryInJvm,
- usableMemoryInJvm,
- numWorkersInJvm,
- numProcessingThreadsInJvm,
+ memoryIntrospector.computeJvmMemoryRequiredForTaskMemory(requiredTaskMemory),
+ memoryIntrospector.totalMemoryInJvm(),
+ memoryIntrospector.memoryPerTask(),
+ memoryIntrospector.numTasksInJvm(),
+ memoryIntrospector.numProcessingThreads(),
+ computeNumInputWorkers(inputSlices),
maxConcurrentStages
)
);
}
- final int superSorterMaxActiveProcessors = Math.min(
- numProcessingThreadsInJvm,
- Math.min(
- maxNumFramesForSuperSorter / MIN_SUPER_SORTER_FRAMES,
- MAX_SUPER_SORTER_PROCESSORS
- )
- );
+ // Compute memory breakdown for super-sorting bundles.
+ final int partitionStatsMemory =
+ StageDefinition.mustGatherResultKeyStatistics(shuffleSpec) ? computePartitionStatsMemory(bundleFreeMemory) : 0;
+ final long superSorterMemory = bundleFreeMemory - partitionStatsMemory;
+ final int maxOutputPartitions = computeMaxOutputPartitions(shuffleSpec);
- final int isSmallWorker = usableMemoryInJvm < SMALL_WORKER_CAPACITY_THRESHOLD_BYTES ? 1 : 0;
- // Apportion max frames to all processors equally, then subtract one to account for an output frame and one to account
- // for the durable storage's output frame in the supersorter. The extra frame is required in case of durable storage
- // since composing output channel factories keep a frame open while writing to them.
- // We only account for this extra frame in the workers where the heap size is relatively small to be more
- // conservative with the memory estimations. In workers with heap size larger than the frame size, we can get away
- // without accounting for this extra frame, and instead better parallelize the supersorter's operations.
- final int superSorterMaxChannelsPerProcessor = maxNumFramesForSuperSorter / superSorterMaxActiveProcessors
- - 1
- - isSmallWorker;
- if (superSorterMaxActiveProcessors <= 0) {
+ int superSorterConcurrentProcessors;
+ int superSorterMaxChannelsPerMerger = -1;
+
+ if (maxOutputPartitions == 0) {
+ superSorterConcurrentProcessors = numProcessingThreads;
+ } else {
+ superSorterConcurrentProcessors = Math.min(maxOutputPartitions, numProcessingThreads);
+ }
+
+ for (; superSorterConcurrentProcessors > 0; superSorterConcurrentProcessors--) {
+ final long memoryPerProcessor = superSorterMemory / superSorterConcurrentProcessors;
+
+ // Each processor has at least 2 frames for inputs, plus numFramesPerOutputChannel for outputs.
+ // Compute whether we can support this level of parallelism, given these constraints.
+ final int minMemoryForInputsPerProcessor = 2 * frameSize;
+ final int memoryForOutputsPerProcessor = numFramesPerOutputChannel * frameSize;
+
+ if (memoryPerProcessor >= minMemoryForInputsPerProcessor + memoryForOutputsPerProcessor) {
+ final long memoryForInputsPerProcessor = memoryPerProcessor - memoryForOutputsPerProcessor;
+ superSorterMaxChannelsPerMerger = Ints.checkedCast(memoryForInputsPerProcessor / frameSize);
+ break;
+ }
+ }
+
+ if (superSorterConcurrentProcessors == 0) {
+ // Couldn't support any level of concurrency. Not expected, since we should have accounted for at least a
+ // minimally-sized SuperSorter by way of the calculation in "computeMinimumBundleFreeMemory". Return a
+ // NotEnoughMemoryFault with no suggestedServerMemory, since at this point, we aren't sure what will work.
throw new MSQException(
new NotEnoughMemoryFault(
- calculateSuggestedMinMemoryFromUsableMemory(
- estimateUsableMemory(
- numWorkersInJvm,
- numProcessingThreadsInJvm,
- PROCESSING_MINIMUM_BYTES + BUFFER_BYTES_FOR_ESTIMATION + bundleMemoryForInputChannels,
- maxConcurrentStages
- ), totalLookupFootprint),
- maxMemoryInJvm,
- usableMemoryInJvm,
- numWorkersInJvm,
- numProcessingThreadsInJvm,
+ 0,
+ memoryIntrospector.totalMemoryInJvm(),
+ memoryIntrospector.memoryPerTask(),
+ memoryIntrospector.numTasksInJvm(),
+ memoryIntrospector.numProcessingThreads(),
+ computeNumInputWorkers(inputSlices),
maxConcurrentStages
)
);
}
return new WorkerMemoryParameters(
- bundleMemoryForProcessing,
- superSorterMaxActiveProcessors,
- superSorterMaxChannelsPerProcessor,
-
- // 100% of worker memory is devoted to partition statistics
- Ints.checkedCast(workerMemory / maxConcurrentStages)
+ bundleFreeMemory,
+ frameSize,
+ superSorterConcurrentProcessors,
+ superSorterMaxChannelsPerMerger,
+ Math.min(Integer.MAX_VALUE, partitionStatsMemory / numProcessingThreads),
+ hasBroadcastInputs ? computeBroadcastBufferMemory(bundleMemory) : 0
);
}
- public int getSuperSorterMaxActiveProcessors()
+ public int getSuperSorterConcurrentProcessors()
{
- return superSorterMaxActiveProcessors;
+ return superSorterConcurrentProcessors;
}
- public int getSuperSorterMaxChannelsPerProcessor()
+ public int getSuperSorterMaxChannelsPerMerger()
{
- return superSorterMaxChannelsPerProcessor;
+ return superSorterMaxChannelsPerMerger;
}
public long getAppenderatorMaxBytesInMemory()
@@ -376,24 +308,27 @@ public int getAppenderatorMaxColumnsToMerge()
return Ints.checkedCast(Math.max(2, getAppenderatorMemory() / 2 / APPENDERATOR_MERGE_ROUGH_MEMORY_PER_COLUMN));
}
- public int getStandardFrameSize()
+ public int getFrameSize()
{
- return STANDARD_FRAME_SIZE;
+ return frameSize;
}
- public int getLargeFrameSize()
- {
- return LARGE_FRAME_SIZE;
- }
-
- public long getBroadcastJoinMemory()
+ /**
+ * Memory available for buffering broadcast data. Used to restrict the amount of memory used by
+ * {@link BroadcastJoinSegmentMapFnProcessor}.
+ */
+ public long getBroadcastBufferMemory()
{
- return (long) (processorBundleMemory * BROADCAST_JOIN_MEMORY_FRACTION);
+ return broadcastBufferMemory;
}
+ /**
+ * Amount of memory that can be used by each
+ * {@link org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessor} to buffer frames in its trackers.
+ */
public long getSortMergeJoinMemory()
{
- return (long) (processorBundleMemory * SORT_MERGE_JOIN_MEMORY_FRACTION);
+ return SORT_MERGE_JOIN_MEMORY_PER_PROCESSOR;
}
public int getPartitionStatisticsMaxRetainedBytes()
@@ -406,7 +341,7 @@ public int getPartitionStatisticsMaxRetainedBytes()
*/
private long getAppenderatorMemory()
{
- return (long) (processorBundleMemory * APPENDERATOR_MEMORY_FRACTION);
+ return (long) (bundleFreeMemory * APPENDERATOR_BUNDLE_FREE_MEMORY_FRACTION);
}
@Override
@@ -419,20 +354,24 @@ public boolean equals(Object o)
return false;
}
WorkerMemoryParameters that = (WorkerMemoryParameters) o;
- return processorBundleMemory == that.processorBundleMemory
- && superSorterMaxActiveProcessors == that.superSorterMaxActiveProcessors
- && superSorterMaxChannelsPerProcessor == that.superSorterMaxChannelsPerProcessor
- && partitionStatisticsMaxRetainedBytes == that.partitionStatisticsMaxRetainedBytes;
+ return bundleFreeMemory == that.bundleFreeMemory
+ && frameSize == that.frameSize
+ && superSorterConcurrentProcessors == that.superSorterConcurrentProcessors
+ && superSorterMaxChannelsPerMerger == that.superSorterMaxChannelsPerMerger
+ && partitionStatisticsMaxRetainedBytes == that.partitionStatisticsMaxRetainedBytes
+ && broadcastBufferMemory == that.broadcastBufferMemory;
}
@Override
public int hashCode()
{
return Objects.hash(
- processorBundleMemory,
- superSorterMaxActiveProcessors,
- superSorterMaxChannelsPerProcessor,
- partitionStatisticsMaxRetainedBytes
+ bundleFreeMemory,
+ frameSize,
+ superSorterConcurrentProcessors,
+ superSorterMaxChannelsPerMerger,
+ partitionStatisticsMaxRetainedBytes,
+ broadcastBufferMemory
);
}
@@ -440,206 +379,205 @@ public int hashCode()
public String toString()
{
return "WorkerMemoryParameters{" +
- "processorBundleMemory=" + processorBundleMemory +
- ", superSorterMaxActiveProcessors=" + superSorterMaxActiveProcessors +
- ", superSorterMaxChannelsPerProcessor=" + superSorterMaxChannelsPerProcessor +
+ "bundleFreeMemory=" + bundleFreeMemory +
+ ", frameSize=" + frameSize +
+ ", superSorterConcurrentProcessors=" + superSorterConcurrentProcessors +
+ ", superSorterMaxChannelsPerMerger=" + superSorterMaxChannelsPerMerger +
", partitionStatisticsMaxRetainedBytes=" + partitionStatisticsMaxRetainedBytes +
+ ", broadcastBufferMemory=" + broadcastBufferMemory +
'}';
}
/**
- * Computes the highest value of numInputWorkers, for the given parameters, that can be passed to
- * {@link #createInstance} without resulting in a {@link TooManyWorkersFault}.
- *
- * Returns 0 if no number of workers would be OK.
+ * Compute the memory allocated to each {@link WorkOrder} within a {@link Worker}.
*/
- static int computeMaxWorkers(
- final long usableMemoryInJvm,
- final int numWorkersInJvm,
- final int numProcessingThreadsInJvm,
- final int maxConcurrentStages,
- final int numHashOutputPartitions
- )
+ static long computeBundleMemory(final long memoryPerWorker, final int maxConcurrentStages)
{
- final long bundleMemory = memoryPerBundle(usableMemoryInJvm, numWorkersInJvm, numProcessingThreadsInJvm);
-
- // Compute number of workers that gives us PROCESSING_MINIMUM_BYTES of memory per bundle per concurrent stage, while
- // accounting for memoryNeededForInputChannels + memoryNeededForHashPartitioning.
- final int isHashing = numHashOutputPartitions > 0 ? 1 : 0;
- final long bundleMemoryPerStage = bundleMemory / maxConcurrentStages;
- final long maxWorkers =
- (bundleMemoryPerStage - PROCESSING_MINIMUM_BYTES) / ((long) STANDARD_FRAME_SIZE * (1 + isHashing)) - 1;
- return Math.max(0, Ints.checkedCast(maxWorkers));
+ return memoryPerWorker / maxConcurrentStages;
}
/**
- * Computes the amount of memory needed to read a single partition from a given number of workers.
+ * Compute the memory allocated to {@link KeyStatisticsCollectionProcessor} within each bundle.
*/
- static long memoryNeededForInputChannels(final int numInputWorkers)
+ static int computePartitionStatsMemory(final long bundleFreeMemory)
{
- // Workers that read sorted inputs must open all channels at once to do an N-way merge. Calculate memory needs.
- // Requirement: one input frame per worker, one buffered output frame.
- return (long) STANDARD_FRAME_SIZE * (numInputWorkers + 1);
+ return Ints.checkedCast(
+ Math.max(
+ (long) Math.min(
+ bundleFreeMemory * PARTITION_STATS_MAX_BUNDLE_FREE_MEMORY_FRACTION,
+ PARTITION_STATS_MAX_MEMORY_PER_BUNDLE
+ ),
+ PARTITION_STATS_MIN_MEMORY_PER_BUNDLE
+ )
+ );
}
/**
- * Maximum number of workers that may exist in the current JVM.
+ * Compute the memory limit passed to {@link BroadcastJoinSegmentMapFnProcessor} within each worker bundle. This
+ * is somewhat lower than {@link #computeBroadcastBufferMemoryIncludingOverhead}, because we expect some overhead on
+ * top of this limit due to indexing structures. This overhead isn't accounted for by the processor
+ * {@link BroadcastJoinSegmentMapFnProcessor} itself.
*/
- private static int computeNumWorkersInJvm(final Injector injector)
+ static long computeBroadcastBufferMemory(final long bundleMemory)
{
- final AppenderatorsManager appenderatorsManager = injector.getInstance(AppenderatorsManager.class);
+ return (long) (bundleMemory * BROADCAST_BUFFER_TOTAL_MEMORY_FRACTION);
+ }
- if (appenderatorsManager instanceof UnifiedIndexerAppenderatorsManager) {
- // CliIndexer
- return injector.getInstance(WorkerConfig.class).getCapacity();
- } else {
- // CliPeon
- return 1;
- }
+ /**
+ * Memory allocated to {@link BroadcastJoinSegmentMapFnProcessor} within each worker bundle, including
+ * expected overhead.
+ */
+ static long computeBroadcastBufferMemoryIncludingOverhead(final long bundleMemory)
+ {
+ return (long) (computeBroadcastBufferMemory(bundleMemory) * BROADCAST_BUFFER_OVERHEAD_RATIO);
}
/**
- * Maximum number of concurrent processors that exist in the current JVM.
+ * Memory allocated to each processor within a bundle, including fixed overheads and buffered input and output frames.
+ *
+ * @param maxSimultaneousInputChannelsPerProcessor figure from {@link #computeMaxSimultaneousInputChannelsPerProcessor}
+ * @param frameSize frame size
*/
- private static int computeNumProcessorsInJvm(final Injector injector)
+ static long computeProcessorMemory(final int maxSimultaneousInputChannelsPerProcessor, final int frameSize)
{
- return injector.getInstance(Bouncer.class).getMaxCount();
+ return EXTRA_MEMORY_PER_PROCESSOR
+ + computeProcessorMemoryForInputChannels(maxSimultaneousInputChannelsPerProcessor, frameSize)
+ + frameSize /* output frame */;
}
/**
- * Compute the memory allocated to each worker. Includes anything that exists outside of processing bundles.
+ * Memory allocated to each processor for reading its inputs.
*
- * Today, we only look at one thing: the amount of memory taken up by
- * {@link org.apache.druid.msq.statistics.ClusterByStatisticsCollector}. This is the single largest source of memory
- * usage outside processing bundles.
+ * @param maxSimultaneousInputChannelsPerProcessor figure from {@link #computeMaxSimultaneousInputChannelsPerProcessor}
+ * @param frameSize frame size
*/
- private static long memoryPerWorker(
- final long usableMemoryInJvm,
- final int numWorkersInJvm
+ static long computeProcessorMemoryForInputChannels(
+ final int maxSimultaneousInputChannelsPerProcessor,
+ final int frameSize
)
{
- final long memoryForWorkers = (long) Math.min(
- usableMemoryInJvm * PARTITION_STATS_MEMORY_MAX_FRACTION,
- numWorkersInJvm * PARTITION_STATS_MEMORY_MAX_BYTES
- );
-
- return memoryForWorkers / numWorkersInJvm;
+ return (long) maxSimultaneousInputChannelsPerProcessor * frameSize;
}
/**
- * Compute the memory allocated to each processing bundle. Any computation changes done to this method should also be
- * done in its corresponding method {@link WorkerMemoryParameters#estimateUsableMemory}
+ * Number of input partitions across all {@link StageInputSlice}.
*/
- private static long memoryPerBundle(
- final long usableMemoryInJvm,
- final int numWorkersInJvm,
- final int numProcessingThreadsInJvm
- )
+ static int computeNumInputPartitions(final List inputSlices)
{
- // One bundle per worker + one per processor. The worker bundles are used for sorting (SuperSorter) and the
- // processing bundles are used for reading input and doing per-partition processing.
- final int bundleCount = numWorkersInJvm + numProcessingThreadsInJvm;
+ int retVal = 0;
- // Need to subtract memoryForWorkers off the top of usableMemoryInJvm, since this is reserved for
- // statistics collection.
- final long memoryForWorkers = numWorkersInJvm * memoryPerWorker(usableMemoryInJvm, numWorkersInJvm);
- final long memoryForBundles = usableMemoryInJvm - memoryForWorkers;
+ for (final StageInputSlice slice : InputSlices.allStageSlices(inputSlices)) {
+ retVal += Iterables.size(slice.getPartitions());
+ }
- // Divide up the usable memory per bundle.
- return memoryForBundles / bundleCount;
+ return retVal;
}
/**
- * Used for estimating the usable memory for better exception messages when {@link NotEnoughMemoryFault} is thrown.
+ * Maximum number of input channels that a processor may have open at once, given the provided worker assignment.
+ *
+ * To compute this, we take the maximum number of workers associated with some partition for each slice. Then we sum
+ * those maxes up for all broadcast slices, and for all non-broadcast slices, and take the max between those two.
+ * The idea is that processors first read broadcast data, then read non-broadcast data, and during both phases
+ * they should have at most one partition open from each slice at once.
+ *
+ * @param inputSlices object from {@link WorkOrder#getInputs()}
+ * @param broadcastInputNumbers object from {@link StageDefinition#getBroadcastInputNumbers()}
*/
- private static long estimateUsableMemory(
- final int numWorkersInJvm,
- final int numProcessingThreadsInJvm,
- final long estimatedEachBundleMemory,
- final int maxConcurrentStages
+ static int computeMaxSimultaneousInputChannelsPerProcessor(
+ final List inputSlices,
+ final IntSet broadcastInputNumbers
)
{
- final int bundleCount = numWorkersInJvm + numProcessingThreadsInJvm;
- return estimateUsableMemory(numWorkersInJvm, estimatedEachBundleMemory * bundleCount, maxConcurrentStages);
+ long totalNonBroadcastInputChannels = 0;
+ long totalBroadcastInputChannels = 0;
+
+ final List allStageSlices = InputSlices.allStageSlices(inputSlices);
+
+ for (int inputNumber = 0; inputNumber < allStageSlices.size(); inputNumber++) {
+ final StageInputSlice slice = allStageSlices.get(inputNumber);
+
+ int maxWorkers = 0;
+ for (final ReadablePartition partition : slice.getPartitions()) {
+ maxWorkers = Math.max(maxWorkers, partition.getWorkerNumbers().size());
+ }
+
+ if (broadcastInputNumbers.contains(inputNumber)) {
+ totalBroadcastInputChannels += maxWorkers;
+ } else {
+ totalNonBroadcastInputChannels += maxWorkers;
+ }
+ }
+
+ return Ints.checkedCast(Math.max(totalBroadcastInputChannels, totalNonBroadcastInputChannels));
}
+
/**
- * Add overheads to the estimated bundle memoery for all the workers. Checkout {@link WorkerMemoryParameters#memoryPerWorker(long, int)}
- * for the overhead calculation outside the processing bundles.
+ * Number of distinct input workers.
*/
- private static long estimateUsableMemory(
- final int numWorkersInJvm,
- final long estimatedTotalBundleMemory,
- final int maxConcurrentStages
- )
+ static int computeNumInputWorkers(final List inputSlices)
{
- // Currently, we only add the partition stats overhead since it will be the single largest overhead per worker.
- final long estimateStatOverHeadPerWorker = PARTITION_STATS_MEMORY_MAX_BYTES;
- final long requiredUsableMemory = estimatedTotalBundleMemory + (estimateStatOverHeadPerWorker * numWorkersInJvm);
- return requiredUsableMemory * maxConcurrentStages;
- }
+ final IntSet workerNumbers = new IntOpenHashSet();
- private static long memoryNeededForHashPartitioning(final int numOutputPartitions)
- {
- // One standard frame for each processor output.
- // May be zero, since numOutputPartitions is zero if not using hash partitioning.
- return (long) STANDARD_FRAME_SIZE * numOutputPartitions;
+ for (final StageInputSlice slice : InputSlices.allStageSlices(inputSlices)) {
+ for (final ReadablePartition partition : slice.getPartitions()) {
+ workerNumbers.addAll(partition.getWorkerNumbers());
+ }
+ }
+
+ return workerNumbers.size();
}
/**
- * Amount of heap memory available for our usage. Any computation changes done to this method should also be done in
- * its corresponding method {@link WorkerMemoryParameters#calculateSuggestedMinMemoryFromUsableMemory}
+ * Maximum number of output channels for a shuffle spec, or 0 if not knowable in advance.
*/
- private static long computeUsableMemoryInJvm(final long maxMemory, final long totalLookupFootprint)
+ static int computeMaxOutputPartitions(@Nullable final ShuffleSpec shuffleSpec)
{
- // Always report at least one byte, to simplify the math in createInstance.
- return Math.max(
- 1,
- (long) ((maxMemory - totalLookupFootprint) * USABLE_MEMORY_FRACTION)
- );
+ if (shuffleSpec == null) {
+ return 0;
+ } else {
+ switch (shuffleSpec.kind()) {
+ case HASH:
+ case HASH_LOCAL_SORT:
+ case MIX:
+ return shuffleSpec.partitionCount();
+
+ case GLOBAL_SORT:
+ if (shuffleSpec instanceof GlobalSortMaxCountShuffleSpec) {
+ return ((GlobalSortMaxCountShuffleSpec) shuffleSpec).getMaxPartitions();
+ }
+ // Fall through
+
+ default:
+ return 0;
+ }
+ }
}
/**
- * Estimate amount of heap memory for the given workload to use in case usable memory is provided. This method is used
- * for better exception messages when {@link NotEnoughMemoryFault} is thrown.
+ * Number of frames needed per output channel: 2 if the output channel mode is durable, otherwise 1.
*/
- private static long calculateSuggestedMinMemoryFromUsableMemory(long usuableMemeory, final long totalLookupFootprint)
+ static int computeFramesPerOutputChannel(final OutputChannelMode outputChannelMode)
{
- return (long) ((usuableMemeory / USABLE_MEMORY_FRACTION) + totalLookupFootprint);
+ // If durable storage is enabled, we need one extra frame per output channel.
+ return outputChannelMode.isDurable() ? 2 : 1;
}
/**
- * Total estimated lookup footprint. Obtained by calling {@link LookupExtractor#estimateHeapFootprint()} on
- * all available lookups.
+ * Minimum number of bytes for a bundle's free memory allotment. This must be enough to reasonably produce and
+ * persist an {@link IncrementalIndex}, or to run a {@link SuperSorter} with 1 thread and 2 frames.
*/
- private static long computeTotalLookupFootprint(final Injector injector)
+ static long computeMinimumBundleFreeMemory(final int frameSize, final int numFramesPerOutputChannel)
{
- // Subtract memory taken up by lookups. Correctness of this operation depends on lookups being loaded *before*
- // we create this instance. Luckily, this is the typical mode of operation, since by default
- // druid.lookup.enableLookupSyncOnStartup = true.
- final LookupExtractorFactoryContainerProvider lookupManager =
- injector.getInstance(LookupExtractorFactoryContainerProvider.class);
-
- int lookupCount = 0;
- long lookupFootprint = 0;
-
- for (final String lookupName : lookupManager.getAllLookupNames()) {
- final LookupExtractorFactoryContainer container = lookupManager.get(lookupName).orElse(null);
-
- if (container != null) {
- try {
- final LookupExtractor extractor = container.getLookupExtractorFactory().get();
- lookupFootprint += extractor.estimateHeapFootprint();
- lookupCount++;
- }
- catch (Exception e) {
- log.noStackTrace().warn(e, "Failed to load lookup [%s] for size estimation. Skipping.", lookupName);
- }
- }
- }
+ // Some for partition statistics.
+ long minMemory = PARTITION_STATS_MIN_MEMORY_PER_BUNDLE;
- log.debug("Lookup footprint: %d lookups with %,d total bytes.", lookupCount, lookupFootprint);
+ // Some for a minimally-sized super-sorter.
+ minMemory += (long) (2 + numFramesPerOutputChannel) * frameSize;
- return lookupFootprint;
+ // That's enough. Don't consider the possibility that the bundle may be used for producing IncrementalIndex,
+ // because PARTITION_STATS_MIN_MEMORY_PER_BUNDLE more or less covers that.
+ return minMemory;
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/IndexerMemoryManagementModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/IndexerMemoryManagementModule.java
index 92f16a631d9f..61f03e40ab6f 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/IndexerMemoryManagementModule.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/IndexerMemoryManagementModule.java
@@ -22,13 +22,15 @@
import com.google.inject.Binder;
import com.google.inject.Provides;
import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.exec.MemoryIntrospectorImpl;
+import org.apache.druid.msq.exec.ProcessingBuffersProvider;
+import org.apache.druid.msq.indexing.IndexerProcessingBuffersProvider;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.utils.JvmUtils;
@@ -42,37 +44,51 @@
public class IndexerMemoryManagementModule implements DruidModule
{
/**
- * Allocate up to 75% of memory for MSQ-related stuff (if all running tasks are MSQ tasks).
+ * Allocate up to 60% of memory for the MSQ framework (if all running tasks are MSQ tasks). This does not include the
+ * memory allocated to {@link #PROCESSING_MEMORY_FRACTION}.
*/
- private static final double USABLE_MEMORY_FRACTION = 0.75;
+ private static final double MSQ_MEMORY_FRACTION = 0.60;
+
+ /**
+ * Allocate up to 15% of memory for processing buffers for MSQ tasks.
+ */
+ private static final double PROCESSING_MEMORY_FRACTION = 0.15;
@Override
public void configure(Binder binder)
{
- // Nothing to do.
+ TaskMemoryManagementConfig.bind(binder);
}
@Provides
- @LazySingleton
- public Bouncer makeProcessorBouncer(final DruidProcessingConfig processingConfig)
- {
- return new Bouncer(processingConfig.getNumThreads());
- }
-
- @Provides
- @LazySingleton
+ @ManageLifecycle
public MemoryIntrospector createMemoryIntrospector(
final LookupExtractorFactoryContainerProvider lookupProvider,
+ final TaskMemoryManagementConfig taskMemoryManagementConfig,
final DruidProcessingConfig processingConfig,
final WorkerConfig workerConfig
)
{
return new MemoryIntrospectorImpl(
- lookupProvider,
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
- USABLE_MEMORY_FRACTION,
+ MSQ_MEMORY_FRACTION,
+ workerConfig.getCapacity(),
+ PeonMemoryManagementModule.getNumThreads(taskMemoryManagementConfig, processingConfig),
+ lookupProvider
+ );
+ }
+
+ @Provides
+ @LazySingleton
+ public ProcessingBuffersProvider createProcessingBuffersProvider(
+ final MemoryIntrospector memoryIntrospector,
+ final WorkerConfig workerConfig
+ )
+ {
+ return new IndexerProcessingBuffersProvider(
+ (long) (JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes() * PROCESSING_MEMORY_FRACTION),
workerConfig.getCapacity(),
- processingConfig.getNumThreads()
+ memoryIntrospector.numProcessingThreads()
);
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/PeonMemoryManagementModule.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/PeonMemoryManagementModule.java
index 9e814c082781..39265434584c 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/PeonMemoryManagementModule.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/PeonMemoryManagementModule.java
@@ -21,22 +21,30 @@
import com.google.inject.Binder;
import com.google.inject.Provides;
+import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.discovery.NodeRole;
-import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.IAE;
import org.apache.druid.msq.exec.MemoryIntrospector;
import org.apache.druid.msq.exec.MemoryIntrospectorImpl;
+import org.apache.druid.msq.exec.ProcessingBuffersProvider;
+import org.apache.druid.msq.indexing.PeonProcessingBuffersProvider;
+import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.utils.JvmUtils;
+import java.nio.ByteBuffer;
+
/**
* Provides {@link MemoryIntrospector} for single-task-per-JVM model.
*
* @see IndexerMemoryManagementModule for multi-task-per-JVM model used on {@link org.apache.druid.cli.CliIndexer}
*/
@LoadScope(roles = NodeRole.PEON_JSON_NAME)
+
public class PeonMemoryManagementModule implements DruidModule
{
/**
@@ -45,41 +53,61 @@ public class PeonMemoryManagementModule implements DruidModule
private static final int NUM_WORKERS_IN_JVM = 1;
/**
- * Peons may have more than one processing thread, but we currently only use one of them.
- */
- private static final int NUM_PROCESSING_THREADS = 1;
-
- /**
- * Allocate 75% of memory for MSQ-related stuff.
+ * Allocate 75% of memory for the MSQ framework.
*/
private static final double USABLE_MEMORY_FRACTION = 0.75;
@Override
public void configure(Binder binder)
{
- // Nothing to do.
- }
-
- @Provides
- @LazySingleton
- public Bouncer makeProcessorBouncer()
- {
- return new Bouncer(NUM_PROCESSING_THREADS);
+ TaskMemoryManagementConfig.bind(binder);
}
@Provides
@LazySingleton
public MemoryIntrospector createMemoryIntrospector(
final LookupExtractorFactoryContainerProvider lookupProvider,
- final Bouncer bouncer
+ final DruidProcessingConfig processingConfig,
+ final TaskMemoryManagementConfig taskMemoryManagementConfig
)
{
return new MemoryIntrospectorImpl(
- lookupProvider,
JvmUtils.getRuntimeInfo().getMaxHeapSizeBytes(),
USABLE_MEMORY_FRACTION,
NUM_WORKERS_IN_JVM,
- bouncer.getMaxCount()
+ getNumThreads(taskMemoryManagementConfig, processingConfig),
+ lookupProvider
);
}
+
+ @Provides
+ @LazySingleton
+ public ProcessingBuffersProvider createProcessingBuffersProvider(
+ @Global final NonBlockingPool processingPool,
+ final MemoryIntrospector memoryIntrospector
+ )
+ {
+ return new PeonProcessingBuffersProvider(
+ processingPool,
+ memoryIntrospector.numProcessingThreads()
+ );
+ }
+
+ public static int getNumThreads(
+ final TaskMemoryManagementConfig taskMemoryManagementConfig,
+ final DruidProcessingConfig processingConfig
+ )
+ {
+ if (taskMemoryManagementConfig.getMaxThreads() == TaskMemoryManagementConfig.UNLIMITED) {
+ return processingConfig.getNumThreads();
+ } else if (taskMemoryManagementConfig.getMaxThreads() > 0) {
+ return Math.min(taskMemoryManagementConfig.getMaxThreads(), processingConfig.getNumThreads());
+ } else {
+ throw new IAE(
+ "Invalid value of %s.maxThreads[%d]",
+ TaskMemoryManagementConfig.BASE_PROPERTY,
+ taskMemoryManagementConfig.getMaxThreads()
+ );
+ }
+ }
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/TaskMemoryManagementConfig.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/TaskMemoryManagementConfig.java
new file mode 100644
index 000000000000..d8dc278aa167
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/guice/TaskMemoryManagementConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.guice;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.inject.Binder;
+import org.apache.druid.guice.JsonConfigProvider;
+import org.apache.druid.java.util.common.StringUtils;
+
+/**
+ * Server configuration for {@link PeonMemoryManagementModule} and {@link IndexerMemoryManagementModule}.
+ */
+public class TaskMemoryManagementConfig
+{
+ public static final String BASE_PROPERTY = StringUtils.format("%s.task.memory", MSQIndexingModule.BASE_MSQ_KEY);
+ public static final int UNLIMITED = -1;
+
+ @JsonProperty("maxThreads")
+ private int maxThreads = 1;
+
+ public static void bind(final Binder binder)
+ {
+ JsonConfigProvider.bind(
+ binder,
+ BASE_PROPERTY,
+ TaskMemoryManagementConfig.class
+ );
+ }
+
+ public int getMaxThreads()
+ {
+ return maxThreads;
+ }
+}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
index fb6e4a0079f1..e8f3739facb4 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerFrameContext.java
@@ -20,9 +20,11 @@
package org.apache.druid.msq.indexing;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.frame.processor.Bouncer;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
+import org.apache.druid.msq.exec.ProcessingBuffers;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.kernel.FrameContext;
@@ -35,6 +37,7 @@
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.loading.DataSegmentPusher;
+import javax.annotation.Nullable;
import java.io.File;
public class IndexerFrameContext implements FrameContext
@@ -43,6 +46,8 @@ public class IndexerFrameContext implements FrameContext
private final IndexerWorkerContext context;
private final IndexIO indexIO;
private final DataSegmentProvider dataSegmentProvider;
+ @Nullable
+ private final ResourceHolder processingBuffers;
private final WorkerMemoryParameters memoryParameters;
private final WorkerStorageParameters storageParameters;
private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
@@ -52,6 +57,7 @@ public IndexerFrameContext(
IndexerWorkerContext context,
IndexIO indexIO,
DataSegmentProvider dataSegmentProvider,
+ @Nullable ResourceHolder processingBuffers,
DataServerQueryHandlerFactory dataServerQueryHandlerFactory,
WorkerMemoryParameters memoryParameters,
WorkerStorageParameters storageParameters
@@ -61,6 +67,7 @@ public IndexerFrameContext(
this.context = context;
this.indexIO = indexIO;
this.dataSegmentProvider = dataSegmentProvider;
+ this.processingBuffers = processingBuffers;
this.memoryParameters = memoryParameters;
this.storageParameters = storageParameters;
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
@@ -135,15 +142,19 @@ public IndexMergerV9 indexMerger()
}
@Override
- public WorkerMemoryParameters memoryParameters()
+ public ProcessingBuffers processingBuffers()
{
- return memoryParameters;
+ if (processingBuffers != null) {
+ return processingBuffers.get();
+ } else {
+ throw new ISE("No processing buffers");
+ }
}
@Override
- public Bouncer processorBouncer()
+ public WorkerMemoryParameters memoryParameters()
{
- return context.injector().getInstance(Bouncer.class);
+ return memoryParameters;
}
@Override
@@ -155,6 +166,8 @@ public WorkerStorageParameters storageParameters()
@Override
public void close()
{
- // Nothing to close.
+ if (processingBuffers != null) {
+ processingBuffers.close();
+ }
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerProcessingBuffersProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerProcessingBuffersProvider.java
new file mode 100644
index 000000000000..dcf499c3f2f9
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerProcessingBuffersProvider.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import org.apache.druid.cli.CliIndexer;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.msq.exec.ProcessingBuffersProvider;
+import org.apache.druid.msq.exec.ProcessingBuffersSet;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Production implementation of {@link ProcessingBuffersProvider} for tasks in {@link CliIndexer}.
+ */
+public class IndexerProcessingBuffersProvider implements ProcessingBuffersProvider
+{
+ private static final int MIN_BUFFER_SIZE = 1_000_000;
+
+ private final long heapMemoryToUse;
+ private final int taskCapacity;
+ private final int numThreads;
+
+ public IndexerProcessingBuffersProvider(final long heapMemoryToUse, final int taskCapacity, final int numThreads)
+ {
+ this.heapMemoryToUse = heapMemoryToUse;
+ this.taskCapacity = taskCapacity;
+ this.numThreads = numThreads;
+ }
+
+ @Override
+ public ResourceHolder acquire(int poolSize)
+ {
+ if (poolSize == 0) {
+ return new ReferenceCountingResourceHolder<>(ProcessingBuffersSet.EMPTY, () -> {});
+ }
+
+ final long heapMemoryPerWorker = heapMemoryToUse / taskCapacity;
+ final int numThreadsPerWorker = (int) Math.min(
+ numThreads,
+ heapMemoryPerWorker / MIN_BUFFER_SIZE
+ );
+
+ if (numThreadsPerWorker < 1) {
+ // Should not happen unless the CliIndexer has an unreasonable configuration.
+ // CliIndexer typically has well in excess of 1 MB (min buffer size) of heap per task.
+ throw new ISE("Cannot acquire buffers, available heap memory is not enough for task capacity[%d]", taskCapacity);
+ }
+
+ // bufferPools has "poolSize" lists; each list has "numThreadsPerWorker" buffers of size "sliceSize".
+ final List> bufferPools = new ArrayList<>(poolSize);
+ final int sliceSize = (int) Math.min(Integer.MAX_VALUE, heapMemoryPerWorker / numThreadsPerWorker);
+
+ for (int i = 0; i < poolSize; i++) {
+ final List bufferPool = new ArrayList<>(numThreadsPerWorker);
+ bufferPools.add(bufferPool);
+
+ for (int j = 0; j < numThreadsPerWorker; j++) {
+ bufferPool.add(ByteBuffer.allocate(sliceSize));
+ }
+ }
+
+ // bufferPools is built, return it as a ProcessingBuffersSet.
+ return new ReferenceCountingResourceHolder<>(
+ ProcessingBuffersSet.fromCollection(bufferPools),
+ () -> {} // Garbage collection will reclaim the buffers, since they are on-heap
+ );
+ }
+}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
index 0b3063ef48ba..595e62d9a7fc 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerWorkerContext.java
@@ -24,6 +24,7 @@
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Injector;
import com.google.inject.Key;
+import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
@@ -34,7 +35,8 @@
import org.apache.druid.msq.exec.ControllerClient;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
import org.apache.druid.msq.exec.MemoryIntrospector;
-import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.exec.ProcessingBuffersProvider;
+import org.apache.druid.msq.exec.ProcessingBuffersSet;
import org.apache.druid.msq.exec.TaskDataSegmentProvider;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerClient;
@@ -45,7 +47,7 @@
import org.apache.druid.msq.indexing.client.IndexerWorkerClient;
import org.apache.druid.msq.indexing.client.WorkerChatHandler;
import org.apache.druid.msq.kernel.FrameContext;
-import org.apache.druid.msq.kernel.QueryDefinition;
+import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryToolChestWarehouse;
@@ -79,12 +81,16 @@ public class IndexerWorkerContext implements WorkerContext
private final DataServerQueryHandlerFactory dataServerQueryHandlerFactory;
private final ServiceClientFactory clientFactory;
private final MemoryIntrospector memoryIntrospector;
+ private final ProcessingBuffersProvider processingBuffersProvider;
private final int maxConcurrentStages;
private final boolean includeAllCounters;
@GuardedBy("this")
private ServiceLocator controllerLocator;
+ // Lazily acquired by frameContext() and released by the closer registered in registerWorker().
+ // Written under synchronized(this) using double-checked locking; volatile so unlocked reads see the latest write.
+ private volatile ResourceHolder<ProcessingBuffersSet> processingBuffersSet;
+
public IndexerWorkerContext(
final MSQWorkerTask task,
final TaskToolbox toolbox,
@@ -94,6 +100,7 @@ public IndexerWorkerContext(
final TaskDataSegmentProvider dataSegmentProvider,
final ServiceClientFactory clientFactory,
final MemoryIntrospector memoryIntrospector,
+ final ProcessingBuffersProvider processingBuffersProvider,
final DataServerQueryHandlerFactory dataServerQueryHandlerFactory
)
{
@@ -105,6 +112,7 @@ public IndexerWorkerContext(
this.dataSegmentProvider = dataSegmentProvider;
this.clientFactory = clientFactory;
this.memoryIntrospector = memoryIntrospector;
+ this.processingBuffersProvider = processingBuffersProvider;
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
final QueryContext queryContext = QueryContext.of(task.getContext());
@@ -127,6 +135,7 @@ public static IndexerWorkerContext createProductionInstance(
final MemoryIntrospector memoryIntrospector = injector.getInstance(MemoryIntrospector.class);
final OverlordClient overlordClient =
injector.getInstance(OverlordClient.class).withRetryPolicy(StandardRetryPolicy.unlimited());
+ final ProcessingBuffersProvider processingBuffersProvider = injector.getInstance(ProcessingBuffersProvider.class);
final ObjectMapper smileMapper = injector.getInstance(Key.get(ObjectMapper.class, Smile.class));
final QueryToolChestWarehouse warehouse = injector.getInstance(QueryToolChestWarehouse.class);
@@ -139,6 +148,7 @@ public static IndexerWorkerContext createProductionInstance(
new TaskDataSegmentProvider(toolbox.getCoordinatorClient(), segmentCacheManager, indexIO),
serviceClientFactory,
memoryIntrospector,
+ processingBuffersProvider,
new DataServerQueryHandlerFactory(
toolbox.getCoordinatorClient(),
serviceClientFactory,
@@ -191,6 +201,14 @@ public void registerWorker(Worker worker, Closer closer)
}
}
});
+ closer.register(() -> {
+ synchronized (this) {
+ if (processingBuffersSet != null) {
+ processingBuffersSet.close();
+ processingBuffersSet = null;
+ }
+ }
+ });
// Register the periodic controller checker
final ExecutorService periodicControllerCheckerExec = Execs.singleThreaded("controller-status-checker-%s");
@@ -281,23 +299,39 @@ public WorkerClient makeWorkerClient()
}
@Override
- public FrameContext frameContext(QueryDefinition queryDef, int stageNumber, OutputChannelMode outputChannelMode)
+ public FrameContext frameContext(WorkOrder workOrder)
{
+ // Lazily acquire the processing buffers the first time a frame context is requested. The volatile field is
+ // copied into a local so that the holder used further down is the one that was null-checked here: the closer
+ // registered in registerWorker() may null the field concurrently, and re-reading the field could then yield
+ // a NullPointerException.
+ ResourceHolder<ProcessingBuffersSet> buffersHolder = processingBuffersSet;
+ if (buffersHolder == null) {
+   synchronized (this) {
+     buffersHolder = processingBuffersSet;
+     if (buffersHolder == null) {
+       buffersHolder = processingBuffersProvider.acquire(
+           workOrder.getQueryDefinition(),
+           maxConcurrentStages()
+       );
+       processingBuffersSet = buffersHolder;
+     }
+   }
+ }
+
+ // Memory parameters are computed per work order and logged so sizing decisions can be traced per stage.
+ final WorkerMemoryParameters memoryParameters =
+     WorkerMemoryParameters.createProductionInstance(workOrder, memoryIntrospector, maxConcurrentStages);
+ log.info("Memory parameters for stage[%s]: %s", workOrder.getStageDefinition().getId(), memoryParameters);
+
return new IndexerFrameContext(
- queryDef.getStageDefinition(stageNumber).getId(),
+ workOrder.getStageDefinition().getId(),
this,
indexIO,
dataSegmentProvider,
+ buffersHolder.get().acquireForStage(workOrder.getStageDefinition()),
dataServerQueryHandlerFactory,
- WorkerMemoryParameters.createProductionInstanceForWorker(injector, queryDef, stageNumber, maxConcurrentStages),
- WorkerStorageParameters.createProductionInstance(injector, outputChannelMode)
+ memoryParameters,
+ WorkerStorageParameters.createProductionInstance(injector, workOrder.getOutputChannelMode())
);
}
@Override
public int threadCount()
{
- return memoryIntrospector.numProcessorsInJvm();
+ return memoryIntrospector.numProcessingThreads(); // now the introspector's processing-thread count, not JVM processors
}
@Override
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/PeonProcessingBuffersProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/PeonProcessingBuffersProvider.java
new file mode 100644
index 000000000000..264c7af112fc
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/PeonProcessingBuffersProvider.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+import org.apache.druid.cli.CliPeon;
+import org.apache.druid.collections.NonBlockingPool;
+import org.apache.druid.collections.ReferenceCountingResourceHolder;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.msq.exec.ProcessingBuffersProvider;
+import org.apache.druid.msq.exec.ProcessingBuffersSet;
+import org.apache.druid.utils.CloseableUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Production implementation of {@link ProcessingBuffersProvider} for tasks in {@link CliPeon}.
+ *
+ * Takes {@code bufferCount} buffers from the supplied pool and cuts each one into {@code poolSize} equal,
+ * non-overlapping slices, so that each of the {@code poolSize} pools ends up with {@code bufferCount} slices.
+ * A nonzero-size acquisition is permitted only once per instance.
+ */
+public class PeonProcessingBuffersProvider implements ProcessingBuffersProvider
+{
+  private final AtomicBoolean acquired = new AtomicBoolean(false);
+  private final NonBlockingPool<ByteBuffer> bufferPool;
+  private final int bufferCount;
+
+  /**
+   * @param bufferPool  pool that processing buffers are taken from
+   * @param bufferCount number of buffers taken from {@code bufferPool} by a nonzero-size {@link #acquire(int)}
+   */
+  public PeonProcessingBuffersProvider(
+      final NonBlockingPool<ByteBuffer> bufferPool,
+      final int bufferCount
+  )
+  {
+    this.bufferPool = bufferPool;
+    this.bufferCount = bufferCount;
+  }
+
+  /**
+   * Builds {@code poolSize} pools of {@code bufferCount} buffer slices each.
+   *
+   * @param poolSize number of pools to build. Zero returns {@link ProcessingBuffersSet#EMPTY} without taking any
+   *                 buffers and without counting as the single permitted acquisition.
+   *
+   * @return holder of the buffer set; closing it closes every buffer holder that was taken from the pool
+   *
+   * @throws DruidException if called more than once with a nonzero {@code poolSize}
+   */
+  @Override
+  public ResourceHolder<ProcessingBuffersSet> acquire(int poolSize)
+  {
+    if (poolSize == 0) {
+      return new ReferenceCountingResourceHolder<>(ProcessingBuffersSet.EMPTY, () -> {});
+    }
+
+    if (!acquired.compareAndSet(false, true)) {
+      // We expect a single task in the JVM for CliPeon.
+      throw DruidException.defensive("Expected a single call to acquire() for[%s]", getClass().getName());
+    }
+
+    // Collects the holders taken from the pool; it becomes the closer of the returned holder on success and is
+    // closed immediately on failure, so buffers are not leaked if building the set throws.
+    final Closer closer = Closer.create();
+
+    try {
+      // bufferPools has one list per "poolSize"; each of those lists has "bufferCount" buffers.
+      // Build these by acquiring "bufferCount" processing buffers and slicing each one up into "poolSize" slices.
+      final List<List<ByteBuffer>> bufferPools = new ArrayList<>(poolSize);
+      for (int i = 0; i < poolSize; i++) {
+        bufferPools.add(new ArrayList<>(bufferCount));
+      }
+
+      for (int i = 0; i < bufferCount; i++) {
+        final ResourceHolder<ByteBuffer> bufferHolder = closer.register(bufferPool.take());
+        // Work on a duplicate so the position/limit of the pooled buffer itself are left untouched.
+        final ByteBuffer buffer = bufferHolder.get().duplicate();
+        // Integer division: any remainder bytes at the end of the buffer are not assigned to a slice.
+        final int sliceSize = buffer.capacity() / poolSize;
+
+        for (int j = 0; j < poolSize; j++) {
+          buffer.position(sliceSize * j).limit(sliceSize * (j + 1));
+          bufferPools.get(j).add(buffer.slice());
+        }
+      }
+
+      // bufferPools is built, return it as a ProcessingBuffersSet.
+      return new ReferenceCountingResourceHolder<>(
+          ProcessingBuffersSet.fromCollection(bufferPools),
+          closer
+      );
+    }
+    catch (Throwable e) {
+      throw CloseableUtils.closeAndWrapInCatch(e, closer);
+    }
+  }
+}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
index 6f4b36da1eec..d4360a09d1a8 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/NotEnoughMemoryFault.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.java.util.common.StringUtils;
import java.util.Objects;
@@ -36,6 +37,7 @@ public class NotEnoughMemoryFault extends BaseMSQFault
private final long usableMemory;
private final int serverWorkers;
private final int serverThreads;
+ private final int inputWorkers;
private final int maxConcurrentStages;
@JsonCreator
@@ -45,22 +47,33 @@ public NotEnoughMemoryFault(
@JsonProperty("usableMemory") final long usableMemory,
@JsonProperty("serverWorkers") final int serverWorkers,
@JsonProperty("serverThreads") final int serverThreads,
+ @JsonProperty("inputWorkers") final int inputWorkers,
@JsonProperty("maxConcurrentStages") final int maxConcurrentStages
)
{
super(
CODE,
- "Not enough memory. Required at least %,d bytes. (total = %,d bytes; usable = %,d bytes; "
- + "worker capacity = %,d; processing threads = %,d; concurrent stages = %,d). "
+ "Not enough memory. "
+ + (suggestedServerMemory > 0
+ ? StringUtils.format("Minimum bytes[%,d] is needed for the current configuration. ", suggestedServerMemory)
+ : "")
+ + "(total bytes[%,d]; "
+ + "usable bytes[%,d]; "
+ + "input workers[%,d]; "
+ + "concurrent stages[%,d]; "
+ + "server worker capacity[%,d]; "
+ + "server processing threads[%,d]). "
+ "Increase JVM memory with the -Xmx option"
+ + (inputWorkers > 1 ? ", or reduce maxNumTasks for this query" : "")
+ + (maxConcurrentStages > 1 ? ", or reduce maxConcurrentStages for this query" : "")
+ (serverWorkers > 1 ? ", or reduce worker capacity on this server" : "")
- + (maxConcurrentStages > 1 ? ", or reduce maxConcurrentStages for this query" : ""),
- suggestedServerMemory,
+ + (serverThreads > 1 ? ", or reduce processing threads on this server" : ""),
serverMemory,
usableMemory,
+ inputWorkers,
+ maxConcurrentStages,
serverWorkers,
- serverThreads,
- maxConcurrentStages
+ serverThreads
);
this.suggestedServerMemory = suggestedServerMemory;
@@ -68,10 +81,12 @@ public NotEnoughMemoryFault(
this.usableMemory = usableMemory;
this.serverWorkers = serverWorkers;
this.serverThreads = serverThreads;
+ this.inputWorkers = inputWorkers;
this.maxConcurrentStages = maxConcurrentStages;
}
@JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT) // left out of the serialized JSON when it equals the type default (0)
public long getSuggestedServerMemory()
{
return suggestedServerMemory;
}
@@ -101,6 +116,13 @@ public int getServerThreads()
return serverThreads;
}
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_DEFAULT) // left out of the serialized JSON when it equals the type default (0)
+ public int getInputWorkers()
+ {
+ return inputWorkers; // value received through the "inputWorkers" creator property
+ }
+
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public int getMaxConcurrentStages()
@@ -126,6 +148,7 @@ public boolean equals(Object o)
&& usableMemory == that.usableMemory
&& serverWorkers == that.serverWorkers
&& serverThreads == that.serverThreads
+ && inputWorkers == that.inputWorkers
&& maxConcurrentStages == that.maxConcurrentStages;
}
@@ -139,6 +162,7 @@ public int hashCode()
usableMemory,
serverWorkers,
serverThreads,
+ inputWorkers,
maxConcurrentStages
);
}
@@ -148,10 +172,11 @@ public String toString()
{
return "NotEnoughMemoryFault{" +
"suggestedServerMemory=" + suggestedServerMemory +
- " bytes, serverMemory=" + serverMemory +
- " bytes, usableMemory=" + usableMemory +
- " bytes, serverWorkers=" + serverWorkers +
+ ", serverMemory=" + serverMemory +
+ ", usableMemory=" + usableMemory +
+ ", serverWorkers=" + serverWorkers +
", serverThreads=" + serverThreads +
+ ", inputWorkers=" + inputWorkers +
", maxConcurrentStages=" + maxConcurrentStages +
'}';
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsWithSameKeyFault.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsWithSameKeyFault.java
index 60d355579b6b..be284ae502d8 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsWithSameKeyFault.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsWithSameKeyFault.java
@@ -44,9 +44,8 @@ public TooManyRowsWithSameKeyFault(
{
super(
CODE,
- "Too many rows with the same key[%s] during sort-merge join (bytes buffered[%,d], limit[%,d]). "
- + "Try increasing heap memory available to workers, "
- + "or adjusting your query to process fewer rows with this key.",
+ "Too many rows with the same key[%s] on both sides of sort-merge join (bytes buffered[%,d], limit[%,d]). "
+ + "Try adjusting your query such that there are fewer rows with this key on at least one side of the join.",
key,
numBytes,
maxBytes
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
index 16f9deff63d0..1796df89bf71 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/processor/SegmentGeneratorFrameProcessorFactory.java
@@ -28,6 +28,7 @@
import com.google.common.collect.Iterables;
import org.apache.druid.frame.processor.OutputChannelFactory;
import org.apache.druid.frame.processor.OutputChannels;
+import org.apache.druid.frame.processor.manager.ConcurrencyLimitedProcessorManager;
import org.apache.druid.frame.processor.manager.ProcessorManagers;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
@@ -210,21 +211,29 @@ public Pair apply(ReadableInput readableInput)
);
return new ProcessorsAndChannels<>(
- ProcessorManagers.of(workers)
- .withAccumulation(
- new HashSet<>(),
- (acc, segment) -> {
- if (segment != null) {
- acc.add(segment);
- }
-
- return acc;
- }
- ),
+ // Run at most one segmentGenerator per work order, since segment generation memory is carved out
+ // per-worker, not per-processor. See WorkerMemoryParameters for how the memory limits are calculated.
+ new ConcurrencyLimitedProcessorManager<>(ProcessorManagers.of(workers), 1)
+ .withAccumulation(
+ new HashSet<>(),
+ (acc, segment) -> {
+ if (segment != null) {
+ acc.add(segment);
+ }
+
+ return acc;
+ }
+ ),
OutputChannels.none()
);
}
+ @Override
+ public boolean usesProcessingBuffers()
+ {
+ return false; // NOTE(review): presumably because segment generation memory is carved out per-worker instead — confirm
+ }
+
@Override
public TypeReference> getResultTypeReference()
{
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecs.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecs.java
index 78241257710b..250f320118a8 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecs.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/InputSpecs.java
@@ -35,6 +35,9 @@ private InputSpecs()
// No instantiation.
}
+ /**
+ * Returns the set of input stages, from {@link StageInputSpec}, for a given list of {@link InputSpec}.
+ */
public static IntSet getStageNumbers(final List specs)
{
final IntSet retVal = new IntRBTreeSet();
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameContext.java
index da962a9d3931..1b80f72f86f5 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameContext.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameContext.java
@@ -20,9 +20,9 @@
package org.apache.druid.msq.kernel;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.msq.exec.DataServerQueryHandlerFactory;
-import org.apache.druid.msq.exec.OutputChannelMode;
+import org.apache.druid.msq.exec.ProcessingBuffers;
+import org.apache.druid.msq.exec.WorkerImpl;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.querykit.DataSegmentProvider;
@@ -40,7 +40,7 @@
* Provides services and objects for the functioning of the frame processors. Scoped to a specific stage of a
* specific query, i.e., one {@link WorkOrder}.
*
- * Generated by {@link org.apache.druid.msq.exec.WorkerContext#frameContext(QueryDefinition, int, OutputChannelMode)}.
+ * Generated by {@link org.apache.druid.msq.exec.WorkerContext#frameContext(WorkOrder)}.
*/
public interface FrameContext extends Closeable
{
@@ -54,6 +54,9 @@ public interface FrameContext extends Closeable
DataServerQueryHandlerFactory dataServerQueryHandlerFactory();
+ /**
+ * Temporary directory, fully owned by this particular stage.
+ */
File tempDir();
ObjectMapper jsonMapper();
@@ -66,7 +69,7 @@ public interface FrameContext extends Closeable
IndexMergerV9 indexMerger();
- Bouncer processorBouncer();
+ ProcessingBuffers processingBuffers();
WorkerMemoryParameters memoryParameters();
@@ -76,4 +79,11 @@ default File tempDir(String name)
{
return new File(tempDir(), name);
}
+
+ /**
+ * Releases resources used in processing. This is called when processing has completed, but before results are
+ * cleaned up. Specifically, it is called by {@link WorkerImpl.KernelHolder#processorCloser}.
+ *
+ * Redeclared here without a {@code throws} clause, so callers working through this interface do not need to
+ * handle the checked exception declared by {@link Closeable#close()}.
+ */
+ @Override
+ void close();
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameProcessorFactory.java
index fbf02d46e346..1bdba5ee22e0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameProcessorFactory.java
@@ -78,6 +78,11 @@ ProcessorsAndChannels makeProcessors(
boolean removeNullBytes
) throws IOException;
+ /**
+ * Whether processors from this factory use {@link org.apache.druid.msq.exec.ProcessingBuffers}.
+ */
+ boolean usesProcessingBuffers();
+
@Nullable
TypeReference getResultTypeReference();
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
index 19a7978abba8..cd2bb6a81f44 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java
@@ -146,6 +146,13 @@ public class StageDefinition
}
}
+ /**
+ * Returns whether a stage shuffling with the given spec must gather result key statistics. This is true only
+ * when the spec is non-null, is of kind {@link ShuffleKind#GLOBAL_SORT}, and itself reports that statistics
+ * must be gathered.
+ */
+ public static boolean mustGatherResultKeyStatistics(@Nullable final ShuffleSpec shuffleSpec)
+ {
+   // Anything other than a global-sort shuffle never needs key statistics.
+   if (shuffleSpec == null || shuffleSpec.kind() != ShuffleKind.GLOBAL_SORT) {
+     return false;
+   }
+
+   final GlobalSortShuffleSpec globalSortSpec = (GlobalSortShuffleSpec) shuffleSpec;
+   return globalSortSpec.mustGatherResultKeyStatistics();
+ }
+
public static StageDefinitionBuilder builder(final int stageNumber)
{
return new StageDefinitionBuilder(stageNumber);
@@ -302,14 +309,10 @@ public int getStageNumber()
* For eg: we know there's exactly one partition in query shapes like `select with limit`.
*
* In such cases, we return a false.
- *
- * @return
*/
public boolean mustGatherResultKeyStatistics()
{
- return shuffleSpec != null
- && shuffleSpec.kind() == ShuffleKind.GLOBAL_SORT
- && ((GlobalSortShuffleSpec) shuffleSpec).mustGatherResultKeyStatistics();
+ return mustGatherResultKeyStatistics(shuffleSpec);
}
public Either generatePartitionBoundariesForShuffle(
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
index 4e59e7d17a89..10543beeb069 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
@@ -19,6 +19,8 @@
package org.apache.druid.msq.kernel.worker;
+import org.apache.druid.msq.exec.ProcessingBuffers;
+
/**
* Phases that a stage can be in, as far as the worker is concerned.
*
@@ -99,6 +101,8 @@ public boolean isTerminal()
/**
* Whether this phase indicates a stage is running and consuming its full complement of resources.
*
+ * Importantly, stages that are not running are not holding {@link ProcessingBuffers}.
+ *
* There are still some resources that can be consumed by stages that are not running. For example, in the
* {@link #FINISHED} state, stages can still have data on disk that has not been cleaned-up yet, some pointers
* to that data that still reside in memory, and some counters in memory available for collection by the controller.
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
index 4cf233876338..013b6d4c93c0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java
@@ -352,7 +352,7 @@ private FrameProcessor> makeSegment
return BroadcastJoinSegmentMapFnProcessor.create(
query,
broadcastInputs,
- frameContext.memoryParameters().getBroadcastJoinMemory()
+ frameContext.memoryParameters().getBroadcastBufferMemory()
);
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
index ab160f7319da..cbb79c45702b 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BroadcastJoinSegmentMapFnProcessor.java
@@ -83,7 +83,7 @@ public class BroadcastJoinSegmentMapFnProcessor implements FrameProcessor query,
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java
index 9852f4f40988..6ad7742672f9 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java
@@ -174,6 +174,11 @@ public ProcessorsAndChannels