Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ private void setupQueries()
queryAggs.add(new DoubleMinAggregatorFactory("minFloatZipf", "minFloatZipf"));
queryAggs.add(new HyperUniquesAggregatorFactory("hyperUniquesMet", "hyper"));

// Use an IdentityExtractionFn to force usage of DimExtractionTopNAlgorithm
// Use an IdentityExtractionFn to force usage of HeapBasedTopNAlgorithm
TopNQueryBuilder queryBuilderString = new TopNQueryBuilder()
.dataSource("blah")
.granularity(Granularities.ALL)
Expand All @@ -174,7 +174,7 @@ private void setupQueries()
.intervals(intervalSpec)
.aggregators(queryAggs);

// DimExtractionTopNAlgorithm is always used for numeric columns
// HeapBasedTopNAlgorithm is always used for numeric columns
TopNQueryBuilder queryBuilderLong = new TopNQueryBuilder()
.dataSource("blah")
.granularity(Granularities.ALL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,36 +120,34 @@ private TopNMapFn getMapFn(


final TopNAlgorithm<?, ?> topNAlgorithm;
if (
selector.isHasExtractionFn() &&
if (requiresHeapAlgorithm(selector, query, columnCapabilities)) {
// heap based algorithm selection
if (selector.isHasExtractionFn() && dimension.equals(ColumnHolder.TIME_COLUMN_NAME)) {
// TimeExtractionTopNAlgorithm can work on any single-value dimension of type long.
// Once we have arbitrary dimension types following check should be replaced by checking
// that the column is of type long and single-value.
dimension.equals(ColumnHolder.TIME_COLUMN_NAME)
) {
// A special TimeExtractionTopNAlgorithm is required, since DimExtractionTopNAlgorithm
// currently relies on the dimension cardinality to support lexicographic sorting
topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
} else if (selector.isHasExtractionFn()) {
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (columnCapabilities == null || !(columnCapabilities.getType() == ValueType.STRING
&& columnCapabilities.isDictionaryEncoded())) {
// Use HeapBasedTopNAlgorithm for non-Strings and for non-dictionary-encoded Strings, and for things we don't know
// which can happen for 'inline' data sources when this is run on the broker
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
// Use HeapBasedTopNAlgorithm when the dimension output type is a non-String. (It's like an extractionFn: there can be
// a many-to-one mapping, since numeric types can't represent all possible values of other types.)
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
} else if (selector.isAggregateAllMetrics()) {
// sorted by dimension
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
} else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
// high cardinality dimensions with larger result sets
topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
// We might be able to use this for any long column with an extraction function, that is
// ValueType.LONG.equals(columnCapabilities.getType())
// but this needs investigation to ensure that it is an improvement over HeapBasedTopNAlgorithm

// A special TimeExtractionTopNAlgorithm is required since DimExtractionTopNAlgorithm
// currently relies on the dimension cardinality to support lexicographic sorting
topNAlgorithm = new TimeExtractionTopNAlgorithm(adapter, query);
} else {
topNAlgorithm = new HeapBasedTopNAlgorithm(adapter, query);
}
} else {
// anything else
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
// pool based algorithm selection
if (selector.isAggregateAllMetrics()) {
// if sorted by dimension we should aggregate all metrics in a single pass, use the regular pooled algorithm for
// this
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
} else if (selector.isAggregateTopNMetricFirst() || query.getContextBoolean("doAggregateTopNMetricFirst", false)) {
// for high cardinality dimensions with larger result sets we aggregate with only the ordering aggregation to
// compute the first 'n' values, and then for the rest of the metrics but for only the 'n' values
topNAlgorithm = new AggregateTopNMetricFirstAlgorithm(adapter, query, bufferPool);
} else {
// anything else, use the regular pooled algorithm
topNAlgorithm = new PooledTopNAlgorithm(adapter, query, bufferPool);
}
}
if (queryMetrics != null) {
queryMetrics.algorithm(topNAlgorithm);
Expand All @@ -158,6 +156,40 @@ private TopNMapFn getMapFn(
return new TopNMapFn(query, topNAlgorithm);
}

/**
* {@link PooledTopNAlgorithm} (and {@link AggregateTopNMetricFirstAlgorithm} which utilizes the pooled
* algorithm) are optimized off-heap algorithms for aggregating dictionary encoded string columns. These algorithms
* rely on dictionary ids being unique so to aggregate on the dictionary ids directly and defer
* {@link org.apache.druid.segment.DimensionSelector#lookupName(int)} until as late as possible in query processing.
*
* When these conditions are not true, we have an on-heap fall-back algorithm, the {@link HeapBasedTopNAlgorithm}
* (and {@link TimeExtractionTopNAlgorithm} for a specialized form for long columns) which aggregates on values of
* selectors.
*/
private static boolean requiresHeapAlgorithm(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this method would be easier to read if it was called canUsePooledAlgorithm and the checks were all flipped. This is because IMO it makes sense to view the heap algorithm as the base case, and the pooled algorithm as a special case, meaning the logic should be "can we do the special case".

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ha, that was the original name of this method and it was flipped, but I second guessed myself at the last minute and swapped when I noticed only 1 clause could return true. The javadocs still talk about the pooled algorithm first.

Still, I agree and think canUsePooledAlgorithm is better, I'll switch it back 👍

final TopNAlgorithmSelector selector,
final TopNQuery query,
final ColumnCapabilities capabilities
)
{
if (selector.isHasExtractionFn()) {
// extraction functions can have a many to one mapping, and should use a heap algorithm
return true;
}

if (query.getDimensionSpec().getOutputType() != ValueType.STRING) {
// non-string output cannot use the pooled algorith, even if the underlying selector supports it
return true;
}
if (capabilities != null && capabilities.getType() == ValueType.STRING) {
// string columns must use the on heap algorithm unless they have the following capabilites
return !(capabilities.isDictionaryEncoded() && capabilities.areDictionaryValuesUnique().isTrue());
} else {
// non-strings are not eligible to use the pooled algorithm, and should use a heap algorithm
return true;
}
}

public static boolean canApplyExtractionInPost(TopNQuery query)
{
return query.getDimensionSpec() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Aggregator[][] getRowSelector(TopNQuery query, TopNParams params, Storage
throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
}

// This method is used for the DimExtractionTopNAlgorithm only.
// This method is used for the HeapBasedTopNAlgorithm only.
// Unlike regular topN we cannot rely on ordering to optimize.
// Optimization possibly requires a reverse lookup from value to ID, which is
// not possible when applying an extraction function
Expand Down
29 changes: 29 additions & 0 deletions server/src/test/java/org/apache/druid/server/QueryStackTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
Expand All @@ -41,6 +44,7 @@
import org.apache.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.metadata.SegmentMetadataQueryConfig;
import org.apache.druid.query.metadata.SegmentMetadataQueryQueryToolChest;
import org.apache.druid.query.metadata.SegmentMetadataQueryRunnerFactory;
Expand All @@ -61,7 +65,10 @@
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.LookupJoinableFactory;
import org.apache.druid.segment.join.MapJoinableFactoryTest;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy;
Expand Down Expand Up @@ -273,4 +280,26 @@ public int getNumMergeBuffers()
return conglomerate;
}

/**
 * Convenience overload of {@link #makeJoinableFactoryFromDefault} for tests that only need
 * lookup-backed joinables (no custom per-datasource factories).
 *
 * @param lookupProvider provider of lookup extractor factories used to join against lookup datasources
 */
public static JoinableFactory makeJoinableFactoryForLookup(
    LookupExtractorFactoryContainerProvider lookupProvider
)
{
  return makeJoinableFactoryFromDefault(lookupProvider, null);
}

/**
 * Builds a test {@link JoinableFactory} with the default inline-datasource factory, optionally a
 * lookup factory, and optionally caller-supplied per-datasource factories.
 *
 * @param lookupProvider if non-null, registers a {@link LookupJoinableFactory} for lookup datasources
 * @param custom         if non-null, additional datasource-class-to-factory mappings to register
 */
public static JoinableFactory makeJoinableFactoryFromDefault(
    @Nullable LookupExtractorFactoryContainerProvider lookupProvider,
    @Nullable Map<Class<? extends DataSource>, JoinableFactory> custom
)
{
  final ImmutableMap.Builder<Class<? extends DataSource>, JoinableFactory> factories = ImmutableMap.builder();

  // Inline datasources are always joinable in tests.
  factories.put(InlineDataSource.class, new InlineJoinableFactory());

  if (lookupProvider != null) {
    factories.put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider));
  }

  if (custom != null) {
    factories.putAll(custom);
  }

  return MapJoinableFactoryTest.fromMap(factories.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,12 @@ public void testDatabaseMetaDataTables() throws Exception
final DatabaseMetaData metaData = client.getMetaData();
Assert.assertEquals(
ImmutableList.of(
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
Expand Down Expand Up @@ -441,6 +447,12 @@ public void testDatabaseMetaDataTablesAsSuperuser() throws Exception
final DatabaseMetaData metaData = superuserClient.getMetaData();
Assert.assertEquals(
ImmutableList.of(
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.BROADCAST_DATASOURCE),
Pair.of("TABLE_SCHEM", "druid"),
Pair.of("TABLE_TYPE", "TABLE")
),
row(
Pair.of("TABLE_CAT", "druid"),
Pair.of("TABLE_NAME", CalciteTests.DATASOURCE1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,10 @@ public QueryLogHook getQueryLogHook()
@Before
public void setUp() throws Exception
{
// Create a fresh mock segment walker per test; segments are materialized under a temp folder
// that is cleaned up by the TemporaryFolder rule.
walker = CalciteTests.createMockWalker(
conglomerate,
temporaryFolder.newFolder()
);
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.Druids;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
Expand Down Expand Up @@ -713,6 +714,7 @@ public void testInformationSchemaTables() throws Exception
+ "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')",
ImmutableList.of(),
ImmutableList.<Object[]>builder()
.add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
Expand Down Expand Up @@ -742,6 +744,7 @@ public void testInformationSchemaTables() throws Exception
CalciteTests.SUPER_USER_AUTH_RESULT,
ImmutableList.of(),
ImmutableList.<Object[]>builder()
.add(new Object[]{"druid", CalciteTests.BROADCAST_DATASOURCE, "TABLE", "YES", "YES"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"})
.add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"})
Expand Down Expand Up @@ -15002,6 +15005,46 @@ public void testValidationErrorWrongTypeLiteral() throws Exception
);
}

@Test
@Parameters(source = QueryContextForJoinProvider.class)
public void testTopNOnStringWithNonSortedOrUniqueDictionary(Map<String, Object> queryContext) throws Exception
{
  // Join the numeric datasource against the broadcast (global) table on dim4. The join-projected
  // column "j0.dim4" does not have a sorted/unique dictionary, exercising the heap-based topN path.
  final DataSource joined = join(
      new TableDataSource(CalciteTests.DATASOURCE3),
      new GlobalTableDataSource(CalciteTests.BROADCAST_DATASOURCE),
      "j0.",
      equalsCondition(
          DruidExpression.fromColumn("dim4"),
          DruidExpression.fromColumn("j0.dim4")
      ),
      JoinType.INNER
  );

  // Expected native query: topN over the join, grouped on the right-hand dim4, ordered by count.
  final TopNQueryBuilder expectedQuery = new TopNQueryBuilder()
      .dataSource(joined)
      .intervals(querySegmentSpec(Filtration.eternity()))
      .dimension(new DefaultDimensionSpec("j0.dim4", "_d0", ValueType.STRING))
      .threshold(4)
      .aggregators(aggregators(new CountAggregatorFactory("a0")))
      .context(queryContext)
      .metric(new InvertedTopNMetricSpec(new NumericTopNMetricSpec("a0")));

  testQuery(
      "SELECT druid.broadcast.dim4, COUNT(*)\n"
      + "FROM druid.numfoo\n"
      + "INNER JOIN druid.broadcast ON numfoo.dim4 = broadcast.dim4\n"
      + "GROUP BY 1 ORDER BY 2 LIMIT 4",
      queryContext,
      ImmutableList.of(expectedQuery.build()),
      ImmutableList.of(
          new Object[]{"a", 9L},
          new Object[]{"b", 9L}
      )
  );
}

/**
* This is a provider of query contexts that should be used by join tests.
* It tests various configs that can be passed to join queries. All the configs provided by this provider should
Expand Down
Loading