Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Injector;
import com.google.inject.Module;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.server.ClientQuerySegmentWalker;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.initialization.ServerConfig;
Expand Down Expand Up @@ -377,6 +379,7 @@ public void emit(Event event)
baseClient,
null /* local client; unused in this test, so pass in null */,
warehouse,
new MapJoinableFactory(ImmutableMap.of()),
retryConfig,
jsonMapper,
serverConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ public interface DataSource
/**
 * Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or
 * for queries of those.
 * <p>
 * Currently this is coupled with joinability - if this returns true then the query engine expects there exists a
 * {@link org.apache.druid.segment.join.JoinableFactory} which might build a
 * {@link org.apache.druid.segment.join.Joinable} for this datasource directly. If a subquery 'inline' join is
 * required to join this datasource on the right hand side, then this value must be false for now.
 * <p>
 * In the future, instead of directly using this method, the query planner and engine should consider
 * {@link org.apache.druid.segment.join.JoinableFactory#isDirectlyJoinable(DataSource)} when determining if the
 * right hand side is directly joinable, which would allow decoupling this property from joins.
 */
boolean isGlobal();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public List<PreJoinableClause> getPreJoinableClauses()

/**
* Returns true if all servers have the ability to compute this datasource. These datasources depend only on
* globally broadcast data, like lookups or inline data.
* globally broadcast data, like lookups or inline data or broadcast segments.
*/
public boolean isGlobal()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@
*/
public interface JoinableFactory
{
/**
 * Returns true if a {@link Joinable} <em>may</em> be created for a given {@link DataSource}. This is not a
 * guarantee that {@link #build} will return a non-empty result, because successfully building a {@link Joinable}
 * might additionally require that specific criteria of the {@link JoinConditionAnalysis} be met.
 */
boolean isDirectlyJoinable(DataSource dataSource);

/**
* Create a Joinable object. This may be an expensive operation involving loading data, creating a hash table, etc.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ public MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> join
this.joinableFactories = new IdentityHashMap<>(joinableFactories);
}

@Override
public boolean isDirectlyJoinable(DataSource dataSource)
{
  // Route by the concrete datasource class; an unregistered class is never directly joinable.
  final JoinableFactory delegate = joinableFactories.get(dataSource.getClass());
  return delegate != null && delegate.isDirectlyJoinable(dataSource);
}

@Override
public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.extraction.MapLookupExtractor;
Expand Down Expand Up @@ -155,13 +156,24 @@ public void test_createSegmentMapFn_usableClause()

final Function<SegmentReference, SegmentReference> segmentMapFn = Joinables.createSegmentMapFn(
ImmutableList.of(clause),
(dataSource, condition) -> {
if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {
return Optional.of(
LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false))
);
} else {
return Optional.empty();
new JoinableFactory()
{
@Override
public boolean isDirectlyJoinable(DataSource dataSource)
{
return dataSource.equals(lookupDataSource);
}

@Override
public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
{
if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) {
return Optional.of(
LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false))
);
} else {
return Optional.empty();
}
}
},
new AtomicLong(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public void setUp()
target = new MapJoinableFactory(
ImmutableMap.of(NoopDataSource.class, noopJoinableFactory));
}


@Test
public void testBuildDataSourceNotRegisteredShouldReturnAbsent()
{
Expand All @@ -89,4 +91,18 @@ public void testBuildDataSourceIsRegisteredShouldReturnJoinableFromFactory()
Optional<Joinable> joinable = target.build(noopDataSource, condition);
Assert.assertEquals(mockJoinable, joinable.get());
}

@Test
public void testIsDirectShouldBeFalseForNotRegistered()
{
  // Only NoopDataSource.class is registered in setUp(), so an inline datasource misses the lookup.
  Assert.assertFalse(target.isDirectlyJoinable(inlineDataSource));
}

@Test
public void testIsDirectlyJoinableShouldBeTrueForRegisteredThatIsJoinable()
{
  // When a factory is registered for the datasource's class, the answer is delegated to that factory.
  EasyMock.expect(noopJoinableFactory.isDirectlyJoinable(noopDataSource)).andReturn(true).anyTimes();
  EasyMock.replay(noopJoinableFactory);
  Assert.assertTrue(target.isDirectlyJoinable(noopDataSource));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ private NoopJoinableFactory()
// Singleton.
}

@Override
public boolean isDirectlyJoinable(DataSource dataSource)
{
  // This factory never produces joinables, so nothing is directly joinable through it.
  return false;
}

@Override
public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@
*/
public class InlineJoinableFactory implements JoinableFactory
{
@Override
public boolean isDirectlyJoinable(DataSource dataSource)
{
  // This should always be true when this factory is accessed through MapJoinableFactory (which routes by
  // datasource class), but check just in case. Furthermore, this method should not ever be legitimately
  // called, because it is used to avoid the subquery joins that the InlineJoinableFactory itself serves.
  return dataSource instanceof InlineDataSource;
}

@Override
public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ public LookupJoinableFactory(LookupExtractorFactoryContainerProvider lookupProvi
this.lookupProvider = lookupProvider;
}

@Override
public boolean isDirectlyJoinable(DataSource dataSource)
{
  // This should always be true when this factory is accessed through MapJoinableFactory (which routes by
  // datasource class), but check just in case.
  return dataSource instanceof LookupDataSource;
}

@Override
public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FluentQueryRunnerBuilder;
import org.apache.druid.query.GlobalTableDataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.PostProcessingOperator;
import org.apache.druid.query.Query;
Expand All @@ -47,9 +48,11 @@
import org.apache.druid.query.RetryQueryRunner;
import org.apache.druid.query.RetryQueryRunnerConfig;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.planning.DataSourceAnalysis;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.initialization.ServerConfig;
import org.joda.time.Interval;

Expand Down Expand Up @@ -77,6 +80,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private final QuerySegmentWalker clusterClient;
private final QuerySegmentWalker localClient;
private final QueryToolChestWarehouse warehouse;
private final JoinableFactory joinableFactory;
private final RetryQueryRunnerConfig retryConfig;
private final ObjectMapper objectMapper;
private final ServerConfig serverConfig;
Expand All @@ -88,6 +92,7 @@ public ClientQuerySegmentWalker(
QuerySegmentWalker clusterClient,
QuerySegmentWalker localClient,
QueryToolChestWarehouse warehouse,
JoinableFactory joinableFactory,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper,
ServerConfig serverConfig,
Expand All @@ -99,6 +104,7 @@ public ClientQuerySegmentWalker(
this.clusterClient = clusterClient;
this.localClient = localClient;
this.warehouse = warehouse;
this.joinableFactory = joinableFactory;
this.retryConfig = retryConfig;
this.objectMapper = objectMapper;
this.serverConfig = serverConfig;
Expand All @@ -112,6 +118,7 @@ public ClientQuerySegmentWalker(
CachingClusteredClient clusterClient,
LocalQuerySegmentWalker localClient,
QueryToolChestWarehouse warehouse,
JoinableFactory joinableFactory,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper,
ServerConfig serverConfig,
Expand All @@ -124,6 +131,7 @@ public ClientQuerySegmentWalker(
(QuerySegmentWalker) clusterClient,
(QuerySegmentWalker) localClient,
warehouse,
joinableFactory,
retryConfig,
objectMapper,
serverConfig,
Expand All @@ -137,10 +145,13 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<In
{
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);

// First, do an inlining dry run to see if any inlining is necessary, without actually running the queries.
// transform TableDataSource to GlobalTableDataSource when eligible
// before further transformation to potentially inline
final DataSource freeTradeDataSource = globalizeIfPossible(query.getDataSource());
// do an inlining dry run to see if any inlining is necessary, without actually running the queries.
final int maxSubqueryRows = QueryContexts.getMaxSubqueryRows(query, serverConfig.getMaxSubqueryRows());
final DataSource inlineDryRun = inlineIfNecessary(
query.getDataSource(),
freeTradeDataSource,
toolChest,
new AtomicInteger(),
maxSubqueryRows,
Expand All @@ -156,7 +167,7 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<In
// Now that we know the structure is workable, actually do the inlining (if necessary).
final Query<T> newQuery = query.withDataSource(
inlineIfNecessary(
query.getDataSource(),
freeTradeDataSource,
toolChest,
new AtomicInteger(),
maxSubqueryRows,
Expand Down Expand Up @@ -187,10 +198,15 @@ public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<In
@Override
public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<SegmentDescriptor> specs)
{
// Inlining isn't done for segments-based queries.
// Inlining isn't done for segments-based queries, but we still globalify the table datasources if possible
final Query<T> freeTradeQuery = query.withDataSource(globalizeIfPossible(query.getDataSource()));

if (canRunQueryUsingClusterWalker(query)) {
return decorateClusterRunner(query, clusterClient.getQueryRunnerForSegments(query, specs));
return new QuerySwappingQueryRunner<>(
decorateClusterRunner(freeTradeQuery, clusterClient.getQueryRunnerForSegments(freeTradeQuery, specs)),
query,
freeTradeQuery
);
} else {
// We don't expect end-users to see this message, since it only happens when specific segments are requested;
// this is not typical end-user behavior.
Expand Down Expand Up @@ -235,6 +251,27 @@ private <T> boolean canRunQueryUsingClusterWalker(Query<T> query)
|| toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery()));
}


/**
 * Rewrites the datasource tree, replacing each {@link TableDataSource} with a {@link GlobalTableDataSource}
 * of the same name whenever the {@link JoinableFactory} reports the global form as directly joinable.
 * Non-table datasources are returned with their children rewritten recursively.
 */
private DataSource globalizeIfPossible(final DataSource dataSource)
{
  if (dataSource instanceof TableDataSource) {
    // Upgrade the table to its broadcast ("global") form only if some registered factory can join it directly.
    final GlobalTableDataSource globalized = new GlobalTableDataSource(((TableDataSource) dataSource).getName());
    return joinableFactory.isDirectlyJoinable(globalized) ? globalized : dataSource;
  } else {
    // Recurse so that tables nested anywhere in the tree are also considered for globalization.
    final List<DataSource> children = dataSource.getChildren();
    final List<DataSource> rewrittenChildren = new ArrayList<>(children.size());
    for (DataSource child : children) {
      rewrittenChildren.add(globalizeIfPossible(child));
    }
    return dataSource.withChildren(rewrittenChildren);
  }
}

/**
* Replace QueryDataSources with InlineDataSources when necessary and possible. "Necessary" is defined as:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@ public void testBuild()
Assert.assertEquals(3, joinable.getCardinality("long"));
}

@Test
public void testIsDirectlyJoinable()
{
  // Inline datasources are joinable by this factory; a plain table is not.
  Assert.assertTrue(factory.isDirectlyJoinable(inlineDataSource));
  Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo")));
}

private static JoinConditionAnalysis makeCondition(final String condition)
{
return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ public void testBuild()
Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, joinable.getCardinality("v"));
}

@Test
public void testIsDirectlyJoinable()
{
  // Lookup datasources are joinable by this factory; a plain table is not.
  Assert.assertTrue(factory.isDirectlyJoinable(lookupDataSource));
  Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo")));
}

private static JoinConditionAnalysis makeCondition(final String condition)
{
return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil());
Expand Down
Loading