diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 971f8ed8b264..23dc65f5235c 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.inject.Injector; import com.google.inject.Module; @@ -64,6 +65,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.ClientQuerySegmentWalker; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.initialization.ServerConfig; @@ -377,6 +379,7 @@ public void emit(Event event) baseClient, null /* local client; unused in this test, so pass in null */, warehouse, + new MapJoinableFactory(ImmutableMap.of()), retryConfig, jsonMapper, serverConfig, diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index c3edd1c79647..d12a8eeabab8 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -71,6 +71,15 @@ public interface DataSource /** * Returns true if all servers have a full copy of this datasource. 
True for things like inline, lookup, etc, or * for queries of those. + * + * Currently this is coupled with joinability - if this returns true then the query engine expects there exists a + * {@link org.apache.druid.segment.join.JoinableFactory} which might build a + * {@link org.apache.druid.segment.join.Joinable} for this datasource directly. If a subquery 'inline' join is + * required to join this datasource on the right hand side, then this value must be false for now. + * + * In the future, instead of directly using this method, the query planner and engine should consider + * {@link org.apache.druid.segment.join.JoinableFactory#isDirectlyJoinable(DataSource)} when determining if the + * right hand side is directly joinable, which would allow decoupling this property from joins. */ boolean isGlobal(); diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java index 4237e50dc473..5b34f7623fed 100644 --- a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java +++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java @@ -205,7 +205,7 @@ public List getPreJoinableClauses() /** * Returns true if all servers have the ability to compute this datasource. These datasources depend only on - * globally broadcast data, like lookups or inline data. + * globally broadcast data, like lookups or inline data or broadcast segments. 
*/ public boolean isGlobal() { diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java index fc63f1cfbee3..723aba57faa9 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java @@ -30,6 +30,13 @@ */ public interface JoinableFactory { + /** + * Returns true if a {@link Joinable} **may** be created for a given {@link DataSource}, but is not a guarantee that + * {@link #build} will return a non-empty result. Successfully building a {@link Joinable} might require specific + * criteria of the {@link JoinConditionAnalysis}. + */ + boolean isDirectlyJoinable(DataSource dataSource); + /** * Create a Joinable object. This may be an expensive operation involving loading data, creating a hash table, etc. * diff --git a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java index beb8106225d4..abf4b6ae4d06 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java @@ -43,6 +43,17 @@ public MapJoinableFactory(Map, JoinableFactory> join this.joinableFactories = new IdentityHashMap<>(joinableFactories); } + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + JoinableFactory factory = joinableFactories.get(dataSource.getClass()); + if (factory == null) { + return false; + } else { + return factory.isDirectlyJoinable(dataSource); + } + } + @Override public Optional build(DataSource dataSource, JoinConditionAnalysis condition) { diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java index ae36d175ff16..f7c4f4b415b8 
100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.IAE; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.DataSource; import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.extraction.MapLookupExtractor; @@ -155,13 +156,24 @@ public void test_createSegmentMapFn_usableClause() final Function segmentMapFn = Joinables.createSegmentMapFn( ImmutableList.of(clause), - (dataSource, condition) -> { - if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) { - return Optional.of( - LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false)) - ); - } else { - return Optional.empty(); + new JoinableFactory() + { + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + return dataSource.equals(lookupDataSource); + } + + @Override + public Optional build(DataSource dataSource, JoinConditionAnalysis condition) + { + if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) { + return Optional.of( + LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false)) + ); + } else { + return Optional.empty(); + } } }, new AtomicLong(), diff --git a/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java index cf0336061409..1d00e711c190 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java @@ -65,6 +65,8 @@ public void setUp() target = new MapJoinableFactory( ImmutableMap.of(NoopDataSource.class, noopJoinableFactory)); } + + @Test 
public void testBuildDataSourceNotRegisteredShouldReturnAbsent() { @@ -89,4 +91,18 @@ public void testBuildDataSourceIsRegisteredShouldReturnJoinableFromFactory() Optional joinable = target.build(noopDataSource, condition); Assert.assertEquals(mockJoinable, joinable.get()); } + + @Test + public void testIsDirectShouldBeFalseForNotRegistered() + { + Assert.assertFalse(target.isDirectlyJoinable(inlineDataSource)); + } + + @Test + public void testIsDirectlyJoinableShouldBeTrueForRegisteredThatIsJoinable() + { + EasyMock.expect(noopJoinableFactory.isDirectlyJoinable(noopDataSource)).andReturn(true).anyTimes(); + EasyMock.replay(noopJoinableFactory); + Assert.assertTrue(target.isDirectlyJoinable(noopDataSource)); + } } diff --git a/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java b/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java index ff138041f131..1583b02b5d5d 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java +++ b/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java @@ -32,6 +32,12 @@ private NoopJoinableFactory() // Singleton. 
} + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + return false; + } + @Override public Optional build(DataSource dataSource, JoinConditionAnalysis condition) { diff --git a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java index 5945d42957cf..4eee53fde5dc 100644 --- a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java +++ b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java @@ -36,6 +36,15 @@ */ public class InlineJoinableFactory implements JoinableFactory { + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + // this should always be true if this is accessed through MapJoinableFactory, but check just in case... + // further, this should not ever be legitimately called, because this method is used to avoid subquery joins + // which use the InlineJoinableFactory + return dataSource instanceof InlineDataSource; + } + @Override public Optional build(final DataSource dataSource, final JoinConditionAnalysis condition) { diff --git a/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java index a6fd209b1d12..2dab0a616639 100644 --- a/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java +++ b/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java @@ -42,6 +42,13 @@ public LookupJoinableFactory(LookupExtractorFactoryContainerProvider lookupProvi this.lookupProvider = lookupProvider; } + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + // this should always be true if this is accessed through MapJoinableFactory, but check just in case... 
+ return dataSource instanceof LookupDataSource; + } + @Override public Optional build(final DataSource dataSource, final JoinConditionAnalysis condition) { diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index fa35ff7b8a9c..c7318a60f7f6 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.DataSource; import org.apache.druid.query.FluentQueryRunnerBuilder; +import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.PostProcessingOperator; import org.apache.druid.query.Query; @@ -47,9 +48,11 @@ import org.apache.druid.query.RetryQueryRunner; import org.apache.druid.query.RetryQueryRunnerConfig; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.TableDataSource; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.initialization.ServerConfig; import org.joda.time.Interval; @@ -77,6 +80,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private final QuerySegmentWalker clusterClient; private final QuerySegmentWalker localClient; private final QueryToolChestWarehouse warehouse; + private final JoinableFactory joinableFactory; private final RetryQueryRunnerConfig retryConfig; private final ObjectMapper objectMapper; private final ServerConfig serverConfig; @@ -88,6 +92,7 @@ public ClientQuerySegmentWalker( QuerySegmentWalker clusterClient, QuerySegmentWalker localClient, QueryToolChestWarehouse warehouse, + 
JoinableFactory joinableFactory, RetryQueryRunnerConfig retryConfig, ObjectMapper objectMapper, ServerConfig serverConfig, @@ -99,6 +104,7 @@ public ClientQuerySegmentWalker( this.clusterClient = clusterClient; this.localClient = localClient; this.warehouse = warehouse; + this.joinableFactory = joinableFactory; this.retryConfig = retryConfig; this.objectMapper = objectMapper; this.serverConfig = serverConfig; @@ -112,6 +118,7 @@ public ClientQuerySegmentWalker( CachingClusteredClient clusterClient, LocalQuerySegmentWalker localClient, QueryToolChestWarehouse warehouse, + JoinableFactory joinableFactory, RetryQueryRunnerConfig retryConfig, ObjectMapper objectMapper, ServerConfig serverConfig, @@ -124,6 +131,7 @@ public ClientQuerySegmentWalker( (QuerySegmentWalker) clusterClient, (QuerySegmentWalker) localClient, warehouse, + joinableFactory, retryConfig, objectMapper, serverConfig, @@ -137,10 +145,13 @@ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable> toolChest = warehouse.getToolChest(query); - // First, do an inlining dry run to see if any inlining is necessary, without actually running the queries. + // transform TableDataSource to GlobalTableDataSource when eligible + // before further transformation to potentially inline + final DataSource freeTradeDataSource = globalizeIfPossible(query.getDataSource()); + // do an inlining dry run to see if any inlining is necessary, without actually running the queries. 
final int maxSubqueryRows = QueryContexts.getMaxSubqueryRows(query, serverConfig.getMaxSubqueryRows()); final DataSource inlineDryRun = inlineIfNecessary( - query.getDataSource(), + freeTradeDataSource, toolChest, new AtomicInteger(), maxSubqueryRows, @@ -156,7 +167,7 @@ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable newQuery = query.withDataSource( inlineIfNecessary( - query.getDataSource(), + freeTradeDataSource, toolChest, new AtomicInteger(), maxSubqueryRows, @@ -187,10 +198,15 @@ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) { - // Inlining isn't done for segments-based queries. + // Inlining isn't done for segments-based queries, but we still globalize the table datasources if possible + final Query freeTradeQuery = query.withDataSource(globalizeIfPossible(query.getDataSource())); if (canRunQueryUsingClusterWalker(query)) { - return decorateClusterRunner(query, clusterClient.getQueryRunnerForSegments(query, specs)); + return new QuerySwappingQueryRunner<>( + decorateClusterRunner(freeTradeQuery, clusterClient.getQueryRunnerForSegments(freeTradeQuery, specs)), + query, + freeTradeQuery + ); } else { // We don't expect end-users to see this message, since it only happens when specific segments are requested; // this is not typical end-user behavior. 
@@ -235,6 +251,27 @@ private boolean canRunQueryUsingClusterWalker(Query query) || toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())); } + + private DataSource globalizeIfPossible( + final DataSource dataSource + ) + { + if (dataSource instanceof TableDataSource) { + GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(((TableDataSource) dataSource).getName()); + if (joinableFactory.isDirectlyJoinable(maybeGlobal)) { + return maybeGlobal; + } + return dataSource; + } else { + List currentChildren = dataSource.getChildren(); + List newChildren = new ArrayList<>(currentChildren.size()); + for (DataSource child : currentChildren) { + newChildren.add(globalizeIfPossible(child)); + } + return dataSource.withChildren(newChildren); + } + } + /** * Replace QueryDataSources with InlineDataSources when necessary and possible. "Necessary" is defined as: * diff --git a/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java b/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java index d1be69840430..2a5bf3e5c7ce 100644 --- a/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java @@ -80,6 +80,13 @@ public void testBuild() Assert.assertEquals(3, joinable.getCardinality("long")); } + @Test + public void testIsDirectlyJoinable() + { + Assert.assertTrue(factory.isDirectlyJoinable(inlineDataSource)); + Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo"))); + } + private static JoinConditionAnalysis makeCondition(final String condition) { return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil()); diff --git a/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java b/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java index 44ed4b2810af..6e0e737db9ed 100644 --- 
a/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java @@ -125,6 +125,13 @@ public void testBuild() Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, joinable.getCardinality("v")); } + @Test + public void testIsDirectlyJoinable() + { + Assert.assertTrue(factory.isDirectlyJoinable(lookupDataSource)); + Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo"))); + } + private static JoinConditionAnalysis makeCondition(final String condition) { return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil()); diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index f224b5f580e4..565fee9fddd9 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -33,6 +33,7 @@ import org.apache.druid.query.BaseQuery; import org.apache.druid.query.DataSource; import org.apache.druid.query.Druids; +import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.Query; @@ -70,7 +71,9 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.join.InlineJoinableFactory; +import org.apache.druid.segment.join.JoinConditionAnalysis; import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.join.Joinable; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.initialization.ServerConfig; @@ -96,6 +99,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; /** 
* Tests ClientQuerySegmentWalker. @@ -112,6 +116,7 @@ public class ClientQuerySegmentWalkerTest private static final String FOO = "foo"; private static final String BAR = "bar"; private static final String MULTI = "multi"; + private static final String GLOBAL = "broadcast"; private static final Interval INTERVAL = Intervals.of("2000/P1Y"); private static final String VERSION = "A"; @@ -218,6 +223,40 @@ public void testTimeseriesOnTable() Assert.assertEquals(1, scheduler.getTotalReleased().get()); } + @Test + public void testTimeseriesOnAutomaticGlobalTable() + { + final TimeseriesQuery query = + Druids.newTimeseriesQueryBuilder() + .dataSource(GLOBAL) + .granularity(Granularities.ALL) + .intervals(Collections.singletonList(INTERVAL)) + .aggregators(new LongSumAggregatorFactory("sum", "n")) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)) + .build(); + + // expect global/joinable datasource to be automatically translated into a GlobalTableDataSource + final TimeseriesQuery expectedClusterQuery = + Druids.newTimeseriesQueryBuilder() + .dataSource(new GlobalTableDataSource(GLOBAL)) + .granularity(Granularities.ALL) + .intervals(Collections.singletonList(INTERVAL)) + .aggregators(new LongSumAggregatorFactory("sum", "n")) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)) + .build(); + + testQuery( + query, + ImmutableList.of(ExpectedQuery.cluster(expectedClusterQuery)), + ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L}) + ); + + Assert.assertEquals(1, scheduler.getTotalRun().get()); + Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(1, scheduler.getTotalAcquired().get()); + Assert.assertEquals(1, scheduler.getTotalReleased().get()); + } + @Test public void testTimeseriesOnInline() { @@ -606,6 +645,20 @@ private void initWalker(final Map serverProperties, QuerySchedul final JoinableFactory joinableFactory = new MapJoinableFactory( ImmutableMap., JoinableFactory>builder() 
.put(InlineDataSource.class, new InlineJoinableFactory()) + .put(GlobalTableDataSource.class, new JoinableFactory() + { + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + return ((GlobalTableDataSource) dataSource).getName().equals(GLOBAL); + } + + @Override + public Optional build(DataSource dataSource, JoinConditionAnalysis condition) + { + return Optional.empty(); + } + }) .build() ); @@ -651,7 +704,8 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable> QueryToolChest getToolChest return conglomerate.findFactory(query).getToolchest(); } }, + joinableFactory, new RetryQueryRunnerConfig(), TestHelper.makeJsonMapper(), serverConfig, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java index 1655d73e5445..6faf2198627a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java @@ -337,6 +337,8 @@ private static boolean computeLeftRequiresSubquery(final DruidRel left) private static boolean computeRightRequiresSubquery(final DruidRel right) { // Right requires a subquery unless it's a scan or mapping on top of a global datasource. 
+ // ideally this would involve JoinableFactory.isDirectlyJoinable to check that the global datasources + // are in fact possibly joinable, but for now isGlobal is coupled to joinability return !(DruidRels.isScanOrMapping(right, false) && DruidRels.dataSourceIfLeafRel(right).filter(DataSource::isGlobal).isPresent()); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 6762aaf13c94..b264749308c6 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -56,6 +56,7 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DruidServerMetadata; @@ -104,6 +105,7 @@ public class DruidSchema extends AbstractSchema private final PlannerConfig config; private final SegmentManager segmentManager; private final ViewManager viewManager; + private final JoinableFactory joinableFactory; private final ExecutorService cacheExec; private final ConcurrentMap tables; @@ -148,6 +150,7 @@ public DruidSchema( final QueryLifecycleFactory queryLifecycleFactory, final TimelineServerView serverView, final SegmentManager segmentManager, + final JoinableFactory joinableFactory, final PlannerConfig config, final ViewManager viewManager, final Escalator escalator @@ -156,6 +159,7 @@ public DruidSchema( this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); Preconditions.checkNotNull(serverView, "serverView"); this.segmentManager = segmentManager; + this.joinableFactory = joinableFactory; this.config = 
Preconditions.checkNotNull(config, "config"); this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager"); this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); @@ -278,10 +282,11 @@ public void start() throws InterruptedException for (String dataSource : dataSourcesToRebuild) { final DruidTable druidTable = buildDruidTable(dataSource); final DruidTable oldTable = tables.put(dataSource, druidTable); + final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource"; if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { - log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature()); + log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.getRowSignature()); } else { - log.debug("dataSource [%s] signature is unchanged.", dataSource); + log.debug("%s [%s] signature is unchanged.", description, dataSource); } } @@ -627,12 +632,21 @@ protected DruidTable buildDruidTable(final String dataSource) columnTypes.forEach(builder::add); final TableDataSource tableDataSource; - if (segmentManager.getDataSourceNames().contains(dataSource)) { - tableDataSource = new GlobalTableDataSource(dataSource); + + // to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing + // in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here) + // to be joinable, it must be possibly joinable according to the factory. 
we only consider broadcast datasources + // at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource + // if also joinable + final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource); + final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal); + final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource); + if (isBroadcast && isJoinable) { + tableDataSource = maybeGlobal; } else { tableDataSource = new TableDataSource(dataSource); } - return new DruidTable(tableDataSource, builder.build()); + return new DruidTable(tableDataSource, builder.build(), isJoinable, isBroadcast); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java index bf84ea131c8c..8ee93a21e631 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java @@ -53,6 +53,7 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; @@ -83,6 +84,8 @@ public class InformationSchema extends AbstractSchema .add("TABLE_SCHEMA", ValueType.STRING) .add("TABLE_NAME", ValueType.STRING) .add("TABLE_TYPE", ValueType.STRING) + .add("IS_JOINABLE", ValueType.STRING) + .add("IS_BROADCAST", ValueType.STRING) .build(); private static final RowSignature COLUMNS_SIGNATURE = RowSignature .builder() @@ -109,6 +112,9 @@ public class InformationSchema extends AbstractSchema return Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(datasourceName)); }; + private static final String INFO_TRUE = 
"YES"; + private static final String INFO_FALSE = "NO"; + private final SchemaPlus rootSchema; private final Map tableMap; private final AuthorizerMapper authorizerMapper; @@ -217,18 +223,27 @@ public Iterable apply(final String schemaName) return Iterables.filter( Iterables.concat( FluentIterable.from(authorizedTableNames).transform( - new Function() - { - @Override - public Object[] apply(final String tableName) - { - return new Object[]{ - CATALOG_NAME, // TABLE_CATALOG - schemaName, // TABLE_SCHEMA - tableName, // TABLE_NAME - subSchema.getTable(tableName).getJdbcTableType().toString() // TABLE_TYPE - }; + tableName -> { + final Table table = subSchema.getTable(tableName); + final boolean isJoinable; + final boolean isBroadcast; + if (table instanceof DruidTable) { + DruidTable druidTable = (DruidTable) table; + isJoinable = druidTable.isJoinable(); + isBroadcast = druidTable.isBroadcast(); + } else { + isJoinable = false; + isBroadcast = false; } + + return new Object[]{ + CATALOG_NAME, // TABLE_CATALOG + schemaName, // TABLE_SCHEMA + tableName, // TABLE_NAME + table.getJdbcTableType().toString(), // TABLE_TYPE + isJoinable ? INFO_TRUE : INFO_FALSE, // IS_JOINABLE + isBroadcast ? INFO_TRUE : INFO_FALSE // IS_BROADCAST + }; } ), FluentIterable.from(authorizedFunctionNames).transform( @@ -242,7 +257,9 @@ public Object[] apply(final String functionName) CATALOG_NAME, // TABLE_CATALOG schemaName, // TABLE_SCHEMA functionName, // TABLE_NAME - "VIEW" // TABLE_TYPE + "VIEW", // TABLE_TYPE + INFO_FALSE, // IS_JOINABLE + INFO_FALSE // IS_BROADCAST }; } else { return null; @@ -406,7 +423,7 @@ public Object[] apply(final RelDataTypeField field) field.getName(), // COLUMN_NAME String.valueOf(field.getIndex()), // ORDINAL_POSITION "", // COLUMN_DEFAULT - type.isNullable() ? "YES" : "NO", // IS_NULLABLE + type.isNullable() ? 
INFO_TRUE : INFO_FALSE, // IS_NULLABLE type.getSqlTypeName().toString(), // DATA_TYPE null, // CHARACTER_MAXIMUM_LENGTH null, // CHARACTER_OCTET_LENGTH diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java index b3f331471791..6ddeaabb445d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java @@ -57,7 +57,9 @@ protected Map getTableMap() final ImmutableMap.Builder tableMapBuilder = ImmutableMap.builder(); for (final String lookupName : lookupProvider.getAllLookupNames()) { - tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE)); + // all lookups should be also joinable through lookup joinable factory, and lookups are effectively broadcast + // (if we ignore lookup tiers...) + tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE, true, true)); } return tableMapBuilder.build(); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java index 521a051b7c98..94da5ede16e5 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java @@ -41,14 +41,20 @@ public class DruidTable implements TranslatableTable { private final DataSource dataSource; private final RowSignature rowSignature; + private final boolean joinable; + private final boolean broadcast; public DruidTable( final DataSource dataSource, - final RowSignature rowSignature + final RowSignature rowSignature, + final boolean isJoinable, + final boolean isBroadcast ) { this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature"); + this.joinable = 
isJoinable; + this.broadcast = isBroadcast; } public DataSource getDataSource() @@ -61,6 +67,16 @@ public RowSignature getRowSignature() return rowSignature; } + public boolean isJoinable() + { + return joinable; + } + + public boolean isBroadcast() + { + return broadcast; + } + @Override public Schema.TableType getJdbcTableType() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 1ad92cb538f5..62c1519473fe 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -708,59 +708,59 @@ public void testInformationSchemaSchemata() throws Exception public void testInformationSchemaTables() throws Exception { testQuery( - "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n" + "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, IS_JOINABLE, IS_BROADCAST\n" + "FROM INFORMATION_SCHEMA.TABLES\n" + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')", ImmutableList.of(), ImmutableList.builder() - .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE"}) - .add(new Object[]{"druid", "aview", "VIEW"}) - .add(new Object[]{"druid", "bview", "VIEW"}) - .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"}) - .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"}) - .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"}) - .add(new Object[]{"lookup", "lookyloo", "TABLE"}) - .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"}) - .add(new 
Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", "aview", "VIEW", "NO", "NO"}) + .add(new Object[]{"druid", "bview", "VIEW", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"lookup", "lookyloo", "TABLE", "YES", "YES"}) + .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "NO", "NO"}) .build() ); testQuery( PLANNER_CONFIG_DEFAULT, - "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n" + "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, IS_JOINABLE, IS_BROADCAST\n" + "FROM INFORMATION_SCHEMA.TABLES\n" + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')", CalciteTests.SUPER_USER_AUTH_RESULT, ImmutableList.of(), ImmutableList.builder() - .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"}) - .add(new Object[]{"druid", 
CalciteTests.DATASOURCE2, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE"}) - .add(new Object[]{"druid", "aview", "VIEW"}) - .add(new Object[]{"druid", "bview", "VIEW"}) - .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"}) - .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"}) - .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"}) - .add(new Object[]{"lookup", "lookyloo", "TABLE"}) - .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.SOME_DATASOURCE, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.SOMEXDATASOURCE, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", "aview", "VIEW", "NO", "NO"}) + .add(new Object[]{"druid", "bview", "VIEW", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", 
"NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"lookup", "lookyloo", "TABLE", "YES", "YES"}) + .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "NO", "NO"}) .build() ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java index 11c2b97f4244..a5e883176039 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java @@ -20,6 +20,7 @@ package org.apache.druid.sql.calcite.schema; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.Guice; import com.google.inject.Injector; @@ -38,6 +39,8 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.lookup.LookupReferencesManager; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.security.AuthorizerMapper; @@ -102,6 +105,7 @@ public void setUp() binder -> { binder.bind(QueryLifecycleFactory.class).toInstance(queryLifecycleFactory); binder.bind(TimelineServerView.class).toInstance(serverView); + binder.bind(JoinableFactory.class).toInstance(new MapJoinableFactory(ImmutableMap.of())); 
binder.bind(PlannerConfig.class).toInstance(plannerConfig); binder.bind(ViewManager.class).toInstance(viewManager); binder.bind(Escalator.class).toInstance(escalator); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java index fd20fb594e85..0ba26ea9d3ea 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.SegmentManager; @@ -54,6 +55,7 @@ public void testInitializationWithNoData() throws Exception ), new TestServerInventoryView(Collections.emptyList()), new SegmentManager(EasyMock.createMock(SegmentLoader.class)), + new MapJoinableFactory(ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 8455965ef5e9..adb3626c54af 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.DataSource; import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import 
org.apache.druid.query.TableDataSource; @@ -43,6 +44,10 @@ import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.Joinable; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryStackTests; @@ -77,6 +82,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -132,12 +138,15 @@ public static void tearDownClass() throws IOException private SpecificSegmentsQuerySegmentWalker walker = null; private DruidSchema schema = null; private SegmentManager segmentManager; - private Set dataSourceNames; + private Set segmentDataSourceNames; + private Set joinableDataSourceNames; @Before public void setUp() throws Exception { - dataSourceNames = Sets.newConcurrentHashSet(); + segmentDataSourceNames = Sets.newConcurrentHashSet(); + joinableDataSourceNames = Sets.newConcurrentHashSet(); + final File tmpDir = temporaryFolder.newFolder(); final QueryableIndex index1 = IndexBuilder.create() .tmpDir(new File(tmpDir, "1")) @@ -173,7 +182,7 @@ public void setUp() throws Exception public Set getDataSourceNames() { getDatasourcesLatch.countDown(); - return dataSourceNames; + return segmentDataSourceNames; } }; @@ -222,10 +231,30 @@ public Set getDataSourceNames() serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments); druidServers = serverView.getDruidServers(); + final JoinableFactory globalTableJoinable = new JoinableFactory() + { + @Override + public boolean 
isDirectlyJoinable(DataSource dataSource) + { + return dataSource instanceof GlobalTableDataSource && + joinableDataSourceNames.contains(((GlobalTableDataSource) dataSource).getName()); + } + + @Override + public Optional build( + DataSource dataSource, + JoinConditionAnalysis condition + ) + { + return Optional.empty(); + } + }; + schema = new DruidSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, + new MapJoinableFactory(ImmutableMap.of(GlobalTableDataSource.class, globalTableJoinable)), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() @@ -461,12 +490,16 @@ public void testAvailableSegmentMetadataIsRealtime() } @Test - public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedException + public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException { DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); Assert.assertNotNull(fooTable); Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertFalse(fooTable.isJoinable()); + Assert.assertFalse(fooTable.isBroadcast()); + + buildTableLatch.await(1, TimeUnit.SECONDS); final DataSegment someNewBrokerSegment = new DataSegment( "foo", @@ -481,9 +514,30 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedExce 100L, PruneSpecsHolder.DEFAULT ); - dataSourceNames.add("foo"); + segmentDataSourceNames.add("foo"); + joinableDataSourceNames.add("foo"); serverView.addSegment(someNewBrokerSegment, ServerType.BROKER); + // wait for build twice + buildTableLatch = new CountDownLatch(2); + buildTableLatch.await(1, TimeUnit.SECONDS); + + // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) + getDatasourcesLatch = new CountDownLatch(1); + getDatasourcesLatch.await(1, TimeUnit.SECONDS); + + fooTable = 
(DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.isJoinable()); + Assert.assertTrue(fooTable.isBroadcast()); + + // now remove it + joinableDataSourceNames.remove("foo"); + segmentDataSourceNames.remove("foo"); + serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER); + // wait for build buildTableLatch.await(1, TimeUnit.SECONDS); buildTableLatch = new CountDownLatch(1); @@ -496,10 +550,59 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedExce fooTable = (DruidTable) schema.getTableMap().get("foo"); Assert.assertNotNull(fooTable); Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); - Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertFalse(fooTable.isJoinable()); + Assert.assertFalse(fooTable.isBroadcast()); + } + + @Test + public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException + { + DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertFalse(fooTable.isJoinable()); + Assert.assertFalse(fooTable.isBroadcast()); + + // wait for build twice + buildTableLatch.await(1, TimeUnit.SECONDS); + + final DataSegment someNewBrokerSegment = new DataSegment( + "foo", + Intervals.of("2012/2013"), + "version1", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + new NumberedShardSpec(2, 3), + null, + 1, + 100L, + PruneSpecsHolder.DEFAULT + ); + segmentDataSourceNames.add("foo"); + 
serverView.addSegment(someNewBrokerSegment, ServerType.BROKER); + + buildTableLatch = new CountDownLatch(2); + buildTableLatch.await(1, TimeUnit.SECONDS); + + // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) + getDatasourcesLatch = new CountDownLatch(1); + getDatasourcesLatch.await(1, TimeUnit.SECONDS); + + fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + // should not be a GlobalTableDataSource for now, because isGlobal is coupled with joinability. ideally this will be + // changed in the future and we should then expect a GlobalTableDataSource here + Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.isBroadcast()); + Assert.assertFalse(fooTable.isJoinable()); + // now remove it - dataSourceNames.remove("foo"); + segmentDataSourceNames.remove("foo"); serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER); // wait for build @@ -515,6 +618,7 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedExce Assert.assertNotNull(fooTable); Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertFalse(fooTable.isBroadcast()); + Assert.assertFalse(fooTable.isJoinable()); } - } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 3e1356af7b98..ace7acaa17f8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -67,6 +67,7 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.DruidNode; @@ -242,6 +243,7 @@ public Authorizer getAuthorizer(String name) CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments(), realtimeSegments), new SegmentManager(EasyMock.createMock(SegmentLoader.class)), + new MapJoinableFactory(ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index b7d56f5632d2..ca683a243aea 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -73,6 +73,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.DruidNode; @@ -979,6 +980,7 @@ private static DruidSchema createMockSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments()), new SegmentManager(EasyMock.createMock(SegmentLoader.class)), + new MapJoinableFactory(ImmutableMap.of()), plannerConfig, viewManager, TEST_AUTHENTICATOR_ESCALATOR diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index 1da0fbabf9c9..0490420cdd73 100644 --- 
a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -120,6 +120,7 @@ public SpecificSegmentsQuerySegmentWalker( scheduler ), conglomerate, + joinableFactoryToUse, new ServerConfig() ); }