diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index 1b6aa7f7dd6d..672f8050a1ee 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -417,6 +417,7 @@ The following table lists the context parameters for the MSQ task engine:
| `failOnEmptyInsert` | INSERT or REPLACE
When set to false (the default), an INSERT query generating no output rows will be no-op, and a REPLACE query generating no output rows will delete all data that matches the OVERWRITE clause. When set to true, an ingest query generating no output rows will throw an `InsertCannotBeEmpty` fault. | `false` |
| `storeCompactionState` | REPLACE
When set to true, a REPLACE query stores as part of each segment's metadata a `lastCompactionState` field that captures the various specs used to create the segment. Future compaction jobs skip segments whose `lastCompactionState` matches the desired compaction state. Works the same as [`storeCompactionState`](../ingestion/tasks.md#context-parameters) task context flag. | `false` |
| `removeNullBytes` | SELECT, INSERT or REPLACE
The MSQ engine cannot process null bytes in strings and throws `InvalidNullByteFault` if it encounters them in the source data. If the parameter is set to true, The MSQ engine will remove the null bytes in string fields when reading the data. | `false` |
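+| `maxFrameSize` | SELECT, INSERT or REPLACE<br /><br />Maximum size, in bytes, of the frames used for data transfer within the MSQ engine. You generally do not need to change this unless you have very large rows; increase it if a query fails because a row cannot fit in a single frame. | `1000000` (1 MB) |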
## Joins
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
index cdd317c5466a..6a1b65c09f51 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartControllerContext.java
@@ -140,7 +140,8 @@ public ControllerQueryKernelConfig queryKernelConfig(final MSQSpec querySpec)
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
memoryIntrospector,
- workerIds.size()
+ workerIds.size(),
+ MultiStageQueryContext.getFrameSize(context)
);
final int maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
index c5131ddd84ec..f85b1c8d7c72 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerMemoryParameters.java
@@ -66,14 +66,15 @@ public ControllerMemoryParameters(int partitionStatisticsMaxRetainedBytes)
*/
public static ControllerMemoryParameters createProductionInstance(
final MemoryIntrospector memoryIntrospector,
- final int maxWorkerCount
+ final int maxWorkerCount,
+ final int frameSize
)
{
final long totalMemory = memoryIntrospector.memoryPerTask();
final long memoryForInputChannels =
WorkerMemoryParameters.computeProcessorMemoryForInputChannels(
maxWorkerCount,
- WorkerMemoryParameters.DEFAULT_FRAME_SIZE
+ frameSize
);
final int partitionStatisticsMaxRetainedBytes = (int) Math.min(
totalMemory - memoryForInputChannels,
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
index 462123f21b33..aa0a2f0d686d 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerMemoryParameters.java
@@ -41,6 +41,7 @@
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
+import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.incremental.IncrementalIndex;
import javax.annotation.Nullable;
@@ -160,7 +161,7 @@ public static WorkerMemoryParameters createProductionInstance(
final StageDefinition stageDef = workOrder.getStageDefinition();
return createInstance(
memoryIntrospector,
- DEFAULT_FRAME_SIZE,
+ MultiStageQueryContext.getFrameSize(workOrder.getWorkerContext()),
workOrder.getInputs(),
stageDef.getBroadcastInputNumbers(),
stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
index 67fc6aadb247..317b7d4602dc 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/IndexerControllerContext.java
@@ -125,7 +125,8 @@ public ControllerQueryKernelConfig queryKernelConfig(final MSQSpec querySpec)
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
memoryIntrospector,
- querySpec.getTuningConfig().getMaxNumWorkers()
+ querySpec.getTuningConfig().getMaxNumWorkers(),
+ MultiStageQueryContext.getFrameSize(querySpec.getContext())
);
final ControllerQueryKernelConfig config = makeQueryKernelConfig(querySpec, memoryParameters);
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
index 865cb6b0c1d1..790e83ceff89 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/RowTooLargeFault.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.druid.msq.util.MultiStageQueryContext;
import java.util.Objects;
@@ -35,7 +36,13 @@ public class RowTooLargeFault extends BaseMSQFault
@JsonCreator
public RowTooLargeFault(@JsonProperty("maxFrameSize") final long maxFrameSize)
{
- super(CODE, "Encountered row that cannot fit in a single frame (max frame size = %,d)", maxFrameSize);
+ super(
+ CODE,
+ // Two format arguments follow: the frame size limit (%,d) and the name of the context key (%s),
+ // so the message tells users which setting to raise.
+ "Encountered row that cannot fit in a single frame (max frame size = %,d). "
+ + "Reduce your row size or increase the context parameter[%s].",
+ maxFrameSize,
+ MultiStageQueryContext.CTX_MAX_FRAME_SIZE
+ );
this.maxFrameSize = maxFrameSize;
}
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
index 9cef48994967..1cd222c4f6c0 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
@@ -37,6 +37,7 @@
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.SegmentSource;
+import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
@@ -219,6 +220,11 @@ public class MultiStageQueryContext
*/
public static final String CTX_TARGET_PARTITIONS_PER_WORKER = "targetPartitionsPerWorker";
+ /**
+ * Context key for the maximum size, in bytes, of frames to create. When the key is not set,
+ * {@link #getFrameSize} falls back to {@link WorkerMemoryParameters#DEFAULT_FRAME_SIZE}.
+ */
+ public static final String CTX_MAX_FRAME_SIZE = "maxFrameSize";
+
private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);
public static String getMSQMode(final QueryContext queryContext)
@@ -497,6 +503,11 @@ public static Set getColumnsExcludedFromTypeVerification(final QueryCont
return new HashSet<>(decodeList(CTX_SKIP_TYPE_VERIFICATION, queryContext.getString(CTX_SKIP_TYPE_VERIFICATION)));
}
+  /**
+   * Reads the frame size from the query context.
+   *
+   * @param queryContext context to read {@link #CTX_MAX_FRAME_SIZE} from
+   * @return the configured frame size, or {@link WorkerMemoryParameters#DEFAULT_FRAME_SIZE} when the key is not set
+   */
+  public static int getFrameSize(final QueryContext queryContext)
+  {
+    final int fallbackFrameSize = WorkerMemoryParameters.DEFAULT_FRAME_SIZE;
+    return queryContext.getInt(CTX_MAX_FRAME_SIZE, fallbackFrameSize);
+  }
+
/**
* Decodes a list from either a JSON or CSV string.
*/
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
index d6ae0d7e190a..3f5ba3034146 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/ControllerMemoryParametersTest.java
@@ -36,7 +36,8 @@ public void test_oneQueryInJvm()
{
final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(128_000_000, 1),
- 1
+ 1,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);
Assert.assertEquals(101_400_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -47,7 +48,8 @@ public void test_oneQueryInJvm_oneHundredWorkers()
{
final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(256_000_000, 1),
- 100
+ 100,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);
Assert.assertEquals(104_800_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -58,7 +60,8 @@ public void test_twoQueriesInJvm()
{
final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(128_000_000, 2),
- 1
+ 1,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);
Assert.assertEquals(50_200_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -69,7 +72,8 @@ public void test_maxSized()
{
final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(1_000_000_000, 1),
- 1
+ 1,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);
Assert.assertEquals(300_000_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes());
@@ -82,7 +86,8 @@ public void test_notEnoughMemory()
MSQException.class,
() -> ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(30_000_000, 1),
- 1
+ 1,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
)
);
@@ -98,7 +103,8 @@ public void test_minimalMemory()
{
final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(33_750_000, 1),
- 1
+ 1,
+ WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);
Assert.assertEquals(26_000_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes());
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
index bd3cf2b8e1b2..3dc8591641f3 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java
@@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.query.BadQueryContextException;
@@ -46,6 +47,7 @@
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FAULT_TOLERANCE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS;
+import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_FRAME_SIZE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_NUM_TASKS;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MSQ_MODE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_REMOVE_NULL_BYTES;
@@ -343,6 +345,22 @@ public void testDartSelectDestination()
);
}
+  @Test
+  public void getFrameSize_unset_returnsDefaultValue()
+  {
+    // An empty context carries no maxFrameSize key, so the getter must return the default.
+    final QueryContext emptyContext = QueryContext.empty();
+    final int actualFrameSize = MultiStageQueryContext.getFrameSize(emptyContext);
+
+    Assert.assertEquals(WorkerMemoryParameters.DEFAULT_FRAME_SIZE, actualFrameSize);
+  }
+
+  @Test
+  public void getFrameSize_set_returnsCorrectValue()
+  {
+    // An explicit maxFrameSize in the context must be returned instead of the default.
+    // The map is parameterized (not a raw Map) so the QueryContext.of call is type-checked.
+    final Map<String, Object> propertyMap = ImmutableMap.of(CTX_MAX_FRAME_SIZE, 500000);
+    Assert.assertEquals(500000, MultiStageQueryContext.getFrameSize(QueryContext.of(propertyMap)));
+  }
+
private static List decodeSortOrder(@Nullable final String input)
{
return MultiStageQueryContext.decodeList(MultiStageQueryContext.CTX_SORT_ORDER, input);