From 80ad8308807b047fa68b2cfec341da0f184f728c Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 26 Aug 2025 22:41:56 -0700 Subject: [PATCH] MSQ: Allow configurable maxFrameSize. In most situations, there is no need to change this parameter. Running into this limit is often a sign that something is wrong. However, with datasets that legitimately have very large rows, it does become necessary to use a larger frame size. --- docs/multi-stage-query/reference.md | 1 + .../dart/controller/DartControllerContext.java | 3 ++- .../msq/exec/ControllerMemoryParameters.java | 5 +++-- .../druid/msq/exec/WorkerMemoryParameters.java | 3 ++- .../msq/indexing/IndexerControllerContext.java | 3 ++- .../msq/indexing/error/RowTooLargeFault.java | 9 ++++++++- .../druid/msq/util/MultiStageQueryContext.java | 11 +++++++++++ .../exec/ControllerMemoryParametersTest.java | 18 ++++++++++++------ .../msq/util/MultiStageQueryContextTest.java | 18 ++++++++++++++++++ 9 files changed, 59 insertions(+), 12 deletions(-) diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 1b6aa7f7dd6d..672f8050a1ee 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -417,6 +417,7 @@ The following table lists the context parameters for the MSQ task engine: | `failOnEmptyInsert` | INSERT or REPLACE

When set to false (the default), an INSERT query generating no output rows will be no-op, and a REPLACE query generating no output rows will delete all data that matches the OVERWRITE clause. When set to true, an ingest query generating no output rows will throw an `InsertCannotBeEmpty` fault. | `false` | | `storeCompactionState` | REPLACE

When set to true, a REPLACE query stores as part of each segment's metadata a `lastCompactionState` field that captures the various specs used to create the segment. Future compaction jobs skip segments whose `lastCompactionState` matches the desired compaction state. Works the same as [`storeCompactionState`](../ingestion/tasks.md#context-parameters) task context flag. | `false` | | `removeNullBytes` | SELECT, INSERT or REPLACE

The MSQ engine cannot process null bytes in strings and throws `InvalidNullByteFault` if it encounters them in the source data. If the parameter is set to true, The MSQ engine will remove the null bytes in string fields when reading the data. | `false` | +| `maxFrameSize` | SELECT, INSERT or REPLACE

