Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ The following table lists the context parameters for the MSQ task engine:
| `failOnEmptyInsert` | INSERT or REPLACE<br /><br /> When set to false (the default), an INSERT query generating no output rows will be a no-op, and a REPLACE query generating no output rows will delete all data that matches the OVERWRITE clause. When set to true, an ingest query generating no output rows will throw an `InsertCannotBeEmpty` fault. | `false` |
| `storeCompactionState` | REPLACE<br /><br /> When set to true, a REPLACE query stores as part of each segment's metadata a `lastCompactionState` field that captures the various specs used to create the segment. Future compaction jobs skip segments whose `lastCompactionState` matches the desired compaction state. Works the same as [`storeCompactionState`](../ingestion/tasks.md#context-parameters) task context flag. | `false` |
| `removeNullBytes` | SELECT, INSERT or REPLACE<br /><br /> The MSQ engine cannot process null bytes in strings and throws `InvalidNullByteFault` if it encounters them in the source data. If the parameter is set to true, the MSQ engine will remove the null bytes in string fields when reading the data. | `false` |
| `maxFrameSize` | SELECT, INSERT or REPLACE<br /><br />Maximum size, in bytes, of frames used for data transfer within the MSQ engine. You generally do not need to change this unless you have very large rows. | `1000000` (1 MB) |

## Joins

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public ControllerQueryKernelConfig queryKernelConfig(final MSQSpec querySpec)
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
memoryIntrospector,
workerIds.size()
workerIds.size(),
MultiStageQueryContext.getFrameSize(context)
);

