diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 6bbdb06cc86b..ea473a95d397 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -413,6 +413,8 @@ The following table lists the context parameters for the MSQ task engine: | `removeNullBytes` | SELECT, INSERT or REPLACE

The MSQ engine cannot process null bytes in strings and throws `InvalidNullByteFault` if it encounters them in the source data. If the parameter is set to `true`, the MSQ engine removes null bytes from string fields when reading the data. | `false` | | `includeAllCounters` | SELECT, INSERT or REPLACE

Whether to include counters that were added in Druid 31 or later. This is a backwards compatibility option that must be set to `false` during a rolling update from versions prior to Druid 31. | `true` | | `maxFrameSize` | SELECT, INSERT or REPLACE

Size of frames used for data transfer within the MSQ engine. You generally do not need to change this unless you have very large rows. | `1000000` (1 MB) | +| `maxInputFilesPerWorker` | SELECT, INSERT or REPLACE

Maximum number of input files or segments per worker. If a single worker would need to read more than this number of files, the query fails with a `TooManyInputFiles` error. In this case, increase this limit if your tasks have enough memory to handle more files, add more workers by increasing `maxNumTasks`, or split your query into smaller queries that process fewer files. | 10,000 | +| `maxPartitions` | SELECT, INSERT or REPLACE

Maximum number of output partitions for any single stage. For INSERT or REPLACE queries, this controls the maximum number of segments that can be generated. If the query would exceed this limit, it fails with a `TooManyPartitions` error. If needed, you can increase this limit, break your query into smaller queries, or use a larger target segment size (via `rowsPerSegment`). | 25,000 | | `maxThreads` | SELECT, INSERT or REPLACE

Maximum number of threads to use for processing. This only has an effect if it is greater than zero and less than the default thread count based on system configuration. Otherwise, it is ignored, and workers use the default thread count. | Not set (use default thread count) | ## Joins @@ -570,10 +572,10 @@ The following table lists query limits: | Limit | Value | Error if exceeded | |---|---|---| -| Size of an individual row written to a frame. Row size when written to a frame may differ from the original row size. | 1 MB | `RowTooLarge` | +| Size of an individual row written to a frame. Row size when written to a frame may differ from the original row size. Configurable with [`maxFrameSize`](#context). | 1 MB | `RowTooLarge` | | Number of segment-granular time chunks encountered during ingestion. | 5,000 | `TooManyBuckets`| -| Number of input files/segments per worker. | 10,000 | `TooManyInputFiles`| -| Number of output partitions for any one stage. Number of segments generated during ingestion. |25,000 | `TooManyPartitions`| +| Number of input files/segments per worker. Configurable with [`maxInputFilesPerWorker`](#context). | 10,000 | `TooManyInputFiles`| +| Number of output partitions for any one stage. Number of segments generated during ingestion. Configurable with [`maxPartitions`](#context). | 25,000 | `TooManyPartitions`| | Number of output columns for any one stage. | 2,000 | `TooManyColumns`| | Number of cluster by columns that can appear in a stage | 1,500 | `TooManyClusteredByColumns` | | Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers`| diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index b8238b81b165..0330c1f5b39b 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -1444,6 +1444,7 @@ private void startWorkForStage( final QueryDefinition queryDef, final ControllerQueryKernel queryKernel, final int stageNumber, + final int maxInputFilesPerWorker, @Nullable final List segmentsToGenerate ) { @@ -1454,7 +1455,8 @@ private void startWorkForStage( segmentsToGenerate ); - final Int2ObjectMap workOrders = queryKernel.createWorkOrders(stageNumber, extraInfos); + final Int2ObjectMap workOrders = + queryKernel.createWorkOrders(stageNumber, maxInputFilesPerWorker, extraInfos); final StageId stageId = new StageId(queryDef.getQueryId(), stageNumber); queryKernel.startStage(stageId); @@ -2533,8 +2535,12 @@ private void submitSequentialMergeFetchRequests(StageId stageId, Set tas */ private void startStages() throws IOException, InterruptedException { + final int maxInputFilesPerWorker = + MultiStageQueryContext.getMaxInputFilesPerWorker(querySpec.getContext()); final long maxInputBytesPerWorker = MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getContext()); + final int maxPartitions = + MultiStageQueryContext.getMaxPartitions(querySpec.getContext()); logKernelStatus(queryDef.getQueryId(), queryKernel); @@ -2545,7 +2551,9 @@ private void startStages() throws IOException, InterruptedException inputSpecSlicerFactory, querySpec.getAssignmentStrategy(), rowBasedFrameType, - maxInputBytesPerWorker + maxInputFilesPerWorker, + maxInputBytesPerWorker, + maxPartitions ); for (final StageId stageId : newStageIds) { @@ -2573,7 +2581,13 @@ private void startStages() throws 
IOException, InterruptedException retryWorkersOrFailJob(queryKernel, workerFaultSet); } stageRuntimesForLiveReports.put(stageId.getStageNumber(), new Interval(DateTimes.nowUtc(), DateTimes.MAX)); - startWorkForStage(queryDef, queryKernel, stageId.getStageNumber(), segmentsToGenerate); + startWorkForStage( + queryDef, + queryKernel, + stageId.getStageNumber(), + maxInputFilesPerWorker, + segmentsToGenerate + ); } } while (!newStageIds.isEmpty()); } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java index fd2107762777..0ca4a0709bbe 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java @@ -19,6 +19,11 @@ package org.apache.druid.msq.exec; +import org.apache.druid.msq.indexing.error.TooManyBucketsFault; +import org.apache.druid.msq.indexing.error.TooManyInputFilesFault; +import org.apache.druid.msq.indexing.error.TooManyPartitionsFault; +import org.apache.druid.msq.util.MultiStageQueryContext; + public class Limits { /** @@ -45,9 +50,10 @@ public class Limits public static final int MAX_WORKERS = 1000; /** - * Maximum number of input files per worker + * Default maximum number of input files per worker. Exceeding this will yield a {@link TooManyInputFilesFault}. + * Can be overridden by the context parameter {@link MultiStageQueryContext#CTX_MAX_INPUT_FILES_PER_WORKER}. */ - public static final int MAX_INPUT_FILES_PER_WORKER = 10_000; + public static final int DEFAULT_MAX_INPUT_FILES_PER_WORKER = 10_000; /** * Maximum number of parse exceptions with their stack traces a worker can send to the controller. @@ -94,10 +100,18 @@ public class Limits public static final long MAX_SELECT_RESULT_ROWS = 3_000; /** - * Max number of partition buckets for ingestion queries. + * Max number of partition buckets. Exceeding this will yield a {@link TooManyBucketsFault}. For an ingestion job, + * this is the maximum number of output time chunks. */ public static final int MAX_PARTITION_BUCKETS = 5_000; + /** + * Default max number of output partitions for a stage. Exceeding this will yield a {@link TooManyPartitionsFault}. + * For an ingestion job, this is the maximum number of segments that can be created. + * Can be overridden by the context parameter {@link MultiStageQueryContext#CTX_MAX_PARTITIONS}. + */ + public static final int DEFAULT_MAX_PARTITIONS = 25_000; + /** * Max number of rows with the same key in a window. 
This acts as a guardrail for * data distribution with high cardinality diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java index 8931b26b915d..16276f24202b 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java @@ -65,7 +65,6 @@ import org.apache.druid.msq.input.stage.StageInputSliceReader; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.input.table.SegmentsInputSliceReader; -import org.apache.druid.msq.kernel.ShuffleKind; import org.apache.druid.msq.kernel.WorkOrder; import org.apache.druid.msq.shuffle.output.DurableStorageOutputChannelFactory; import org.apache.druid.msq.util.MultiStageQueryContext; @@ -174,7 +173,6 @@ public void startAsync() try { exec.registerCancellationId(cancellationId); - initGlobalSortPartitionBoundariesIfNeeded(); startStageProcessor(); setUpCompletionCallbacks(); } @@ -284,25 +282,6 @@ private void startStageProcessor() stageResultFuture = processor.execute(executionContext); } - /** - * Initialize {@link #stagePartitionBoundariesFuture} if it will be needed (i.e. if {@link ShuffleKind#GLOBAL_SORT}) - * but does not need statistics. In this case, it is known upfront, before the job starts. - */ - private void initGlobalSortPartitionBoundariesIfNeeded() - { - if (workOrder.getStageDefinition().doesShuffle() - && workOrder.getStageDefinition().getShuffleSpec().kind() == ShuffleKind.GLOBAL_SORT - && !workOrder.getStageDefinition().mustGatherResultKeyStatistics()) { - // Result key stats aren't needed, so the partition boundaries are knowable ahead of time. Compute them now. - final ClusterByPartitions boundaries = - workOrder.getStageDefinition() - .generatePartitionBoundariesForShuffle(null) - .valueOrThrow(); - - stagePartitionBoundariesFuture.set(boundaries); - } - } - /** * Callbacks that fire when all work for the stage is done (i.e. when {@link #stageResultFuture} resolves). */ diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index ffd64dbe0885..3f28593bc217 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -429,6 +429,11 @@ private void handleNewWorkOrder( // Set up processorCloser (called when processing is done). kernelHolder.processorCloser.register(() -> runWorkOrder.stop(null)); + // Set resultPartitionBoundaries in RunWorkOrder if it is known ahead of time. + if (kernel.hasResultPartitionBoundaries()) { + runWorkOrder.getStagePartitionBoundariesFuture().set(kernel.getResultPartitionBoundaries()); + } + // Start working on this stage immediately. kernel.startReading(); runWorkOrder.startAsync(); diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyInputFilesFault.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyInputFilesFault.java index ceb1367ffd83..cae7a5c36d74 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyInputFilesFault.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyInputFilesFault.java @@ -46,10 +46,11 @@ public TooManyInputFilesFault( super( CODE, "Too many input files/segments [%d] encountered. 
Maximum input files/segments per worker is set to [%d]. Try" - + " breaking your query up into smaller queries, or increasing the number of workers to at least [%d] by" - + " setting %s in your query context", + + " increasing the limit using the %s query context parameter, breaking your query up into smaller queries," + + " or increasing the number of workers to at least [%d] by setting %s in your query context.", numInputFiles, maxInputFiles, + MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER, minNumWorkers, MultiStageQueryContext.CTX_MAX_NUM_TASKS ); diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyPartitionsFault.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyPartitionsFault.java index e9d91a42d4ef..e59b6645f750 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyPartitionsFault.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyPartitionsFault.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.druid.msq.util.MultiStageQueryContext; import java.util.Objects; @@ -37,9 +38,10 @@ public TooManyPartitionsFault(@JsonProperty("maxPartitions") final int maxPartit { super( CODE, - "Too many partitions (max = %d); try breaking your query up into smaller queries or " - + "using a larger target size", - maxPartitions + "Too many partitions (max = %d). Try increasing the limit using the %s query context parameter, " + + "breaking your query up into smaller queries, or using a larger target size.", + maxPartitions, + MultiStageQueryContext.CTX_MAX_PARTITIONS ); this.maxPartitions = maxPartitions; } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java index 4555f13eb751..736195155c69 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/StageDefinition.java @@ -48,6 +48,8 @@ import org.apache.druid.msq.input.table.TableInputSpec; import org.apache.druid.msq.statistics.ClusterByStatisticsCollector; import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl; +import org.apache.druid.msq.util.MultiStageQueryContext; +import org.apache.druid.query.QueryContext; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; @@ -85,8 +87,6 @@ */ public class StageDefinition { - private static final int MAX_PARTITIONS = 25_000; // Limit for TooManyPartitions - // If adding any fields here, add them to builder(StageDefinition) below too. private final StageId id; private final List inputSpecs; @@ -292,12 +292,6 @@ public boolean getShuffleCheckHasMultipleValues() return shuffleCheckHasMultipleValues; } - public int getMaxPartitionCount() - { - // Pretends to be an instance method, but really returns a constant. Maybe one day this will be defined per stage. - return MAX_PARTITIONS; - } - public int getStageNumber() { return id.getStageNumber(); @@ -334,8 +328,19 @@ public boolean mustGatherResultKeyStatistics() return mustGatherResultKeyStatistics(shuffleSpec); } + /** + * Generate partition boundaries for {@link ShuffleKind#GLOBAL_SORT} shuffles. 
+ * + * @param collector statistics collector, to be provided if {@link #mustGatherResultKeyStatistics()} is true + * @param maxPartitions maximum number of partitions to generate. On the controller, this is the value of + * {@link MultiStageQueryContext#getMaxPartitions(QueryContext)}. On workers, this method + * is only used when the number of partitions is determined ahead of time by the + * {@link ShuffleSpec}, so {@link Integer#MAX_VALUE} is typically passed + * for convenience. + */ public Either generatePartitionBoundariesForShuffle( - @Nullable ClusterByStatisticsCollector collector + @Nullable ClusterByStatisticsCollector collector, + int maxPartitions ) { if (shuffleSpec == null) { @@ -351,7 +356,7 @@ public Either generatePartitionBoundariesForShuffle( } else if (!mustGatherResultKeyStatistics() && collector != null) { throw new ISE("Statistics gathered, but not required for stage[%d]", getStageNumber()); } else { - return ((GlobalSortShuffleSpec) shuffleSpec).generatePartitionsForGlobalSort(collector, MAX_PARTITIONS); + return ((GlobalSortShuffleSpec) shuffleSpec).generatePartitionsForGlobalSort(collector, maxPartitions); } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java index cdf4e2e20b0b..0c9115ec4379 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/WorkerAssignmentStrategy.java @@ -24,7 +24,6 @@ import it.unimi.dsi.fastutil.ints.Int2IntMap; import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.InputSlice; import org.apache.druid.msq.input.InputSpec; import org.apache.druid.msq.input.InputSpecSlicer; @@ -49,6 +48,7 @@ public List assign( final InputSpec inputSpec, final Int2IntMap stageWorkerCountMap, final InputSpecSlicer slicer, + final int maxInputFilesPerSlice, final long maxInputBytesPerSlice ) { @@ -58,7 +58,7 @@ public List assign( /** * Use the lowest possible number of workers, while keeping each worker's workload under - * {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@code maxInputBytesPerWorker} bytes. + * {@code maxInputFilesPerSlice} files and {@code maxInputBytesPerSlice} bytes. * * Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible. 
*/ @@ -69,6 +69,7 @@ public List assign( final InputSpec inputSpec, final Int2IntMap stageWorkerCountMap, final InputSpecSlicer slicer, + final int maxInputFilesPerSlice, final long maxInputBytesPerSlice ) { @@ -76,7 +77,7 @@ public List assign( return slicer.sliceDynamic( inputSpec, stageDef.getMaxWorkerCount(), - Limits.MAX_INPUT_FILES_PER_WORKER, + maxInputFilesPerSlice, maxInputBytesPerSlice ); } else { @@ -117,6 +118,7 @@ public String toString() * @param inputSpec inputSpec containing information on where the input is read from * @param stageWorkerCountMap map of past stage number vs number of worker inputs * @param slicer creates slices of input spec based on other parameters + * @param maxInputFilesPerSlice hard maximum number of files per input slice * @param maxInputBytesPerSlice maximum suggested bytes per input slice * @return list containing input slices */ @@ -125,6 +127,7 @@ public abstract List assign( InputSpec inputSpec, Int2IntMap stageWorkerCountMap, InputSpecSlicer slicer, + int maxInputFilesPerSlice, long maxInputBytesPerSlice ); } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java index f61022044b04..7356daef89a8 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java @@ -37,7 +37,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.exec.ExtraInfoHolder; -import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.exec.OutputChannelMode; import org.apache.druid.msq.indexing.error.CanceledFault; import org.apache.druid.msq.indexing.error.MSQException; @@ -183,14 +182,18 @@ public List createAndGetNewStageIds( final InputSpecSlicerFactory slicerFactory, final WorkerAssignmentStrategy assignmentStrategy, final FrameType rowBasedFrameType, - final long maxInputBytesPerWorker + final int maxInputFilesPerWorker, + final long maxInputBytesPerWorker, + final int maxPartitions ) { createNewKernels( slicerFactory, assignmentStrategy, rowBasedFrameType, - maxInputBytesPerWorker + maxInputFilesPerWorker, + maxInputBytesPerWorker, + maxPartitions ); return stageTrackers.values() @@ -289,6 +292,7 @@ public boolean isSuccess() */ public Int2ObjectMap createWorkOrders( final int stageNumber, + final int maxInputFilesPerWorker, @Nullable final Int2ObjectMap extraInfos ) { @@ -318,16 +322,14 @@ public Int2ObjectMap createWorkOrders( ); final int numInputFiles = Ints.checkedCast(workOrder.getInputs().stream().mapToLong(InputSlice::fileCount).sum()); - fault = fault || IntMath.divide(numInputFiles, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING) > 1; + fault = fault || IntMath.divide(numInputFiles, maxInputFilesPerWorker, RoundingMode.CEILING) > 1; totalFileCount += numInputFiles; workerToWorkOrder.put(workerNumber, workOrder); } - final int requiredWorkers = IntMath.divide(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, RoundingMode.CEILING); + final int requiredWorkers = IntMath.divide(totalFileCount, maxInputFilesPerWorker, RoundingMode.CEILING); if (fault) { - throw new MSQException( - new TooManyInputFilesFault(totalFileCount, Limits.MAX_INPUT_FILES_PER_WORKER, requiredWorkers) - ); + throw new MSQException(new TooManyInputFilesFault(totalFileCount, 
maxInputFilesPerWorker, requiredWorkers)); } stageWorkOrders.put(new StageId(queryDef.getQueryId(), stageNumber), workerToWorkOrder); return workerToWorkOrder; @@ -337,7 +339,9 @@ private void createNewKernels( final InputSpecSlicerFactory slicerFactory, final WorkerAssignmentStrategy assignmentStrategy, final FrameType rowBasedFrameType, - final long maxInputBytesPerWorker + final int maxInputFilesPerWorker, + final long maxInputBytesPerWorker, + final int maxPartitions ) { StageGroup stageGroup; @@ -357,7 +361,9 @@ && getNonTerminalActiveStageCount() + stageGroup.size() <= config.getMaxConcurre slicerFactory, assignmentStrategy, rowBasedFrameType, - maxInputBytesPerWorker + maxInputFilesPerWorker, + maxInputBytesPerWorker, + maxPartitions ) ); @@ -380,7 +386,9 @@ private ControllerStageTracker createStageTracker( final InputSpecSlicerFactory slicerFactory, final WorkerAssignmentStrategy assignmentStrategy, final FrameType rowBasedFrameType, - final long maxInputBytesPerWorker + final int maxInputFilesPerWorker, + final long maxInputBytesPerWorker, + final int maxPartitions ) { final Int2IntMap stageWorkerCountMap = new Int2IntAVLTreeMap(); @@ -410,7 +418,9 @@ private ControllerStageTracker createStageTracker( assignmentStrategy, rowBasedFrameType, config.getMaxRetainedPartitionSketchBytes(), - maxInputBytesPerWorker + maxInputFilesPerWorker, + maxInputBytesPerWorker, + maxPartitions ); } @@ -571,7 +581,8 @@ public Object getResultObjectForStage(final StageId stageId) /** * Checks if the stage can be started, delegates call to {@link ControllerStageTracker#start()} for internal phase - * transition and registers the transition in this queryKernel. Work orders need to be created via {@link ControllerQueryKernel#createWorkOrders(int, Int2ObjectMap)} before calling this method. + * transition and registers the transition in this queryKernel. Work orders need to be created via + * {@link ControllerQueryKernel#createWorkOrders(int, int, Int2ObjectMap)} before calling this method. */ public void startStage(final StageId stageId) { diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java index 1c3fc1f84d40..1a2a0dc7b798 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java @@ -88,6 +88,11 @@ class ControllerStageTracker */ private final FrameType rowBasedFrameType; + /** + * Maximum number of partitions for this stage. See {@link MultiStageQueryContext#getMaxPartitions}. + */ + private final int maxPartitions; + // worker-> workerStagePhase // Controller keeps track of the stage with this map. // Currently, we rely on the serial nature of the state machine to keep things in sync between the controller and the worker. 
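The `TooManyInputFilesFault` check above reduces to a ceiling division over the total file count: a stage faults as soon as any worker's slice holds more than `maxInputFilesPerWorker` files, and the fault reports the minimum worker count that would bring every slice under the limit. A minimal standalone sketch of that arithmetic, assuming only that Guava's IntMath is on the classpath (the class and method names here are illustrative, not part of the patch):

    import java.math.RoundingMode;

    import com.google.common.math.IntMath;

    public class InputFileLimitSketch
    {
      // Minimum number of workers needed so that no worker exceeds the file limit.
      // Mirrors the ceiling division in ControllerQueryKernel#createWorkOrders.
      static int requiredWorkers(final int totalFileCount, final int maxInputFilesPerWorker)
      {
        return IntMath.divide(totalFileCount, maxInputFilesPerWorker, RoundingMode.CEILING);
      }

      public static void main(String[] args)
      {
        System.out.println(requiredWorkers(100, 10));        // 10, matching the new MSQFaultsTest case
        System.out.println(requiredWorkers(10_001, 10_000)); // 2: one file over the default limit forces a second worker
      }
    }
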
@@ -136,7 +141,8 @@ private ControllerStageTracker( final StageDefinition stageDef, final WorkerInputs workerInputs, final FrameType rowBasedFrameType, - final int maxRetainedPartitionSketchBytes + final int maxRetainedPartitionSketchBytes, + final int maxPartitions ) { this.stageDef = stageDef; @@ -144,6 +150,7 @@ private ControllerStageTracker( this.workerInputs = workerInputs; this.rowBasedFrameType = rowBasedFrameType; this.maxRetainedPartitionSketchBytes = maxRetainedPartitionSketchBytes; + this.maxPartitions = maxPartitions; initializeWorkerState(workerInputs.workers()); @@ -178,7 +185,9 @@ static ControllerStageTracker create( final WorkerAssignmentStrategy assignmentStrategy, final FrameType rowBasedFrameType, final int maxRetainedPartitionSketchBytes, - final long maxInputBytesPerWorker + final int maxInputFilesPerWorker, + final long maxInputBytesPerWorker, + final int maxPartitions ) { final WorkerInputs workerInputs = WorkerInputs.create( @@ -186,6 +195,7 @@ static ControllerStageTracker create( stageWorkerCountMap, slicer, assignmentStrategy, + maxInputFilesPerWorker, maxInputBytesPerWorker ); @@ -193,7 +203,8 @@ static ControllerStageTracker create( stageDef, workerInputs, rowBasedFrameType, - maxRetainedPartitionSketchBytes + maxRetainedPartitionSketchBytes, + maxPartitions ); } @@ -589,10 +600,10 @@ void mergeClusterByStatisticsCollectorForTimeChunk( ); ClusterByStatisticsCollector collector = timeChunkToCollector.get(tc); Either countOrPartitions = - stageDef.generatePartitionBoundariesForShuffle(collector); + stageDef.generatePartitionBoundariesForShuffle(collector, maxPartitions); totalPartitionCount += getPartitionCountFromEither(countOrPartitions); - if (totalPartitionCount > stageDef.getMaxPartitionCount()) { - failForReason(new TooManyPartitionsFault(stageDef.getMaxPartitionCount())); + if (totalPartitionCount > maxPartitions) { + failForReason(new TooManyPartitionsFault(maxPartitions)); return null; } timeChunkToBoundaries.put(tc, countOrPartitions.valueOrThrow()); @@ -726,10 +737,11 @@ void mergeClusterByStatisticsCollectorForAllTimeChunks( } if (resultPartitions == null) { final ClusterByStatisticsCollector collector = timeChunkToCollector.get(STATIC_TIME_CHUNK_FOR_PARALLEL_MERGE); - Either countOrPartitions = stageDef.generatePartitionBoundariesForShuffle(collector); + Either countOrPartitions = + stageDef.generatePartitionBoundariesForShuffle(collector, maxPartitions); totalPartitionCount += getPartitionCountFromEither(countOrPartitions); - if (totalPartitionCount > stageDef.getMaxPartitionCount()) { - failForReason(new TooManyPartitionsFault(stageDef.getMaxPartitionCount())); + if (totalPartitionCount > maxPartitions) { + failForReason(new TooManyPartitionsFault(maxPartitions)); return; } resultPartitionBoundaries = countOrPartitions.valueOrThrow(); @@ -954,10 +966,10 @@ private void generateResultPartitionsAndBoundariesWithoutKeyStatistics() } final Either maybeResultPartitionBoundaries = - stageDef.generatePartitionBoundariesForShuffle(null); + stageDef.generatePartitionBoundariesForShuffle(null, maxPartitions); if (maybeResultPartitionBoundaries.isError()) { - failForReason(new TooManyPartitionsFault(stageDef.getMaxPartitionCount())); + failForReason(new TooManyPartitionsFault(maxPartitions)); return; } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java index 8dcaee9c213a..6e1f00b1c84b 100644 --- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/WorkerInputs.java @@ -62,6 +62,7 @@ public static WorkerInputs create( final Int2IntMap stageWorkerCountMap, final InputSpecSlicer slicer, final WorkerAssignmentStrategy assignmentStrategy, + final int maxInputFilesPerWorker, final long maxInputBytesPerWorker ) { @@ -99,6 +100,7 @@ public static WorkerInputs create( inputSpec, stageWorkerCountMap, slicer, + maxInputFilesPerWorker, maxInputBytesPerWorker ); diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java index 726333a2d1dc..ea99dc973f08 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java @@ -74,13 +74,15 @@ private WorkerStageKernel(final WorkOrder workOrder) { this.workOrder = workOrder; - if (workOrder.getStageDefinition().doesShuffle() - && workOrder.getStageDefinition().getShuffleSpec().kind() == ShuffleKind.GLOBAL_SORT - && !workOrder.getStageDefinition().mustGatherResultKeyStatistics()) { - // Use valueOrThrow instead of a nicer error collection mechanism, because we really don't expect the - // MAX_PARTITIONS to be exceeded here. It would involve having a shuffleSpec that was statically configured - // to use a huge number of partitions. - resultPartitionBoundaries = workOrder.getStageDefinition().generatePartitionBoundariesForShuffle(null).valueOrThrow(); + final StageDefinition stageDef = workOrder.getStageDefinition(); + if (stageDef.doesShuffle() + && stageDef.getShuffleSpec().kind() == ShuffleKind.GLOBAL_SORT + && !stageDef.mustGatherResultKeyStatistics()) { + // Result key stats aren't needed, so the partition boundaries are knowable ahead of time. Compute them now. + // Use Integer.MAX_VALUE for maxPartitions since it isn't relevant in this path anyway. 
+ resultPartitionBoundaries = + stageDef.generatePartitionBoundariesForShuffle(null, Integer.MAX_VALUE) + .valueOrThrow(); } } diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 0ce57f39e944..3f8a645a998b 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -141,6 +141,8 @@ public class MultiStageQueryContext public static final String CTX_SEGMENT_LOAD_WAIT = "waitUntilSegmentsLoad"; public static final boolean DEFAULT_SEGMENT_LOAD_WAIT = false; public static final String CTX_MAX_INPUT_BYTES_PER_WORKER = "maxInputBytesPerWorker"; + public static final String CTX_MAX_INPUT_FILES_PER_WORKER = "maxInputFilesPerWorker"; + public static final String CTX_MAX_PARTITIONS = "maxPartitions"; public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode"; public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.SEQUENTIAL.toString(); @@ -314,6 +316,34 @@ public static long getMaxInputBytesPerWorker(final QueryContext queryContext) ); } + public static int getMaxInputFilesPerWorker(final QueryContext queryContext) + { + final Integer value = queryContext.getInt(CTX_MAX_INPUT_FILES_PER_WORKER); + if (value == null) { + return Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER; + } + if (value <= 0) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("%s must be a positive integer, got[%d]", CTX_MAX_INPUT_FILES_PER_WORKER, value); + } + return value; + } + + public static int getMaxPartitions(final QueryContext queryContext) + { + final Integer value = queryContext.getInt(CTX_MAX_PARTITIONS); + if (value == null) { + return Limits.DEFAULT_MAX_PARTITIONS; + } + if (value <= 0) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.INVALID_INPUT) + .build("%s must be a positive integer, got[%d]", CTX_MAX_PARTITIONS, value); + } + return value; + } + public static ClusterStatisticsMergeMode getClusterStatisticsMergeMode(QueryContext queryContext) { return QueryContexts.getAsEnum( diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java index 7e85722e4805..12cab8fb7b50 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQFaultsTest.java @@ -495,10 +495,82 @@ public void testTooManyInputFiles() throws IOException .setQueryContext(Map.of("maxNumTasks", 8)) .setExpectedDataSource("foo1") .setExpectedRowSignature(dummyRowSignature) - .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.MAX_INPUT_FILES_PER_WORKER, 10)) + .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, 10)) .verifyResults(); } + @Test + public void testTooManyInputFilesWithLowContextLimit() throws IOException + { + final RowSignature dummyRowSignature = RowSignature.builder().addTimeColumn().build(); + + final int numFiles = 100; + final int maxInputFilesPerWorker = 10; + + final File toRead = getResourceAsTemporaryFile("/wikipedia-sampled.json"); + final String toReadFileNameAsJson = 
queryFramework().queryJsonMapper().writeValueAsString(toRead.getAbsolutePath()); + final String externalFiles = String.join(", ", Collections.nCopies(numFiles, toReadFileNameAsJson)); + + final Map context = + ImmutableMap.builder() + .putAll(DEFAULT_MSQ_CONTEXT) + .put(MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER, maxInputFilesPerWorker) + .build(); + + testIngestQuery() + .setSql(StringUtils.format( + "insert into foo1 SELECT\n" + + " floor(TIME_PARSE(\"timestamp\") to day) AS __time\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{ \"files\": [%s],\"type\":\"local\"}',\n" + + " '{\"type\": \"csv\", \"hasHeaderRow\": true}',\n" + + " '[{\"name\": \"timestamp\", \"type\": \"string\"}]'\n" + + " )\n" + + ") PARTITIONED by day", + externalFiles + )) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(dummyRowSignature) + .setQueryContext(context) + .setExpectedMSQFault(new TooManyInputFilesFault(numFiles, maxInputFilesPerWorker, 10)) + .verifyResults(); + } + + @Test + public void testTooManyPartitionsWithLowContextLimit() throws IOException + { + final int maxPartitions = 5; + + final Map context = + ImmutableMap.builder() + .putAll(DEFAULT_MSQ_CONTEXT) + .put(MultiStageQueryContext.CTX_ROWS_PER_SEGMENT, 1) + .put(MultiStageQueryContext.CTX_MAX_PARTITIONS, maxPartitions) + .build(); + + final RowSignature rowSignature = RowSignature.builder().addTimeColumn().build(); + + // Create a file with enough rows to exceed the partition limit + final File file = createNdJsonFile(newTempFile("ndjson"), 100, 1); + final String filePathAsJson = queryFramework().queryJsonMapper().writeValueAsString(file.getAbsolutePath()); + + testIngestQuery().setSql(" insert into foo1 SELECT\n" + + " floor(TIME_PARSE(\"timestamp\") to day) AS __time\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{ \"files\": [" + filePathAsJson + "],\"type\":\"local\"}',\n" + + " '{\"type\": \"json\"}',\n" + + " '[{\"name\": \"timestamp\",\"type\":\"string\"}]'\n" + + " )\n" + + ") PARTITIONED by day") + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .setQueryContext(context) + .setExpectedMSQFault(new TooManyPartitionsFault(maxPartitions)) + .verifyResults(); + } + @Test public void testUnionAllWithDifferentColumnNames() { diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java index 73b7247728eb..b51f229a6695 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/StageDefinitionTest.java @@ -27,6 +27,7 @@ import org.apache.druid.frame.key.KeyColumn; import org.apache.druid.frame.key.KeyOrder; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.querykit.common.OffsetLimitStageProcessor; import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl; @@ -60,7 +61,10 @@ public void testGeneratePartitionsForNullShuffle() false ); - Assert.assertThrows(ISE.class, () -> stageDefinition.generatePartitionBoundariesForShuffle(null)); + Assert.assertThrows( + ISE.class, + () -> stageDefinition.generatePartitionBoundariesForShuffle(null, Limits.DEFAULT_MAX_PARTITIONS) + ); } @Test @@ -82,7 +86,10 @@ public void testGeneratePartitionsForNonNullShuffleWithNullCollector() false ); - Assert.assertThrows(ISE.class, () -> 
stageDefinition.generatePartitionBoundariesForShuffle(null)); + Assert.assertThrows( + ISE.class, + () -> stageDefinition.generatePartitionBoundariesForShuffle(null, Limits.DEFAULT_MAX_PARTITIONS) + ); } @Test @@ -117,7 +124,8 @@ public void testGeneratePartitionsForNonNullShuffleWithNonNullCollector() 100, false, false - ) + ), + Limits.DEFAULT_MAX_PARTITIONS ) ); } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java index 9efdcffdac98..96312e293faa 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/BaseControllerQueryKernelTest.java @@ -139,7 +139,7 @@ public ControllerQueryKernelTester setupStage( break; case READING_INPUT: - controllerQueryKernel.createWorkOrders(stageId.getStageNumber(), null); + controllerQueryKernel.createWorkOrders(stageId.getStageNumber(), Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, null); controllerQueryKernel.startStage(stageId); for (int i = 0; i < queryDefinition.getStageDefinition(stageId).getMaxWorkerCount(); ++i) { controllerQueryKernel.workOrdersSentForWorker(stageId, i); @@ -248,7 +248,9 @@ private Set createAndGetNewStageNumbers(boolean checkInitialized) inputSlicerFactory, WorkerAssignmentStrategy.MAX, MultiStageQueryContext.getRowBasedFrameType(QueryContext.of(config.getWorkerContextMap())), - Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, + Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER, + Limits.DEFAULT_MAX_PARTITIONS ) ); } @@ -280,7 +282,7 @@ public boolean isSuccess() public void startStage(int stageNumber) { Preconditions.checkArgument(initialized); - controllerQueryKernel.createWorkOrders(stageNumber, null); + controllerQueryKernel.createWorkOrders(stageNumber, Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, null); controllerQueryKernel.startStage(new StageId(queryDefinition.getQueryId(), stageNumber)); } diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java index c3e42824bece..43f3c5c5635c 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/kernel/controller/WorkerInputsTest.java @@ -79,6 +79,7 @@ public void test_max_threeInputs_fourWorkers() Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(denseWorkers(4), true), WorkerAssignmentStrategy.MAX, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -108,6 +109,7 @@ public void test_max_threeInputs_fourWorkers_withGaps() Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(new IntAVLTreeSet(new int[]{1, 3, 4, 5}), true), WorkerAssignmentStrategy.MAX, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -137,6 +139,7 @@ public void test_max_zeroInputs_fourWorkers() Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(denseWorkers(4), true), WorkerAssignmentStrategy.MAX, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -166,6 +169,7 @@ public void test_auto_zeroInputSpecs_fourWorkers() Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(denseWorkers(4), true), WorkerAssignmentStrategy.AUTO, + 
Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -192,6 +196,7 @@ public void test_auto_zeroInputSlices_fourWorkers() Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(denseWorkers(4), true), WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -219,6 +224,7 @@ public void test_auto_zeroInputSlices_broadcast_fourWorkers() Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(denseWorkers(4), true), WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -245,6 +251,7 @@ public void test_auto_threeInputs_fourWorkers() Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(denseWorkers(4), true), WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -274,6 +281,7 @@ public void test_auto_oneInputStageWithThreePartitionsAndTwoWorkers_fourWorkerMa new Int2ObjectAVLTreeMap<>(ImmutableMap.of(0, OutputChannelMode.LOCAL_STORAGE)) ), WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -322,6 +330,7 @@ public void test_auto_oneInputStageWithThreePartitionsAndTwoWorkers_oneWorkerMax new Int2ObjectAVLTreeMap<>(ImmutableMap.of(0, OutputChannelMode.LOCAL_STORAGE)) ), WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -357,6 +366,7 @@ public void test_auto_threeBigInputs_fourWorkers() Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(denseWorkers(4), true), WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -384,6 +394,7 @@ public void test_auto_tenSmallAndOneBigInputs_twoWorkers() Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(denseWorkers(2), true), WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -417,6 +428,7 @@ public void test_auto_threeBigInputs_oneWorker() Int2IntMaps.EMPTY_MAP, new TestInputSpecSlicer(denseWorkers(1), true), WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -449,6 +461,7 @@ public void test_max_shouldAlwaysSplitStatic() Int2IntMaps.EMPTY_MAP, testInputSpecSlicer, WorkerAssignmentStrategy.MAX, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); @@ -493,6 +506,7 @@ public void test_auto_shouldSplitDynamicIfPossible() Int2IntMaps.EMPTY_MAP, testInputSpecSlicer, WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, 100 ); @@ -536,6 +550,7 @@ public void test_auto_shouldUseLeastWorkersPossible() Int2IntMaps.EMPTY_MAP, testInputSpecSlicer, WorkerAssignmentStrategy.AUTO, + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_BYTES_PER_WORKER ); diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java index 5838c03dd819..569329532606 100644 --- a/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java +++ b/multi-stage-query/src/test/java/org/apache/druid/msq/util/MultiStageQueryContextTest.java @@ -22,8 +22,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; 
+import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.Tasks; +import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.exec.WorkerMemoryParameters; import org.apache.druid.msq.indexing.destination.MSQSelectDestination; import org.apache.druid.msq.kernel.WorkerAssignmentStrategy; @@ -116,7 +118,80 @@ public void getMaxInputBytesPerWorker_set_returnsCorrectValue() Assert.assertEquals( 1024, - MultiStageQueryContext.getMaxInputBytesPerWorker(QueryContext.of(propertyMap))); + MultiStageQueryContext.getMaxInputBytesPerWorker(QueryContext.of(propertyMap)) + ); + } + + @Test + public void getMaxInputFilesPerWorker_unset_returnsDefaultValue() + { + Assert.assertEquals( + Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER, + MultiStageQueryContext.getMaxInputFilesPerWorker(QueryContext.empty()) + ); + } + + @Test + public void getMaxInputFilesPerWorker_set_returnsCorrectValue() + { + Map propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER, 5000); + Assert.assertEquals(5000, MultiStageQueryContext.getMaxInputFilesPerWorker(QueryContext.of(propertyMap))); + } + + @Test + public void getMaxInputFilesPerWorker_zero_throwsException() + { + Map propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER, 0); + Assert.assertThrows( + DruidException.class, + () -> MultiStageQueryContext.getMaxInputFilesPerWorker(QueryContext.of(propertyMap)) + ); + } + + @Test + public void getMaxInputFilesPerWorker_negative_throwsException() + { + Map propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER, -1); + Assert.assertThrows( + DruidException.class, + () -> MultiStageQueryContext.getMaxInputFilesPerWorker(QueryContext.of(propertyMap)) + ); + } + + @Test + public void getMaxPartitions_unset_returnsDefaultValue() + { + Assert.assertEquals( + Limits.DEFAULT_MAX_PARTITIONS, + MultiStageQueryContext.getMaxPartitions(QueryContext.empty()) + ); + } + + @Test + public void getMaxPartitions_set_returnsCorrectValue() + { + Map propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_PARTITIONS, 50000); + Assert.assertEquals(50000, MultiStageQueryContext.getMaxPartitions(QueryContext.of(propertyMap))); + } + + @Test + public void getMaxPartitions_zero_throwsException() + { + Map propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_PARTITIONS, 0); + Assert.assertThrows( + DruidException.class, + () -> MultiStageQueryContext.getMaxPartitions(QueryContext.of(propertyMap)) + ); + } + + @Test + public void getMaxPartitions_negative_throwsException() + { + Map propertyMap = ImmutableMap.of(MultiStageQueryContext.CTX_MAX_PARTITIONS, -1); + Assert.assertThrows( + DruidException.class, + () -> MultiStageQueryContext.getMaxPartitions(QueryContext.of(propertyMap)) + ); } @Test @@ -206,7 +281,10 @@ public void getMSQMode_set_returnsCorrectValue() @Test public void getSelectDestination_unset_returnsDefaultValue() { - Assert.assertEquals(MSQSelectDestination.TASKREPORT, MultiStageQueryContext.getSelectDestination(QueryContext.empty())); + Assert.assertEquals( + MSQSelectDestination.TASKREPORT, + MultiStageQueryContext.getSelectDestination(QueryContext.empty()) + ); } @Test
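Both limits are read as ordinary positive-integer context parameters, as the `getMaxInputFilesPerWorker` and `getMaxPartitions` getters earlier in the patch show, so raising them requires no new API surface. A minimal sketch of a query context that lifts both limits, in the style of the test contexts above (the values and class name are illustrative; the keys are the `CTX_MAX_INPUT_FILES_PER_WORKER` and `CTX_MAX_PARTITIONS` constants):

    import java.util.Map;

    import com.google.common.collect.ImmutableMap;

    public class ContextLimitsSketch
    {
      public static void main(String[] args)
      {
        // Illustrative values; the defaults are 10,000 input files per worker and 25,000 partitions.
        final Map<String, Object> context =
            ImmutableMap.<String, Object>builder()
                        .put("maxNumTasks", 8)                 // one controller plus up to seven workers
                        .put("maxInputFilesPerWorker", 20_000) // MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER
                        .put("maxPartitions", 50_000)          // MultiStageQueryContext.CTX_MAX_PARTITIONS
                        .build();
        System.out.println(context);
      }
    }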