Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
Expand Down Expand Up @@ -239,7 +240,12 @@ private static <T> List<T> runQuery(QueryRunnerFactory factory, QueryRunner runn
);

final QueryPlus<T> queryToRun = QueryPlus.wrap(
query.withOverriddenContext(ImmutableMap.of("vectorize", vectorize))
query.withOverriddenContext(
ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, vectorize,
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
)
)
);
Sequence<T> queryResult = theRunner.run(queryToRun, ResponseContext.createEmpty());
return queryResult.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
Expand Down Expand Up @@ -434,7 +435,10 @@ public void tearDown() throws Exception
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void querySql(Blackhole blackhole) throws Exception
{
final Map<String, Object> context = ImmutableMap.of("vectorize", vectorize);
final Map<String, Object> context = ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, vectorize,
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
);
final AuthenticationResult authenticationResult = NoopEscalator.getInstance()
.createEscalatedAuthenticationResult();
try (final DruidPlanner planner = plannerFactory.createPlanner(context, ImmutableList.of(), authenticationResult)) {
Expand All @@ -450,7 +454,10 @@ public void querySql(Blackhole blackhole) throws Exception
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void planSql(Blackhole blackhole) throws Exception
{
final Map<String, Object> context = ImmutableMap.of("vectorize", vectorize);
final Map<String, Object> context = ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, vectorize,
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
);
final AuthenticationResult authenticationResult = NoopEscalator.getInstance()
.createEscalatedAuthenticationResult();
try (final DruidPlanner planner = plannerFactory.createPlanner(context, ImmutableList.of(), authenticationResult)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
Expand Down Expand Up @@ -285,7 +286,10 @@ public void tearDown() throws Exception
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public void querySql(Blackhole blackhole) throws Exception
{
final Map<String, Object> context = ImmutableMap.of("vectorize", vectorize);
final Map<String, Object> context = ImmutableMap.of(
QueryContexts.VECTORIZE_KEY, vectorize,
QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize
);
final AuthenticationResult authenticationResult = NoopEscalator.getInstance()
.createEscalatedAuthenticationResult();
try (final DruidPlanner planner = plannerFactory.createPlanner(context, ImmutableList.of(), authenticationResult)) {
Expand Down
15 changes: 14 additions & 1 deletion docs/misc/math-expr.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ See javadoc of java.lang.Math for detailed explanation for each function.
| all(lambda,arr) | returns 1 if all elements in the array match the lambda expression, else 0 |


### Reduction functions
## Reduction functions

Reduction functions operate on zero or more expressions and return a single expression. If no expressions are passed as
arguments, then the result is `NULL`. The expressions must all be convertible to a common data type, which will be the
Expand All @@ -214,3 +214,16 @@ For the IPv4 address functions, the `address` argument can either be an IPv4 dot
| ipv4_match(address, subnet) | Returns 1 if the `address` belongs to the `subnet` literal, else 0. If `address` is not a valid IPv4 address, then 0 is returned. This function is more efficient if `address` is a long instead of a string.|
| ipv4_parse(address) | Parses `address` into an IPv4 address stored as a long. If `address` is a long that is a valid IPv4 address, then it is passed through. Returns null if `address` cannot be represented as an IPv4 address. |
| ipv4_stringify(address) | Converts `address` into an IPv4 address dotted-decimal string. If `address` is a string that is a valid IPv4 address, then it is passed through. Returns null if `address` cannot be represented as an IPv4 address.|


## Vectorization Support
A number of expressions support ['vectorized' query engines](../querying/query-context.md#vectorization-parameters).

Supported features:
* constants and identifiers are supported for any column type
* `cast` is supported for numeric and string types
* math operators: `+`,`-`,`*`,`/`,`%`,`^` are supported for numeric types
* comparison operators: `=`, `!=`, `>`, `>=`, `<`, `<=` are supported for numeric types
* math functions: `abs`, `acos`, `asin`, `atan`, `cbrt`, `ceil`, `cos`, `cosh`, `cot`, `exp`, `expm1`, `floor`, `getExponent`, `log`, `log10`, `log1p`, `nextUp`, `rint`, `signum`, `sin`, `sinh`, `sqrt`, `tan`, `tanh`, `toDegrees`, `toRadians`, `ulp`, `atan2`, `copySign`, `div`, `hypot`, `max`, `min`, `nextAfter`, `pow`, `remainder`, `scalb` are supported for numeric types
* time functions: `timestamp_floor` (with constant granularity argument) is supported for numeric types
* other: `parse_long` is supported for numeric and string types
3 changes: 2 additions & 1 deletion docs/querying/query-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ include "selector", "bound", "in", "like", "regex", "search", "and", "or", and "
- All aggregators must offer vectorized implementations. These include "count", "doubleSum", "floatSum", "longSum", "longMin",
"longMax", "doubleMin", "doubleMax", "floatMin", "floatMax", "longAny", "doubleAny", "floatAny", "stringAny",
"hyperUnique", "filtered", "approxHistogram", "approxHistogramFold", and "fixedBucketsHistogram" (with numerical input).
- No virtual columns.
- All virtual columns must offer vectorized implementations. Currently for expression virtual columns, support for vectorization is decided on a per expression basis, depending on the type of input and the functions used by the expression. See the currently supported list in the [expression documentation](../misc/math-expr.md#vectorization-support).
- For GroupBy: All dimension specs must be "default" (no extraction functions or filtered dimension specs).
- For GroupBy: No multi-value dimensions.
- For Timeseries: No "descending" order.
Expand All @@ -107,3 +107,4 @@ vectorization. These query types will ignore the "vectorize" parameter even if i
|--------|-------|------------|
|vectorize|`true`|Enables or disables vectorized query execution. Possible values are `false` (disabled), `true` (enabled if possible, disabled otherwise, on a per-segment basis), and `force` (enabled, and groupBy or timeseries queries that cannot be vectorized will fail). The `"force"` setting is meant to aid in testing, and is not generally useful in production (since real-time segments can never be processed with vectorized execution, any queries on real-time data will fail). This will override `druid.query.default.context.vectorize` if it's set.|
|vectorSize|`512`|Sets the row batching size for a particular query. This will override `druid.query.default.context.vectorSize` if it's set.|
|vectorizeVirtualColumns|`false`|Enables or disables vectorized query processing of queries with virtual columns, layered on top of `vectorize` (`vectorize` must also be set to true for a query to utilize vectorization). Possible values are `false` (disabled), `true` (enabled if possible, disabled otherwise, on a per-segment basis), and `force` (enabled, and groupBy or timeseries queries with virtual columns that cannot be vectorized will fail). The `"force"` setting is meant to aid in testing, and is not generally useful in production. This will override `druid.query.default.context.vectorizeVirtualColumns` if it's set.|
12 changes: 12 additions & 0 deletions processing/src/main/java/org/apache/druid/query/QueryContexts.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class QueryContexts
public static final String BROKER_PARALLEL_MERGE_SMALL_BATCH_ROWS_KEY = "parallelMergeSmallBatchRows";
public static final String BROKER_PARALLELISM = "parallelMergeParallelism";
public static final String VECTORIZE_KEY = "vectorize";
public static final String VECTORIZE_VIRTUAL_COLUMNS_KEY = "vectorizeVirtualColumns";
public static final String VECTOR_SIZE_KEY = "vectorSize";
public static final String MAX_SUBQUERY_ROWS_KEY = "maxSubqueryRows";
public static final String JOIN_FILTER_PUSH_DOWN_KEY = "enableJoinFilterPushDown";
Expand All @@ -65,6 +66,7 @@ public class QueryContexts
public static final boolean DEFAULT_POPULATE_RESULTLEVEL_CACHE = true;
public static final boolean DEFAULT_USE_RESULTLEVEL_CACHE = true;
public static final Vectorize DEFAULT_VECTORIZE = Vectorize.TRUE;
public static final Vectorize DEFAULT_VECTORIZE_VIRTUAL_COLUMN = Vectorize.FALSE;
public static final int DEFAULT_PRIORITY = 0;
public static final int DEFAULT_UNCOVERED_INTERVALS_LIMIT = 0;
public static final long DEFAULT_TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(5);
Expand Down Expand Up @@ -197,6 +199,16 @@ public static <T> Vectorize getVectorize(Query<T> query, Vectorize defaultValue)
return parseEnum(query, VECTORIZE_KEY, Vectorize.class, defaultValue);
}

/**
 * Resolves the "vectorizeVirtualColumns" query context value, falling back to the
 * system default ({@code DEFAULT_VECTORIZE_VIRTUAL_COLUMN}, i.e. disabled) when the
 * key is absent from the query context.
 *
 * @param query the query whose context is consulted
 * @return the effective {@link Vectorize} mode for virtual columns
 */
public static <T> Vectorize getVectorizeVirtualColumns(Query<T> query)
{
  // Unqualified reference: this method lives in QueryContexts itself.
  return getVectorizeVirtualColumns(query, DEFAULT_VECTORIZE_VIRTUAL_COLUMN);
}

/**
 * Resolves the "vectorizeVirtualColumns" query context value, using the
 * caller-supplied default when the key is not present in the query context.
 *
 * @param query        the query whose context is consulted
 * @param defaultValue value returned when the context key is absent
 * @return the effective {@link Vectorize} mode for virtual columns
 */
public static <T> Vectorize getVectorizeVirtualColumns(Query<T> query, Vectorize defaultValue)
{
  final Vectorize mode = parseEnum(query, VECTORIZE_VIRTUAL_COLUMNS_KEY, Vectorize.class, defaultValue);
  return mode;
}

public static <T> int getVectorSize(Query<T> query)
{
return getVectorSize(query, QueryableIndexStorageAdapter.DEFAULT_VECTOR_SIZE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.query.groupby;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;

/**
Expand All @@ -44,7 +45,6 @@ public class GroupByQueryConfig
private static final String CTX_KEY_FORCE_HASH_AGGREGATION = "forceHashAggregation";
private static final String CTX_KEY_INTERMEDIATE_COMBINE_DEGREE = "intermediateCombineDegree";
private static final String CTX_KEY_NUM_PARALLEL_COMBINE_THREADS = "numParallelCombineThreads";
public static final String CTX_KEY_VECTORIZE = "vectorize";

@JsonProperty
private String defaultStrategy = GroupByStrategySelector.STRATEGY_V2;
Expand Down Expand Up @@ -243,7 +243,7 @@ public GroupByQueryConfig withOverrides(final GroupByQuery query)
CTX_KEY_NUM_PARALLEL_COMBINE_THREADS,
getNumParallelCombineThreads()
);
newConfig.vectorize = query.getContextBoolean(CTX_KEY_VECTORIZE, isVectorize());
newConfig.vectorize = query.getContextBoolean(QueryContexts.VECTORIZE_KEY, isVectorize());
return newConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.query.vector.VectorCursorGranularizer;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.filter.Filters;
Expand Down Expand Up @@ -79,6 +80,7 @@ public static boolean canVectorize(
return canVectorizeDimensions(capabilitiesFunction, query.getDimensions())
&& query.getDimensions().stream().allMatch(DimensionSpec::canVectorize)
&& query.getAggregatorSpecs().stream().allMatch(aggregatorFactory -> aggregatorFactory.canVectorize(adapter))
&& VirtualColumns.shouldVectorize(query, query.getVirtualColumns(), adapter)
&& adapter.canVectorize(filter, query.getVirtualColumns(), false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.druid.query.vector.VectorCursorGranularizer;
import org.apache.druid.segment.SegmentMissingException;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.filter.Filters;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
import org.apache.druid.segment.vector.VectorCursor;
Expand Down Expand Up @@ -94,8 +95,9 @@ public Sequence<Result<TimeseriesResultValue>> process(final TimeseriesQuery que
final boolean descending = query.isDescending();

final boolean doVectorize = QueryContexts.getVectorize(query).shouldVectorize(
adapter.canVectorize(filter, query.getVirtualColumns(), descending)
&& query.getAggregatorSpecs().stream().allMatch(aggregatorFactory -> aggregatorFactory.canVectorize(adapter))
query.getAggregatorSpecs().stream().allMatch(aggregatorFactory -> aggregatorFactory.canVectorize(adapter))
&& VirtualColumns.shouldVectorize(query, query.getVirtualColumns(), adapter)
&& adapter.canVectorize(filter, query.getVirtualColumns(), descending)
);

final Sequence<Result<TimeseriesResultValue>> result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ public boolean canVectorize(
}

// vector cursors can't iterate backwards yet
return virtualColumns.canVectorize(this) && !descending;
return !descending;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.druid.java.util.common.Cacheable;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.column.BitmapIndex;
Expand Down Expand Up @@ -113,6 +115,15 @@ public static VirtualColumns nullToEmpty(@Nullable VirtualColumns virtualColumns
return virtualColumns == null ? EMPTY : virtualColumns;
}

/**
 * Decides whether a query with the given virtual columns should run vectorized,
 * combining the per-query "vectorizeVirtualColumns" context setting with the
 * columns' own reported vectorization support.
 *
 * @param query          query whose context supplies the vectorizeVirtualColumns mode
 * @param virtualColumns virtual columns participating in the query
 * @param inspector      inspector used to probe column capabilities
 * @return true if processing may proceed with the vectorized engine
 */
public static boolean shouldVectorize(Query<?> query, VirtualColumns virtualColumns, ColumnInspector inspector)
{
  // With no virtual columns in play, this check imposes no restriction.
  if (virtualColumns.getVirtualColumns().length == 0) {
    return true;
  }
  final boolean columnsCanVectorize = virtualColumns.canVectorize(inspector);
  return QueryContexts.getVectorizeVirtualColumns(query).shouldVectorize(columnsCanVectorize);
}

private VirtualColumns(
List<VirtualColumn> virtualColumns,
Map<String, VirtualColumn> withDotSupport,
Expand Down Expand Up @@ -473,5 +484,4 @@ public String toString()
{
return virtualColumns.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void testVectorAggretatorUsingGroupByQueryOnDoubleColumn(boolean doVector
.setAggregatorSpecs(
new DoubleMeanAggregatorFactory("meanOnDouble", SimpleTestIndex.DOUBLE_COL)
)
.setContext(Collections.singletonMap(GroupByQueryConfig.CTX_KEY_VECTORIZE, doVectorize))
.setContext(Collections.singletonMap(QueryContexts.VECTORIZE_KEY, doVectorize))
.build();

// do json serialization and deserialization of query to ensure there are no serde issues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10727,7 +10727,8 @@ private GroupByQuery.Builder makeQueryBuilder(final GroupByQuery query)
private Map<String, Object> makeContext()
{
return ImmutableMap.<String, Object>builder()
.put("vectorize", vectorize ? "force" : "false")
.put(QueryContexts.VECTORIZE_KEY, vectorize ? "force" : "false")
.put(QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize ? "force" : "false")
.put("vectorSize", 16) // Small vector size to ensure we use more than one.
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Druids;
import org.apache.druid.query.FinalizeResultsQueryRunner;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
Expand Down Expand Up @@ -2942,8 +2943,9 @@ private Map<String, Object> makeContext()
private Map<String, Object> makeContext(final Map<String, Object> myContext)
{
final Map<String, Object> context = new HashMap<>();
context.put("vectorize", vectorize ? "force" : "false");
context.put("vectorSize", 16); // Small vector size to ensure we use more than one.
context.put(QueryContexts.VECTORIZE_KEY, vectorize ? "force" : "false");
context.put(QueryContexts.VECTORIZE_VIRTUAL_COLUMNS_KEY, vectorize ? "force" : "false");
context.put(QueryContexts.VECTOR_SIZE_KEY, 16); // Small vector size to ensure we use more than one.
context.putAll(myContext);
return context;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ public AlwaysTwoCounterAggregatorFactory(String name, String field)
@Override
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
throw new IllegalStateException("don't call this");
throw new IllegalStateException(AlwaysTwoVectorizedVirtualColumn.DONT_CALL_THIS);
}

@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
throw new IllegalStateException("don't call this");
throw new IllegalStateException(AlwaysTwoVectorizedVirtualColumn.DONT_CALL_THIS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,26 +48,38 @@
*/
public class AlwaysTwoVectorizedVirtualColumn implements VirtualColumn
{
static final String DONT_CALL_THIS = "don't call this";
private final String outputName;
private final ColumnCapabilities capabilities;
private final boolean dictionaryEncoded;
private final boolean canVectorize;

/**
 * Creates a test virtual column that reports itself as vectorizable.
 *
 * @param name         output name of the virtual column
 * @param capabilities column capabilities reported for the output column
 */
public AlwaysTwoVectorizedVirtualColumn(
    String name,
    ColumnCapabilities capabilities
)
{
  this(name, capabilities, true);
}

/**
 * Creates a test virtual column whose vectorization support is controlled
 * explicitly, so tests can exercise both vectorized and fallback code paths.
 *
 * @param name         output name of the virtual column
 * @param capabilities column capabilities reported for the output column
 * @param canVectorize whether {@code canVectorize} should report support
 */
public AlwaysTwoVectorizedVirtualColumn(
    String name,
    ColumnCapabilities capabilities,
    boolean canVectorize
)
{
  this.outputName = name;
  this.capabilities = capabilities;
  // Dictionary-based selector paths require both a dictionary encoding and
  // unique dictionary values, so only then do we take the encoded route.
  this.dictionaryEncoded = capabilities.isDictionaryEncoded().isTrue() &&
                           capabilities.areDictionaryValuesUnique().isTrue();
  this.canVectorize = canVectorize;
}

@Override
// Test hook: reports the configured vectorization support, and additionally
// verifies the engine always supplies a non-null inspector when asking.
public boolean canVectorize(ColumnInspector inspector)
{
Assert.assertNotNull(inspector);
return canVectorize;
}

@Override
Expand All @@ -79,13 +91,13 @@ public String getOutputName()
@Override
public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec, ColumnSelectorFactory factory)
{
throw new IllegalStateException("don't call this");
throw new IllegalStateException(DONT_CALL_THIS);
}

@Override
public ColumnValueSelector<?> makeColumnValueSelector(String columnName, ColumnSelectorFactory factory)
{
throw new IllegalStateException("don't call this");
throw new IllegalStateException(DONT_CALL_THIS);
}

@Override
Expand Down
Loading