final int maxConcurrentStages = MultiStageQueryContext.getMaxConcurrentStagesWithDefault(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@ public ControllerMemoryParameters(int partitionStatisticsMaxRetainedBytes)
*/
public static ControllerMemoryParameters createProductionInstance(
final MemoryIntrospector memoryIntrospector,
final int maxWorkerCount
final int maxWorkerCount,
final int frameSize
)
{
final long totalMemory = memoryIntrospector.memoryPerTask();
final long memoryForInputChannels =
WorkerMemoryParameters.computeProcessorMemoryForInputChannels(
maxWorkerCount,
WorkerMemoryParameters.DEFAULT_FRAME_SIZE
frameSize
);
final int partitionStatisticsMaxRetainedBytes = (int) Math.min(
totalMemory - memoryForInputChannels,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.querykit.BroadcastJoinSegmentMapFnProcessor;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.incremental.IncrementalIndex;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -160,7 +161,7 @@ public static WorkerMemoryParameters createProductionInstance(
final StageDefinition stageDef = workOrder.getStageDefinition();
return createInstance(
memoryIntrospector,
DEFAULT_FRAME_SIZE,
MultiStageQueryContext.getFrameSize(workOrder.getWorkerContext()),
workOrder.getInputs(),
stageDef.getBroadcastInputNumbers(),
stageDef.doesShuffle() ? stageDef.getShuffleSpec() : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ public ControllerQueryKernelConfig queryKernelConfig(final MSQSpec querySpec)
final ControllerMemoryParameters memoryParameters =
ControllerMemoryParameters.createProductionInstance(
memoryIntrospector,
querySpec.getTuningConfig().getMaxNumWorkers()
querySpec.getTuningConfig().getMaxNumWorkers(),
MultiStageQueryContext.getFrameSize(querySpec.getContext())
);

final ControllerQueryKernelConfig config = makeQueryKernelConfig(querySpec, memoryParameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.msq.util.MultiStageQueryContext;

import java.util.Objects;

Expand All @@ -35,7 +36,13 @@ public class RowTooLargeFault extends BaseMSQFault
// Constructor also used by Jackson to deserialize this fault from its
// "maxFrameSize" JSON property.
@JsonCreator
public RowTooLargeFault(@JsonProperty("maxFrameSize") final long maxFrameSize)
{
// The message names the context parameter that raises the frame-size limit,
// pointing users at the usual remedy alongside "reduce your row size".
super(
CODE,
"Encountered row that cannot fit in a single frame (max frame size = %,d). "
+ "Reduce your row size or increase the context parameter[%s].",
maxFrameSize,
MultiStageQueryContext.CTX_MAX_FRAME_SIZE
);
// Retained so the getter can expose the limit that was exceeded.
this.maxFrameSize = maxFrameSize;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.exec.SegmentSource;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
Expand Down Expand Up @@ -219,6 +220,11 @@ public class MultiStageQueryContext
*/
public static final String CTX_TARGET_PARTITIONS_PER_WORKER = "targetPartitionsPerWorker";

/**
* Maximum size of frames to create. Defaults to {@link WorkerMemoryParameters#DEFAULT_FRAME_SIZE}.
*/
public static final String CTX_MAX_FRAME_SIZE = "maxFrameSize";

private static final Pattern LOOKS_LIKE_JSON_ARRAY = Pattern.compile("^\\s*\\[.*", Pattern.DOTALL);

public static String getMSQMode(final QueryContext queryContext)
Expand Down Expand Up @@ -497,6 +503,11 @@ public static Set<String> getColumnsExcludedFromTypeVerification(final QueryCont
return new HashSet<>(decodeList(CTX_SKIP_TYPE_VERIFICATION, queryContext.getString(CTX_SKIP_TYPE_VERIFICATION)));
}

/**
 * Returns the frame size to use for data transfer, read from the
 * {@link #CTX_MAX_FRAME_SIZE} context parameter. Falls back to
 * {@link WorkerMemoryParameters#DEFAULT_FRAME_SIZE} when the parameter is absent.
 */
public static int getFrameSize(final QueryContext queryContext)
{
  final int fallbackFrameSize = WorkerMemoryParameters.DEFAULT_FRAME_SIZE;
  return queryContext.getInt(CTX_MAX_FRAME_SIZE, fallbackFrameSize);
}

/**
* Decodes a list from either a JSON or CSV string.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public void test_oneQueryInJvm()
{
final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(128_000_000, 1),
1
1,
WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);

Assert.assertEquals(101_400_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes());
Expand All @@ -47,7 +48,8 @@ public void test_oneQueryInJvm_oneHundredWorkers()
{
final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(256_000_000, 1),
100
100,
WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);

Assert.assertEquals(104_800_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes());
Expand All @@ -58,7 +60,8 @@ public void test_twoQueriesInJvm()
{
final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(128_000_000, 2),
1
1,
WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);

Assert.assertEquals(50_200_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes());
Expand All @@ -69,7 +72,8 @@ public void test_maxSized()
{
final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(1_000_000_000, 1),
1
1,
WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);

Assert.assertEquals(300_000_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes());
Expand All @@ -82,7 +86,8 @@ public void test_notEnoughMemory()
MSQException.class,
() -> ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(30_000_000, 1),
1
1,
WorkerMemoryParameters.DEFAULT_FRAME_SIZE
)
);

Expand All @@ -98,7 +103,8 @@ public void test_minimalMemory()
{
final ControllerMemoryParameters memoryParameters = ControllerMemoryParameters.createProductionInstance(
makeMemoryIntrospector(33_750_000, 1),
1
1,
WorkerMemoryParameters.DEFAULT_FRAME_SIZE
);

Assert.assertEquals(26_000_000, memoryParameters.getPartitionStatisticsMaxRetainedBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.msq.exec.WorkerMemoryParameters;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.kernel.WorkerAssignmentStrategy;
import org.apache.druid.query.BadQueryContextException;
Expand All @@ -46,6 +47,7 @@
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_DURABLE_SHUFFLE_STORAGE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FAULT_TOLERANCE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_FRAME_SIZE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_NUM_TASKS;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MSQ_MODE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_REMOVE_NULL_BYTES;
Expand Down Expand Up @@ -343,6 +345,22 @@ public void testDartSelectDestination()
);
}

@Test
public void getFrameSize_unset_returnsDefaultValue()
{
  // An empty context carries no maxFrameSize entry, so the default must come back.
  final int frameSize = MultiStageQueryContext.getFrameSize(QueryContext.empty());
  Assert.assertEquals(WorkerMemoryParameters.DEFAULT_FRAME_SIZE, frameSize);
}

@Test
public void getFrameSize_set_returnsCorrectValue()
{
  // An explicitly set maxFrameSize must win over the default.
  final QueryContext context = QueryContext.of(ImmutableMap.of(CTX_MAX_FRAME_SIZE, 500000));
  Assert.assertEquals(500000, MultiStageQueryContext.getFrameSize(context));
}

private static List<String> decodeSortOrder(@Nullable final String input)
{
return MultiStageQueryContext.decodeList(MultiStageQueryContext.CTX_SORT_ORDER, input);
Expand Down
Loading