docs/multi-stage-query/reference.md — 3 changes: 2 additions & 1 deletion
@@ -395,7 +395,8 @@ The following table lists the context parameters for the MSQ task engine:
| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | `true` |
| `arrayIngestMode` | INSERT, REPLACE<br /><br /> Controls how ARRAY type values are stored in Druid segments. When set to `array` (recommended for SQL compliance), Druid will store all ARRAY typed values in [ARRAY typed columns](../querying/arrays.md), and supports storing both VARCHAR and numeric typed arrays. When set to `mvd` (the default, for backwards compatibility), Druid only supports VARCHAR typed arrays, and will store them as [multi-value string columns](../querying/multi-value-dimensions.md). See [`arrayIngestMode`] in the [Arrays](../querying/arrays.md) page for more details. | `mvd` (for backwards compatibility, recommended to use `array` for SQL compliance)|
| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. This is a hint to the MSQ engine and the actual joins in the query may proceed in a different way than specified. See [Joins](#joins) for more details. | `broadcast` |
| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
| `maxRowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
| `rowsInMemory` | INSERT or REPLACE<br /><br />Alternate spelling of `maxRowsInMemory`. Ignored if `maxRowsInMemory` is set. | 100,000 |
| `segmentSortOrder` | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid uses the order from this context parameter instead. Provide the column list as comma-separated values or as a JSON array in string form.<br /><br />For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city,country`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list |
| `forceSegmentSortByTime` | INSERT or REPLACE<br /><br />When set to `true` (the default), Druid prepends `__time` to [CLUSTERED BY](#clustered-by) when determining the sort order for individual segments. Druid also requires that `segmentSortOrder`, if provided, starts with `__time`.<br /><br />When set to `false`, Druid uses the [CLUSTERED BY](#clustered-by) alone to determine the sort order for individual segments, and does not require that `segmentSortOrder` begin with `__time`. Setting this parameter to `false` is an experimental feature; see [Sorting](../ingestion/partitioning.md#sorting) for details. | `true` |
| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1. | 0 |
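To make the precedence between the two spellings concrete, here is a minimal sketch of how they resolve, using the classes that appear in the code diffs below. The package names are taken from the test imports, while the example class name and numeric values are hypothetical:

```java
import com.google.common.collect.ImmutableMap;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;

import java.util.Map;

public class MaxRowsInMemoryExample
{
  public static void main(String[] args)
  {
    // Preferred spelling: read back directly.
    Map<String, Object> preferred = ImmutableMap.of("maxRowsInMemory", 150000);
    System.out.println(MultiStageQueryContext.getMaxRowsInMemory(QueryContext.of(preferred))); // 150000

    // Legacy spelling alone still works as a fallback.
    Map<String, Object> legacy = ImmutableMap.of("rowsInMemory", 150000);
    System.out.println(MultiStageQueryContext.getMaxRowsInMemory(QueryContext.of(legacy))); // 150000

    // When both are set, maxRowsInMemory wins.
    Map<String, Object> both = ImmutableMap.of("maxRowsInMemory", 100000, "rowsInMemory", 150000);
    System.out.println(MultiStageQueryContext.getMaxRowsInMemory(QueryContext.of(both))); // 100000
  }
}
```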
@@ -339,7 +339,7 @@ private static MSQTuningConfig buildMSQTuningConfig(CompactionTask compactionTas

// We don't consider maxRowsInMemory coming via CompactionTuningConfig since it always sets a default value if the
// user does not specify one.
final int maxRowsInMemory = MultiStageQueryContext.getRowsInMemory(compactionTaskContext);
final int maxRowsInMemory = MultiStageQueryContext.getMaxRowsInMemory(compactionTaskContext);
final Integer maxNumSegments = MultiStageQueryContext.getMaxNumSegments(compactionTaskContext);

Integer rowsPerSegment = getRowsPerSegment(compactionTask);
@@ -545,7 +545,7 @@ private static MSQTuningConfig makeMSQTuningConfig(final PlannerContext plannerC
// This parameter is used internally for the number of worker tasks only, so we subtract 1
final int maxNumWorkers = maxNumTasks - 1;
final int rowsPerSegment = MultiStageQueryContext.getRowsPerSegment(sqlQueryContext);
final int maxRowsInMemory = MultiStageQueryContext.getRowsInMemory(sqlQueryContext);
final int maxRowsInMemory = MultiStageQueryContext.getMaxRowsInMemory(sqlQueryContext);
final Integer maxNumSegments = MultiStageQueryContext.getMaxNumSegments(sqlQueryContext);
final IndexSpec indexSpec = MultiStageQueryContext.getIndexSpec(sqlQueryContext, plannerContext.getJsonMapper());
MSQTuningConfig tuningConfig = new MSQTuningConfig(maxNumWorkers, maxRowsInMemory, rowsPerSegment, maxNumSegments, indexSpec);
@@ -157,10 +157,22 @@ public class MultiStageQueryContext
public static final String CTX_REMOVE_NULL_BYTES = "removeNullBytes";
public static final boolean DEFAULT_REMOVE_NULL_BYTES = false;

public static final String CTX_ROWS_IN_MEMORY = "rowsInMemory";
// Lower than the default to minimize the impact of per-row overheads that are not accounted for by
// OnheapIncrementalIndex. For example: overheads related to creating bitmaps during persist.
public static final int DEFAULT_ROWS_IN_MEMORY = 100000;
/**
* Used by {@link #getMaxRowsInMemory(QueryContext)}.
*/
static final String CTX_MAX_ROWS_IN_MEMORY = "maxRowsInMemory";

/**
* Used by {@link #getMaxRowsInMemory(QueryContext)}. Alternate spelling of {@link #CTX_MAX_ROWS_IN_MEMORY}.
* Ignored if {@link #CTX_MAX_ROWS_IN_MEMORY} is set.
*/
static final String CTX_ROWS_IN_MEMORY = "rowsInMemory";

/**
* Lower than the default to minimize the impact of per-row overheads that are not accounted for by
* OnheapIncrementalIndex. For example: overheads related to creating bitmaps during persist.
*/
public static final int DEFAULT_MAX_ROWS_IN_MEMORY = 100000;

public static final String CTX_IS_REINDEX = "isReindex";

@@ -413,9 +425,14 @@ public static MSQSelectDestination getSelectDestination(final QueryContext query
return destination;
}

public static int getRowsInMemory(final QueryContext queryContext)
public static int getMaxRowsInMemory(final QueryContext queryContext)
{
return queryContext.getInt(CTX_ROWS_IN_MEMORY, DEFAULT_ROWS_IN_MEMORY);
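// Prefer the primary "maxRowsInMemory" spelling; fall back to the legacy "rowsInMemory" spelling, then to the default.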
Integer ctxValue = queryContext.getInt(CTX_MAX_ROWS_IN_MEMORY);
if (ctxValue == null) {
ctxValue = queryContext.getInt(CTX_ROWS_IN_MEMORY);
}

return ctxValue != null ? ctxValue : DEFAULT_MAX_ROWS_IN_MEMORY;
}

public static Integer getMaxNumSegments(final QueryContext queryContext)
@@ -758,7 +758,7 @@ private static MSQTuningConfig getExpectedTuningConfig()
{
return new MSQTuningConfig(
1,
MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
MultiStageQueryContext.DEFAULT_MAX_ROWS_IN_MEMORY,
MAX_ROWS_PER_SEGMENT,
null,
createIndexSpec()
@@ -49,6 +49,7 @@
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_FINALIZE_AGGREGATIONS;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_FRAME_SIZE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_NUM_TASKS;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_ROWS_IN_MEMORY;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MAX_THREADS;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_MSQ_MODE;
import static org.apache.druid.msq.util.MultiStageQueryContext.CTX_REMOVE_NULL_BYTES;
@@ -159,19 +160,33 @@ public void getRowsPerSegment_set_returnsCorrectValue()
}

@Test
public void getRowsInMemory_unset_returnsDefaultValue()
public void getMaxRowsInMemory_unset_returnsDefaultValue()
{
Assert.assertEquals(
MultiStageQueryContext.DEFAULT_ROWS_IN_MEMORY,
MultiStageQueryContext.getRowsInMemory(QueryContext.empty())
MultiStageQueryContext.DEFAULT_MAX_ROWS_IN_MEMORY,
MultiStageQueryContext.getMaxRowsInMemory(QueryContext.empty())
);
}

@Test
public void getRowsInMemory_set_returnsCorrectValue()
public void getMaxRowsInMemory_set_returnsCorrectValue()
{
Map<String, Object> propertyMap = ImmutableMap.of(CTX_ROWS_IN_MEMORY, 10);
Assert.assertEquals(10, MultiStageQueryContext.getRowsInMemory(QueryContext.of(propertyMap)));
Map<String, Object> propertyMap = ImmutableMap.of(CTX_MAX_ROWS_IN_MEMORY, 10);
Assert.assertEquals(10, MultiStageQueryContext.getMaxRowsInMemory(QueryContext.of(propertyMap)));
}

@Test
public void getMaxRowsInMemory_altSet_returnsCorrectValue()
{
Map<String, Object> propertyMap = ImmutableMap.of(CTX_ROWS_IN_MEMORY, 20);
Assert.assertEquals(20, MultiStageQueryContext.getMaxRowsInMemory(QueryContext.of(propertyMap)));
}

@Test
public void getMaxRowsInMemory_bothSet_returnsCorrectValue()
{
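// When both spellings are present, maxRowsInMemory takes precedence over rowsInMemory.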
Map<String, Object> propertyMap = ImmutableMap.of(CTX_ROWS_IN_MEMORY, 20, CTX_MAX_ROWS_IN_MEMORY, 10);
Assert.assertEquals(10, MultiStageQueryContext.getMaxRowsInMemory(QueryContext.of(propertyMap)));
}

@Test