8 changes: 5 additions & 3 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
@@ -413,6 +413,8 @@ The following table lists the context parameters for the MSQ task engine:
| `removeNullBytes` | SELECT, INSERT or REPLACE<br /><br /> The MSQ engine cannot process null bytes in strings and throws `InvalidNullByteFault` if it encounters them in the source data. If the parameter is set to `true`, the MSQ engine removes null bytes from string fields when reading the data. | `false` |
| `includeAllCounters` | SELECT, INSERT or REPLACE<br /><br />Whether to include counters that were added in Druid 31 or later. This is a backwards compatibility option that must be set to `false` during a rolling update from versions prior to Druid 31. | `true` |
| `maxFrameSize` | SELECT, INSERT or REPLACE<br /><br />Size of frames used for data transfer within the MSQ engine. You generally do not need to change this unless you have very large rows. | `1000000` (1 MB) |
| `maxInputFilesPerWorker` | SELECT, INSERT, REPLACE<br /><br />Maximum number of input files or segments per worker. If a single worker would need to read more than this number of files, the query fails with a `TooManyInputFiles` error. In this case, you can increase this limit (if your tasks have enough memory to handle more files), add more workers by increasing `maxNumTasks`, or split your query into smaller queries that process fewer files. | 10,000 |
| `maxPartitions` | SELECT, INSERT, REPLACE<br /><br />Maximum number of output partitions for any single stage. For INSERT or REPLACE queries, this controls the maximum number of segments that can be generated. If the query would exceed this limit, it fails with a `TooManyPartitions` error. You can increase this limit if needed, break your query into smaller queries, or use a larger target segment size (via `rowsPerSegment`). See the example context after this table. | 25,000 |
| `maxThreads` | SELECT, INSERT or REPLACE<br /><br />Maximum number of threads to use for processing. This only has an effect if it is greater than zero and less than the default thread count based on system configuration. Otherwise, it is ignored, and workers use the default thread count. | Not set (use default thread count) |
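
Both new limits are ordinary query context parameters, so they can be raised per query alongside existing MSQ keys such as `maxNumTasks`. A minimal sketch follows, expressed as a Java map for illustration; the key names are the documented ones, while the helper class and the values are hypothetical. In practice the same keys go into the `context` object of an MSQ task submission.

```java
import java.util.Map;

public class MsqContextExample
{
  // Illustrative values only: raise both new limits for a large ingestion while also
  // requesting more worker tasks. Key names match the documented context parameters.
  public static Map<String, Object> lenientIngestContext()
  {
    return Map.of(
        "maxNumTasks", 8,                  // total tasks, including the controller
        "maxInputFilesPerWorker", 20_000,  // default is 10,000
        "maxPartitions", 50_000            // default is 25,000
    );
  }
}
```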

## Joins
@@ -570,10 +572,10 @@ The following table lists query limits:

| Limit | Value | Error if exceeded |
|---|---|---|
| Size of an individual row written to a frame. Row size when written to a frame may differ from the original row size. | 1 MB | `RowTooLarge` |
| Size of an individual row written to a frame. Row size when written to a frame may differ from the original row size. Configurable with [`maxFrameSize`](#context). | 1 MB | `RowTooLarge` |
| Number of segment-granular time chunks encountered during ingestion. | 5,000 | `TooManyBuckets`|
| Number of input files/segments per worker. | 10,000 | `TooManyInputFiles`|
| Number of output partitions for any one stage. Number of segments generated during ingestion. |25,000 | `TooManyPartitions`|
| Number of input files/segments per worker. Configurable with [`maxInputFilesPerWorker`](#context). | 10,000 | `TooManyInputFiles`|
| Number of output partitions for any one stage. Number of segments generated during ingestion. Configurable with [`maxPartitions`](#context). | 25,000 | `TooManyPartitions`|
| Number of output columns for any one stage. | 2,000 | `TooManyColumns`|
| Number of cluster by columns that can appear in a stage | 1,500 | `TooManyClusteredByColumns` |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers`|
Original file line number Diff line number Diff line change
@@ -1444,6 +1444,7 @@ private void startWorkForStage(
final QueryDefinition queryDef,
final ControllerQueryKernel queryKernel,
final int stageNumber,
final int maxInputFilesPerWorker,
@Nullable final List<SegmentIdWithShardSpec> segmentsToGenerate
)
{
@@ -1454,7 +1455,8 @@ private void startWorkForStage(
segmentsToGenerate
);

final Int2ObjectMap<WorkOrder> workOrders = queryKernel.createWorkOrders(stageNumber, extraInfos);
final Int2ObjectMap<WorkOrder> workOrders =
queryKernel.createWorkOrders(stageNumber, maxInputFilesPerWorker, extraInfos);
final StageId stageId = new StageId(queryDef.getQueryId(), stageNumber);

queryKernel.startStage(stageId);
@@ -2533,8 +2535,12 @@ private void submitSequentialMergeFetchRequests(StageId stageId, Set<String> tas
*/
private void startStages() throws IOException, InterruptedException
{
final int maxInputFilesPerWorker =
MultiStageQueryContext.getMaxInputFilesPerWorker(querySpec.getContext());
final long maxInputBytesPerWorker =
MultiStageQueryContext.getMaxInputBytesPerWorker(querySpec.getContext());
final int maxPartitions =
MultiStageQueryContext.getMaxPartitions(querySpec.getContext());

logKernelStatus(queryDef.getQueryId(), queryKernel);

@@ -2545,7 +2551,9 @@ private void startStages() throws IOException, InterruptedException
inputSpecSlicerFactory,
querySpec.getAssignmentStrategy(),
rowBasedFrameType,
maxInputBytesPerWorker
maxInputFilesPerWorker,
maxInputBytesPerWorker,
maxPartitions
);

for (final StageId stageId : newStageIds) {
@@ -2573,7 +2581,13 @@ private void startStages() throws IOException, InterruptedException
retryWorkersOrFailJob(queryKernel, workerFaultSet);
}
stageRuntimesForLiveReports.put(stageId.getStageNumber(), new Interval(DateTimes.nowUtc(), DateTimes.MAX));
startWorkForStage(queryDef, queryKernel, stageId.getStageNumber(), segmentsToGenerate);
startWorkForStage(
queryDef,
queryKernel,
stageId.getStageNumber(),
maxInputFilesPerWorker,
segmentsToGenerate
);
}
} while (!newStageIds.isEmpty());
}
Original file line number Diff line number Diff line change
@@ -19,6 +19,11 @@

package org.apache.druid.msq.exec;

import org.apache.druid.msq.indexing.error.TooManyBucketsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.TooManyPartitionsFault;
import org.apache.druid.msq.util.MultiStageQueryContext;

public class Limits
{
/**
@@ -45,9 +50,10 @@ public class Limits
public static final int MAX_WORKERS = 1000;

/**
* Maximum number of input files per worker
* Default maximum number of input files per worker. Exceeding this will yield a {@link TooManyInputFilesFault}.
* Can be overridden by the context parameter {@link MultiStageQueryContext#CTX_MAX_INPUT_FILES_PER_WORKER}.
*/
public static final int MAX_INPUT_FILES_PER_WORKER = 10_000;
public static final int DEFAULT_MAX_INPUT_FILES_PER_WORKER = 10_000;

/**
* Maximum number of parse exceptions with their stack traces a worker can send to the controller.
@@ -94,10 +100,18 @@ public class Limits
public static final long MAX_SELECT_RESULT_ROWS = 3_000;

/**
* Max number of partition buckets for ingestion queries.
* Max number of partition buckets. Exceeding this will yield a {@link TooManyBucketsFault}. For an ingestion job,
* this is the maximum number of output time chunks.
*/
public static final int MAX_PARTITION_BUCKETS = 5_000;

/**
* Default max number of output partitions for a stage. Exceeding this will yield a {@link TooManyPartitionsFault}.
* For an ingestion job, this is the maximum number of segments that can be created.
* Can be overridden by the context parameter {@link MultiStageQueryContext#CTX_MAX_PARTITIONS}.
*/
public static final int DEFAULT_MAX_PARTITIONS = 25_000;

/**
* Max number of rows with the same key in a window. This acts as a guardrail for
* data distribution with high cardinality
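
The getters used by the controller, `MultiStageQueryContext.getMaxInputFilesPerWorker` and `getMaxPartitions`, are not part of this excerpt. A plausible sketch of how they would resolve the context keys against the renamed defaults above is shown here; the wrapper class and the use of `QueryContext.getInt` are assumptions for illustration.

```java
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.query.QueryContext;

public final class ContextLimitSketch
{
  // Documented context key names; MultiStageQueryContext is expected to expose equivalent constants.
  static final String CTX_MAX_INPUT_FILES_PER_WORKER = "maxInputFilesPerWorker";
  static final String CTX_MAX_PARTITIONS = "maxPartitions";

  // Fall back to the renamed defaults in Limits when the key is absent from the query context.
  static int getMaxInputFilesPerWorker(final QueryContext queryContext)
  {
    return queryContext.getInt(CTX_MAX_INPUT_FILES_PER_WORKER, Limits.DEFAULT_MAX_INPUT_FILES_PER_WORKER);
  }

  static int getMaxPartitions(final QueryContext queryContext)
  {
    return queryContext.getInt(CTX_MAX_PARTITIONS, Limits.DEFAULT_MAX_PARTITIONS);
  }
}
```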
Original file line number Diff line number Diff line change
@@ -65,7 +65,6 @@
import org.apache.druid.msq.input.stage.StageInputSliceReader;
import org.apache.druid.msq.input.table.SegmentsInputSlice;
import org.apache.druid.msq.input.table.SegmentsInputSliceReader;
import org.apache.druid.msq.kernel.ShuffleKind;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.shuffle.output.DurableStorageOutputChannelFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
@@ -174,7 +173,6 @@ public void startAsync()

try {
exec.registerCancellationId(cancellationId);
initGlobalSortPartitionBoundariesIfNeeded();
startStageProcessor();
setUpCompletionCallbacks();
}
@@ -284,25 +282,6 @@ private void startStageProcessor()
stageResultFuture = processor.execute(executionContext);
}

/**
* Initialize {@link #stagePartitionBoundariesFuture} if it will be needed (i.e. if {@link ShuffleKind#GLOBAL_SORT})
* but does not need statistics. In this case, it is known upfront, before the job starts.
*/
private void initGlobalSortPartitionBoundariesIfNeeded()
{
if (workOrder.getStageDefinition().doesShuffle()
&& workOrder.getStageDefinition().getShuffleSpec().kind() == ShuffleKind.GLOBAL_SORT
&& !workOrder.getStageDefinition().mustGatherResultKeyStatistics()) {
// Result key stats aren't needed, so the partition boundaries are knowable ahead of time. Compute them now.
final ClusterByPartitions boundaries =
workOrder.getStageDefinition()
.generatePartitionBoundariesForShuffle(null)
.valueOrThrow();

stagePartitionBoundariesFuture.set(boundaries);
}
}

/**
* Callbacks that fire when all work for the stage is done (i.e. when {@link #stageResultFuture} resolves).
*/
Original file line number Diff line number Diff line change
@@ -429,6 +429,11 @@ private void handleNewWorkOrder(
// Set up processorCloser (called when processing is done).
kernelHolder.processorCloser.register(() -> runWorkOrder.stop(null));

// Set resultPartitionBoundaries in RunWorkOrder if it is known ahead of time.
if (kernel.hasResultPartitionBoundaries()) {
runWorkOrder.getStagePartitionBoundariesFuture().set(kernel.getResultPartitionBoundaries());
}

// Start working on this stage immediately.
kernel.startReading();
runWorkOrder.startAsync();
Original file line number Diff line number Diff line change
@@ -46,10 +46,11 @@ public TooManyInputFilesFault(
super(
CODE,
"Too many input files/segments [%d] encountered. Maximum input files/segments per worker is set to [%d]. Try"
+ " breaking your query up into smaller queries, or increasing the number of workers to at least [%d] by"
+ " setting %s in your query context",
+ " increasing the limit using the %s query context parameter, breaking your query up into smaller queries,"
+ " or increasing the number of workers to at least [%d] by setting %s in your query context.",
numInputFiles,
maxInputFiles,
MultiStageQueryContext.CTX_MAX_INPUT_FILES_PER_WORKER,
minNumWorkers,
MultiStageQueryContext.CTX_MAX_NUM_TASKS
);
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.msq.util.MultiStageQueryContext;

import java.util.Objects;

@@ -37,9 +38,10 @@ public TooManyPartitionsFault(@JsonProperty("maxPartitions") final int maxPartit
{
super(
CODE,
"Too many partitions (max = %d); try breaking your query up into smaller queries or "
+ "using a larger target size",
maxPartitions
"Too many partitions (max = %d). Try increasing the limit using the %s query context parameter, "
+ "breaking your query up into smaller queries, or using a larger target size.",
maxPartitions,
MultiStageQueryContext.CTX_MAX_PARTITIONS
);
this.maxPartitions = maxPartitions;
}
Original file line number Diff line number Diff line change
@@ -48,6 +48,8 @@
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollectorImpl;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
@@ -85,8 +87,6 @@
*/
public class StageDefinition
{
private static final int MAX_PARTITIONS = 25_000; // Limit for TooManyPartitions

// If adding any fields here, add them to builder(StageDefinition) below too.
private final StageId id;
private final List<InputSpec> inputSpecs;
@@ -292,12 +292,6 @@ public boolean getShuffleCheckHasMultipleValues()
return shuffleCheckHasMultipleValues;
}

public int getMaxPartitionCount()
{
// Pretends to be an instance method, but really returns a constant. Maybe one day this will be defined per stage.
return MAX_PARTITIONS;
}

public int getStageNumber()
{
return id.getStageNumber();
@@ -334,8 +328,19 @@ public boolean mustGatherResultKeyStatistics()
return mustGatherResultKeyStatistics(shuffleSpec);
}

/**
* Generate partition boundaries for {@link ShuffleKind#GLOBAL_SORT} shuffles.
*
* @param collector statistics collector, to be provided if {@link #mustGatherResultKeyStatistics()}
* @param maxPartitions maximum number of partitions to generate. On the controller, this is the value of
* {@link MultiStageQueryContext#getMaxPartitions(QueryContext)}. On workers, this method
* is only used when the number of partitions is determined ahead of time by the
* {@link ShuffleSpec}, so {@link Integer#MAX_VALUE} is typically provided for this parameter
* out of convenience.
*/
public Either<Long, ClusterByPartitions> generatePartitionBoundariesForShuffle(
@Nullable ClusterByStatisticsCollector collector
@Nullable ClusterByStatisticsCollector collector,
int maxPartitions
)
{
if (shuffleSpec == null) {
@@ -351,7 +356,7 @@ public Either<Long, ClusterByPartitions> generatePartitionBoundariesForShuffle(
} else if (!mustGatherResultKeyStatistics() && collector != null) {
throw new ISE("Statistics gathered, but not required for stage[%d]", getStageNumber());
} else {
return ((GlobalSortShuffleSpec) shuffleSpec).generatePartitionsForGlobalSort(collector, MAX_PARTITIONS);
return ((GlobalSortShuffleSpec) shuffleSpec).generatePartitionsForGlobalSort(collector, maxPartitions);
}
}

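
With the constant gone, callers pass the limit explicitly. Below is a hedged sketch of the two call patterns described in the Javadoc above; the wrapper class and local names are illustrative, and only the `generatePartitionBoundariesForShuffle` signature comes from this change.

```java
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.statistics.ClusterByStatisticsCollector;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;

public final class PartitionBoundariesSketch
{
  static void sketch(
      final StageDefinition stageDef,
      final ClusterByStatisticsCollector collector,
      final QueryContext queryContext
  )
  {
    // Controller side: enforce the context-derived limit when boundaries come from gathered statistics.
    final var fromStats = stageDef.generatePartitionBoundariesForShuffle(
        collector,
        MultiStageQueryContext.getMaxPartitions(queryContext)
    );

    // Worker side: boundaries are knowable without statistics, so the limit is moot and
    // Integer.MAX_VALUE is passed, as noted in the Javadoc.
    final var knownUpfront = stageDef.generatePartitionBoundariesForShuffle(null, Integer.MAX_VALUE);
  }
}
```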
Original file line number Diff line number Diff line change
@@ -24,7 +24,6 @@
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.input.InputSlice;
import org.apache.druid.msq.input.InputSpec;
import org.apache.druid.msq.input.InputSpecSlicer;
@@ -49,6 +48,7 @@ public List<InputSlice> assign(
final InputSpec inputSpec,
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
final int maxInputFilesPerSlice,
final long maxInputBytesPerSlice
)
{
@@ -58,7 +58,7 @@

/**
* Use the lowest possible number of workers, while keeping each worker's workload under
* {@link Limits#MAX_INPUT_FILES_PER_WORKER} files and {@code maxInputBytesPerWorker} bytes.
* {@code maxInputFilesPerSlice} files and {@code maxInputBytesPerSlice} bytes.
*
* Implemented using {@link InputSpecSlicer#sliceDynamic} whenever possible.
*/
@@ -69,14 +69,15 @@ public List<InputSlice> assign(
final InputSpec inputSpec,
final Int2IntMap stageWorkerCountMap,
final InputSpecSlicer slicer,
final int maxInputFilesPerSlice,
final long maxInputBytesPerSlice
)
{
if (slicer.canSliceDynamic(inputSpec)) {
return slicer.sliceDynamic(
inputSpec,
stageDef.getMaxWorkerCount(),
Limits.MAX_INPUT_FILES_PER_WORKER,
maxInputFilesPerSlice,
maxInputBytesPerSlice
);
} else {
@@ -117,6 +118,7 @@ public String toString()
* @param inputSpec inputSpec containing information on where the input is read from
* @param stageWorkerCountMap map of past stage number vs number of worker inputs
* @param slicer creates slices of input spec based on other parameters
* @param maxInputFilesPerSlice hard maximum number of files per input slice
* @param maxInputBytesPerSlice maximum suggested bytes per input slice
* @return list containing input slices
*/
@@ -125,6 +127,7 @@ public abstract List<InputSlice> assign(
InputSpec inputSpec,
Int2IntMap stageWorkerCountMap,
InputSpecSlicer slicer,
int maxInputFilesPerSlice,
long maxInputBytesPerSlice
);
}