Size of frames used for data transfer within the MSQ engine. You generally do not need to change this unless you have very large rows. | `1000000` (1 MB) | ## Joins diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java index cdd317c5466a..6a1b65c09f51 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java @@ -140,7 +140,8 @@ public ControllerQueryKernelConfig queryKernelConfig(final MSQSpec querySpec) final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance( memoryIntrospector, - workerIds.size() + workerIds.size(), + MultiStageQueryContext.getFrameSize(context) ); final int maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStagesWithDefault( diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java index c5131ddd84ec..f85b1c8d7c72 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java @@ -66,14 +66,15 @@ public ControllerMemoryParameters(int partitionStatisticsMaxRetainedBytes) */ public static ControllerMemoryParameters createProductionInstance( final MemoryIntrospector memoryIntrospector, - final int maxWorkerCount + final int maxWorkerCount, + final int frameSize ) { final long totalMemory = memoryIntrospector.memoryPerTask(); final long memoryForInputChannels = WorkerMemoryParameters.computeProcessorMemoryForInputChannels( maxWorkerCount, - WorkerMemoryParameters.DEFAULT_FRAME_SIZE + frameSize ); final int partitionStatisticsMaxRetainedBytes = (int) Math.min( totalMemory - memoryForInputChannels, diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java index 462123f21b33..aa0a2f0d686d 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java @@ -41,6 +41,7 @@ import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor; import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl; +import org.apache.druid.msq.util.MultiStageQueryContext; import org.apache.druid.segment.incremental.IncrementalIndex; import javax.annotation.Nullable; @@ -160,7 +161,7 @@ public static WorkerMemoryParameters createProductionInstance( final StageDefinition stageDef = workOrder.getStageDefinition(); return createInstance( memoryIntrospector, - DEFAULT_FRAME_SIZE, + MultiStageQueryContext.getFrameSize(workOrder.getWorkerContext()), workOrder.getInputs(), stageDef.getBroadcastInputNumbers(), stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null, diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java index 67fc6aadb247..317b7d4602dc 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java @@ -125,7 +125,8 @@ public ControllerQueryKernelConfig queryKernelConfig(final MSQSpec querySpec) final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance( memoryIntrospector, - querySpec.getTuningConfig().getMaxNumWorkers() + querySpec.getTuningConfig().getMaxNumWorkers(), + MultiStageQueryContext.getFrameSize(querySpec.getContext()) ); final ControllerQueryKernelConfig config = makeQueryKernelConfig(querySpec, memoryParameters); diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java index 865cb6b0c1d1..790e83ceff89 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.msq.util.MultiStageQueryContext; import java.util.Objects; @@ -35,7 +36,13 @@ public class RowTooLargeFault extends BaseMSQFault @JsonCreator public RowTooLargeFault(@JsonProperty("maxFrameSize") final long maxFrameSize) { - super(CODE, "Encountered row that cannot fit in a single frame (max frame size = %,d)", maxFrameSize); + super( + CODE, + "Encountered row that cannot fit in a single frame (max frame size = %,d). " + + "Reduce your row size or increase the context parameter[%s].", + maxFrameSize, + MultiStageQueryContext.CTX_MAX_FRAME_SIZE + ); this.maxFrameSize = maxFrameSize; } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 9cef48994967..1cd222c4f6c0 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -37,6 +37,7 @@ import org.apache.druid.msq.exec.ClusterStatisticsMergeMode; import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.exec.SegmentSource; +import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.indexing.destination.MSQSelectDestination; import org.apache.druid.msq.indexing.error.MSQWarnings; import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; @@ -219,6 +220,11 @@ public class MultiStageQueryContext */ public static final String CTX_TARGET_PARTITIONS_PER_WORKER = "targetPartitionsPerWorker"; + /** + * Maximum size of frames to create. Defaults to {@link WorkerMemoryParameters#DEFAULT_FRAME_SIZE}. + */ + public static final String CTX_MAX_FRAME_SIZE = "maxFrameSize"; + private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL); public static String getMSQMode(final QueryContext queryContext) @@ -497,6 +503,11 @@ public static Set getColumnsExcludedFromTypeVerification(final QueryCont return new HashSet<>(decodeList(CTX_SKIP_TYPE_VERIFICATION, queryContext.getString(CTX_SKIP_TYPE_VERIFICATION))); } + public static int getFrameSize(final QueryContext queryContext) + { + return queryContext.getInt(CTX_MAX_FRAME_SIZE, WorkerMemoryParameters.DEFAULT_FRAME_SIZE); + } + /** * Decodes a list from either a JSON or CSV string. */ diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java index d6ae0d7e190a..3f5ba3034146 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java @@ -36,7 +36,8 @@ public void test_oneQueryInJvm() { final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance( makeMemoryIntrospector(128_000_000, 1), - 1 + 1, + WorkerMemoryParameters.DEFAULT_FRAME_SIZE ); Assert.assertEquals(101_400_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes()); @@ -47,7 +48,8 @@ public void test_oneQueryInJvm_oneHundredWorkers() { final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance( makeMemoryIntrospector(256_000_000, 1), - 100 + 100, + WorkerMemoryParameters.DEFAULT_FRAME_SIZE ); Assert.assertEquals(104_800_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes()); @@ -58,7 +60,8 @@ public void test_twoQueriesInJvm() { final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance( makeMemoryIntrospector(128_000_000, 2), - 1 + 1, + WorkerMemoryParameters.DEFAULT_FRAME_SIZE ); Assert.assertEquals(50_200_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes()); @@ -69,7 +72,8 @@ public void test_maxSized() { final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance( makeMemoryIntrospector(1_000_000_000, 1), - 1 + 1, + WorkerMemoryParameters.DEFAULT_FRAME_SIZE ); Assert.assertEquals(300_000_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes()); @@ -82,7 +86,8 @@ public void test_notEnoughMemory() MSQException.class, () -> ControllerMemoryParameters.createProductionInstance( makeMemoryIntrospector(30_000_000, 1), - 1 + 1, + WorkerMemoryParameters.DEFAULT_FRAME_SIZE ) ); @@ -98,7 +103,8 @@ public void test_minimalMemory() { final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance( makeMemoryIntrospector(33_750_000, 1), - 1 + 1, + WorkerMemoryParameters.DEFAULT_FRAME_SIZE ); Assert.assertEquals(26_000_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes()); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java index bd3cf2b8e1b2..3dc8591641f3 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.indexing.destination.MSQSelectDestination; import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; import org.apache.druid.query.BadQueryContextException; @@ -46,6 +47,7 @@ import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE; import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FAULT_TOLERANCE; import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS; +import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_FRAME_SIZE; import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_NUM_TASKS; import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MSQ_MODE; import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_REMOVE_NULL_BYTES; @@ -343,6 +345,22 @@ public void testDartSelectDestination() ); } + @Test + public void getFrameSize_unset_returnsDefaultValue() + { + Assert.assertEquals( + WorkerMemoryParameters.DEFAULT_FRAME_SIZE, + MultiStageQueryContext.getFrameSize(QueryContext.empty()) + ); + } + + @Test + public void getFrameSize_set_returnsCorrectValue() + { + Map propertyMap = ImmutableMap.of(CTX_MAX_FRAME_SIZE, 500000); + Assert.assertEquals(500000, MultiStageQueryContext.getFrameSize(QueryContext.of(propertyMap))); + } + private static List decodeSortOrder(@Nullable final String input) { return MultiStageQueryContext.decodeList(MultiStageQueryContext.CTX_SORT_ORDER, input);