From 72ccc08de10b096bf8e148e2a1fbb48ca48bf810 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 17 Jun 2020 04:06:45 -0700 Subject: [PATCH 1/7] global table if only joinable --- .../movingaverage/MovingAverageQueryTest.java | 3 + .../druid/segment/join/JoinableFactory.java | 7 ++ .../segment/join/MapJoinableFactory.java | 11 +++ .../segment/join/NoopJoinableFactory.java | 6 ++ .../segment/join/InlineJoinableFactory.java | 8 +++ .../segment/join/LookupJoinableFactory.java | 6 ++ .../server/ClientQuerySegmentWalker.java | 47 +++++++++++-- .../server/ClientQuerySegmentWalkerTest.java | 57 ++++++++++++++- .../apache/druid/server/QueryStackTests.java | 2 + .../druid/sql/calcite/schema/DruidSchema.java | 21 ++++-- .../sql/calcite/schema/InformationSchema.java | 33 +++++---- .../druid/sql/calcite/CalciteQueryTest.java | 70 +++++++++---------- .../schema/DruidCalciteSchemaModuleTest.java | 4 ++ .../schema/DruidSchemaNoDataInitTest.java | 2 + .../sql/calcite/schema/DruidSchemaTest.java | 39 +++++++++-- .../sql/calcite/schema/SystemSchemaTest.java | 2 + .../druid/sql/calcite/util/CalciteTests.java | 2 + .../SpecificSegmentsQuerySegmentWalker.java | 1 + 18 files changed, 257 insertions(+), 64 deletions(-) diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 971f8ed8b264..23dc65f5235c 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.collect.ImmutableMap; import 
com.google.common.collect.Maps; import com.google.inject.Injector; import com.google.inject.Module; @@ -64,6 +65,7 @@ import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.timeseries.TimeseriesQuery; import org.apache.druid.query.timeseries.TimeseriesResultValue; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.ClientQuerySegmentWalker; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.initialization.ServerConfig; @@ -377,6 +379,7 @@ public void emit(Event event) baseClient, null /* local client; unused in this test, so pass in null */, warehouse, + new MapJoinableFactory(ImmutableMap.of()), retryConfig, jsonMapper, serverConfig, diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java index fc63f1cfbee3..723aba57faa9 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java @@ -30,6 +30,13 @@ */ public interface JoinableFactory { + /** + * Returns true if a {@link Joinable} **may** be created for a given {@link DataSource}, but is not a guarantee that + * {@link #build} will return a non-empty result. Successfully building a {@link Joinable} might require specific + * criteria of the {@link JoinConditionAnalysis}. + */ + boolean isDirectlyJoinable(DataSource dataSource); + /** * Create a Joinable object. This may be an expensive operation involving loading data, creating a hash table, etc. 
* diff --git a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java index beb8106225d4..abf4b6ae4d06 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java @@ -43,6 +43,17 @@ public MapJoinableFactory(Map, JoinableFactory> join this.joinableFactories = new IdentityHashMap<>(joinableFactories); } + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + JoinableFactory factory = joinableFactories.get(dataSource.getClass()); + if (factory == null) { + return false; + } else { + return factory.isDirectlyJoinable(dataSource); + } + } + @Override public Optional build(DataSource dataSource, JoinConditionAnalysis condition) { diff --git a/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java b/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java index ff138041f131..1583b02b5d5d 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java +++ b/processing/src/test/java/org/apache/druid/segment/join/NoopJoinableFactory.java @@ -32,6 +32,12 @@ private NoopJoinableFactory() // Singleton. 
} + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + return false; + } + @Override public Optional build(DataSource dataSource, JoinConditionAnalysis condition) { diff --git a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java index 5945d42957cf..720c20238cf8 100644 --- a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java +++ b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java @@ -36,6 +36,14 @@ */ public class InlineJoinableFactory implements JoinableFactory { + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + // I don't believe this should ever be legitimately called, because this method is used to avoid subquery joins, + // which use the InlineJoinableFactory. NOTE(review): returns true without checking the dataSource type - confirm + return true; + } + @Override public Optional build(final DataSource dataSource, final JoinConditionAnalysis condition) { diff --git a/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java index a6fd209b1d12..c7b0bb4d551c 100644 --- a/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java +++ b/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java @@ -42,6 +42,12 @@ public LookupJoinableFactory(LookupExtractorFactoryContainerProvider lookupProvi this.lookupProvider = lookupProvider; } + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + return true; + } + @Override public Optional build(final DataSource dataSource, final JoinConditionAnalysis condition) { diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index fa35ff7b8a9c..5f284f720c6a 100644 --- 
a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -32,6 +32,7 @@ import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.query.DataSource; import org.apache.druid.query.FluentQueryRunnerBuilder; +import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.PostProcessingOperator; import org.apache.druid.query.Query; @@ -47,9 +48,11 @@ import org.apache.druid.query.RetryQueryRunner; import org.apache.druid.query.RetryQueryRunnerConfig; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.TableDataSource; import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.initialization.ServerConfig; import org.joda.time.Interval; @@ -77,6 +80,7 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker private final QuerySegmentWalker clusterClient; private final QuerySegmentWalker localClient; private final QueryToolChestWarehouse warehouse; + private final JoinableFactory joinableFactory; private final RetryQueryRunnerConfig retryConfig; private final ObjectMapper objectMapper; private final ServerConfig serverConfig; @@ -88,6 +92,7 @@ public ClientQuerySegmentWalker( QuerySegmentWalker clusterClient, QuerySegmentWalker localClient, QueryToolChestWarehouse warehouse, + JoinableFactory joinableFactory, RetryQueryRunnerConfig retryConfig, ObjectMapper objectMapper, ServerConfig serverConfig, @@ -99,6 +104,7 @@ public ClientQuerySegmentWalker( this.clusterClient = clusterClient; this.localClient = localClient; this.warehouse = warehouse; + this.joinableFactory = joinableFactory; this.retryConfig = retryConfig; this.objectMapper = 
objectMapper; this.serverConfig = serverConfig; @@ -112,6 +118,7 @@ public ClientQuerySegmentWalker( CachingClusteredClient clusterClient, LocalQuerySegmentWalker localClient, QueryToolChestWarehouse warehouse, + JoinableFactory joinableFactory, RetryQueryRunnerConfig retryConfig, ObjectMapper objectMapper, ServerConfig serverConfig, @@ -124,6 +131,7 @@ public ClientQuerySegmentWalker( (QuerySegmentWalker) clusterClient, (QuerySegmentWalker) localClient, warehouse, + joinableFactory, retryConfig, objectMapper, serverConfig, @@ -137,10 +145,13 @@ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable> toolChest = warehouse.getToolChest(query); - // First, do an inlining dry run to see if any inlining is necessary, without actually running the queries. + // transform TableDataSource to GlobalTableDataSource when eligible + // before further transformation to potentially inline + final DataSource freeTradeDataSource = globalizeIfPossible(query.getDataSource()); + // do an inlining dry run to see if any inlining is necessary, without actually running the queries. final int maxSubqueryRows = QueryContexts.getMaxSubqueryRows(query, serverConfig.getMaxSubqueryRows()); final DataSource inlineDryRun = inlineIfNecessary( - query.getDataSource(), + freeTradeDataSource, toolChest, new AtomicInteger(), maxSubqueryRows, @@ -156,7 +167,7 @@ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable newQuery = query.withDataSource( inlineIfNecessary( - query.getDataSource(), + freeTradeDataSource, toolChest, new AtomicInteger(), maxSubqueryRows, @@ -187,10 +198,15 @@ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable QueryRunner getQueryRunnerForSegments(Query query, Iterable specs) { - // Inlining isn't done for segments-based queries. 
+ // Inlining isn't done for segments-based queries, but we still globalize the table datasources if possible + final Query freeTradeQuery = query.withDataSource(globalizeIfPossible(query.getDataSource())); if (canRunQueryUsingClusterWalker(query)) { - return decorateClusterRunner(query, clusterClient.getQueryRunnerForSegments(query, specs)); + return new QuerySwappingQueryRunner<>( + decorateClusterRunner(freeTradeQuery, clusterClient.getQueryRunnerForSegments(freeTradeQuery, specs)), + query, + freeTradeQuery + ); } else { // We don't expect end-users to see this message, since it only happens when specific segments are requested; // this is not typical end-user behavior. @@ -235,6 +251,32 @@ private boolean canRunQueryUsingClusterWalker(Query query) || toolChest.canPerformSubquery(((QueryDataSource) analysis.getDataSource()).getQuery())); } + + /** + * Swaps each {@link TableDataSource} in the given datasource tree for a {@link GlobalTableDataSource} of the + * same name when the joinable factory reports it as directly joinable. Other datasources are rebuilt via + * {@link DataSource#withChildren} with their children globalized recursively. + */ + private DataSource globalizeIfPossible( + final DataSource dataSource + ) + { + if (dataSource instanceof TableDataSource) { + GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(((TableDataSource) dataSource).getName()); + if (joinableFactory.isDirectlyJoinable(maybeGlobal)) { + return maybeGlobal; + } + return dataSource; + } else { + List currentChildren = dataSource.getChildren(); + List newChildren = new ArrayList<>(currentChildren.size()); + for (DataSource child : currentChildren) { + newChildren.add(globalizeIfPossible(child)); + } + return dataSource.withChildren(newChildren); + } + } + /** * Replace QueryDataSources with InlineDataSources when necessary and possible. 
"Necessary" is defined as: * diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index f224b5f580e4..565fee9fddd9 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -33,6 +33,7 @@ import org.apache.druid.query.BaseQuery; import org.apache.druid.query.DataSource; import org.apache.druid.query.Druids; +import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.JoinDataSource; import org.apache.druid.query.Query; @@ -70,7 +71,9 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.join.InlineJoinableFactory; +import org.apache.druid.segment.join.JoinConditionAnalysis; import org.apache.druid.segment.join.JoinType; +import org.apache.druid.segment.join.Joinable; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.initialization.ServerConfig; @@ -96,6 +99,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; /** * Tests ClientQuerySegmentWalker. 
@@ -112,6 +116,7 @@ public class ClientQuerySegmentWalkerTest private static final String FOO = "foo"; private static final String BAR = "bar"; private static final String MULTI = "multi"; + private static final String GLOBAL = "broadcast"; private static final Interval INTERVAL = Intervals.of("2000/P1Y"); private static final String VERSION = "A"; @@ -218,6 +223,40 @@ public void testTimeseriesOnTable() Assert.assertEquals(1, scheduler.getTotalReleased().get()); } + @Test + public void testTimeseriesOnAutomaticGlobalTable() + { + final TimeseriesQuery query = + Druids.newTimeseriesQueryBuilder() + .dataSource(GLOBAL) + .granularity(Granularities.ALL) + .intervals(Collections.singletonList(INTERVAL)) + .aggregators(new LongSumAggregatorFactory("sum", "n")) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)) + .build(); + + // expect global/joinable datasource to be automatically translated into a GlobalTableDataSource + final TimeseriesQuery expectedClusterQuery = + Druids.newTimeseriesQueryBuilder() + .dataSource(new GlobalTableDataSource(GLOBAL)) + .granularity(Granularities.ALL) + .intervals(Collections.singletonList(INTERVAL)) + .aggregators(new LongSumAggregatorFactory("sum", "n")) + .context(ImmutableMap.of(TimeseriesQuery.CTX_GRAND_TOTAL, false)) + .build(); + + testQuery( + query, + ImmutableList.of(ExpectedQuery.cluster(expectedClusterQuery)), + ImmutableList.of(new Object[]{INTERVAL.getStartMillis(), 10L}) + ); + + Assert.assertEquals(1, scheduler.getTotalRun().get()); + Assert.assertEquals(1, scheduler.getTotalPrioritizedAndLaned().get()); + Assert.assertEquals(1, scheduler.getTotalAcquired().get()); + Assert.assertEquals(1, scheduler.getTotalReleased().get()); + } + @Test public void testTimeseriesOnInline() { @@ -606,6 +645,20 @@ private void initWalker(final Map serverProperties, QuerySchedul final JoinableFactory joinableFactory = new MapJoinableFactory( ImmutableMap., JoinableFactory>builder() .put(InlineDataSource.class, new 
InlineJoinableFactory()) + .put(GlobalTableDataSource.class, new JoinableFactory() + { + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + return ((GlobalTableDataSource) dataSource).getName().equals(GLOBAL); + } + + @Override + public Optional build(DataSource dataSource, JoinConditionAnalysis condition) + { + return Optional.empty(); + } + }) .build() ); @@ -651,7 +704,8 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable> QueryToolChest getToolChest return conglomerate.findFactory(query).getToolchest(); } }, + joinableFactory, new RetryQueryRunnerConfig(), TestHelper.makeJsonMapper(), serverConfig, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 6762aaf13c94..582be76481be 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -56,6 +56,7 @@ import org.apache.druid.query.spec.MultipleSpecificSegmentSpec; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.coordination.DruidServerMetadata; @@ -104,6 +105,7 @@ public class DruidSchema extends AbstractSchema private final PlannerConfig config; private final SegmentManager segmentManager; private final ViewManager viewManager; + private final JoinableFactory joinableFactory; private final ExecutorService cacheExec; private final ConcurrentMap tables; @@ -148,6 +150,7 @@ public DruidSchema( final QueryLifecycleFactory queryLifecycleFactory, final TimelineServerView serverView, final SegmentManager segmentManager, + final JoinableFactory joinableFactory, final PlannerConfig config, final ViewManager viewManager, final 
Escalator escalator @@ -156,6 +159,7 @@ public DruidSchema( this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); Preconditions.checkNotNull(serverView, "serverView"); this.segmentManager = segmentManager; + this.joinableFactory = joinableFactory; this.config = Preconditions.checkNotNull(config, "config"); this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager"); this.cacheExec = Execs.singleThreaded("DruidSchema-Cache-%d"); @@ -278,10 +282,11 @@ public void start() throws InterruptedException for (String dataSource : dataSourcesToRebuild) { final DruidTable druidTable = buildDruidTable(dataSource); final DruidTable oldTable = tables.put(dataSource, druidTable); + final String description = druidTable.getDataSource().isGlobal() ? "global dataSource" : "dataSource"; if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) { - log.info("dataSource [%s] has new signature: %s.", dataSource, druidTable.getRowSignature()); + log.info("%s [%s] has new signature: %s.", description, dataSource, druidTable.getRowSignature()); } else { - log.debug("dataSource [%s] signature is unchanged.", dataSource); + log.debug("%s [%s] signature is unchanged.", description, dataSource); } } @@ -627,8 +632,16 @@ protected DruidTable buildDruidTable(final String dataSource) columnTypes.forEach(builder::add); final TableDataSource tableDataSource; - if (segmentManager.getDataSourceNames().contains(dataSource)) { - tableDataSource = new GlobalTableDataSource(dataSource); + + // to be a GlobalTableDataSource instead of a TableDataSource, it must both: + // * appear on all servers (inferred by existing in the segment cache, which in this case belongs to the broker + // meaning only broadcast segments live here) + // * be joinable, join analysis checks this, 'global' datasources are distributed everywhere and may be joined + // against in an optimized manner - if the implementation exists. 
+ // if either of these criteria is not met, assume a regular TableDataSource, backed by distributed Druid segments + final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource); + if (segmentManager.getDataSourceNames().contains(dataSource) && joinableFactory.isDirectlyJoinable(maybeGlobal)) { + tableDataSource = maybeGlobal; } else { tableDataSource = new TableDataSource(dataSource); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java index bf84ea131c8c..106ed86067dd 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java @@ -53,6 +53,7 @@ import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.server.security.ResourceAction; import org.apache.druid.sql.calcite.planner.PlannerContext; +import org.apache.druid.sql.calcite.table.DruidTable; import org.apache.druid.sql.calcite.table.RowSignatures; import javax.annotation.Nullable; @@ -83,6 +84,7 @@ public class InformationSchema extends AbstractSchema .add("TABLE_SCHEMA", ValueType.STRING) .add("TABLE_NAME", ValueType.STRING) .add("TABLE_TYPE", ValueType.STRING) + .add("TABLE_AVAILABILITY", ValueType.STRING) .build(); private static final RowSignature COLUMNS_SIGNATURE = RowSignature .builder() @@ -217,18 +219,24 @@ public Iterable apply(final String schemaName) return Iterables.filter( Iterables.concat( FluentIterable.from(authorizedTableNames).transform( - new Function() - { - @Override - public Object[] apply(final String tableName) - { - return new Object[]{ - CATALOG_NAME, // TABLE_CATALOG - schemaName, // TABLE_SCHEMA - tableName, // TABLE_NAME - subSchema.getTable(tableName).getJdbcTableType().toString() // TABLE_TYPE - }; + tableName -> { + final Table table = subSchema.getTable(tableName); + final String availability; + if (table 
instanceof DruidTable) { + availability = ((DruidTable) table).getDataSource().isGlobal() + ? "GLOBAL" + : "DISTRIBUTED"; + } else { + availability = "LOCAL"; } + + return new Object[]{ + CATALOG_NAME, // TABLE_CATALOG + schemaName, // TABLE_SCHEMA + tableName, // TABLE_NAME + table.getJdbcTableType().toString(), // TABLE_TYPE + availability // TABLE_AVAILABILITY + }; } ), FluentIterable.from(authorizedFunctionNames).transform( @@ -242,7 +250,8 @@ public Object[] apply(final String functionName) CATALOG_NAME, // TABLE_CATALOG schemaName, // TABLE_SCHEMA functionName, // TABLE_NAME - "VIEW" // TABLE_TYPE + "VIEW", // TABLE_TYPE + "VIRTUAL" // TABLE_AVAILABILITY }; } else { return null; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 9b391a716f1b..4c92d89f5411 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -708,55 +708,55 @@ public void testInformationSchemaSchemata() throws Exception public void testInformationSchemaTables() throws Exception { testQuery( - "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n" + "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_AVAILABILITY\n" + "FROM INFORMATION_SCHEMA.TABLES\n" + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')", ImmutableList.of(), ImmutableList.builder() - .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"}) - .add(new Object[]{"druid", "aview", "VIEW"}) - .add(new Object[]{"druid", "bview", "VIEW"}) - .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"}) - .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", 
"SYSTEM_TABLE"}) - .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"}) - .add(new Object[]{"lookup", "lookyloo", "TABLE"}) - .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", "aview", "VIEW", "VIRTUAL"}) + .add(new Object[]{"druid", "bview", "VIEW", "VIRTUAL"}) + .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"lookup", "lookyloo", "TABLE", "GLOBAL"}) + .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "LOCAL"}) .build() ); testQuery( PLANNER_CONFIG_DEFAULT, - "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE\n" + "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_AVAILABILITY\n" + "FROM INFORMATION_SCHEMA.TABLES\n" + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')", CalciteTests.SUPER_USER_AUTH_RESULT, ImmutableList.of(), ImmutableList.builder() - .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE"}) - .add(new 
Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE"}) - .add(new Object[]{"druid", "aview", "VIEW"}) - .add(new Object[]{"druid", "bview", "VIEW"}) - .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE"}) - .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE"}) - .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE"}) - .add(new Object[]{"lookup", "lookyloo", "TABLE"}) - .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"}) - .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "DISTRIBUTED"}) + .add(new Object[]{"druid", "aview", "VIEW", "VIRTUAL"}) + .add(new Object[]{"druid", "bview", "VIEW", "VIRTUAL"}) + .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"lookup", "lookyloo", "TABLE", "GLOBAL"}) + .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", 
"LOCAL"}) + .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "LOCAL"}) .build() ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java index 11c2b97f4244..a5e883176039 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java @@ -20,6 +20,7 @@ package org.apache.druid.sql.calcite.schema; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.inject.Guice; import com.google.inject.Injector; @@ -38,6 +39,8 @@ import org.apache.druid.guice.LifecycleModule; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; import org.apache.druid.query.lookup.LookupReferencesManager; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.security.AuthorizerMapper; @@ -102,6 +105,7 @@ public void setUp() binder -> { binder.bind(QueryLifecycleFactory.class).toInstance(queryLifecycleFactory); binder.bind(TimelineServerView.class).toInstance(serverView); + binder.bind(JoinableFactory.class).toInstance(new MapJoinableFactory(ImmutableMap.of())); binder.bind(PlannerConfig.class).toInstance(plannerConfig); binder.bind(ViewManager.class).toInstance(viewManager); binder.bind(Escalator.class).toInstance(escalator); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java index fd20fb594e85..0ba26ea9d3ea 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.SegmentManager; @@ -54,6 +55,7 @@ public void testInitializationWithNoData() throws Exception ), new TestServerInventoryView(Collections.emptyList()), new SegmentManager(EasyMock.createMock(SegmentLoader.class)), + new MapJoinableFactory(ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 8455965ef5e9..c8e6679af5ff 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.DataSource; import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.query.TableDataSource; @@ -43,6 +44,10 @@ import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import 
org.apache.druid.segment.join.Joinable; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryStackTests; @@ -77,6 +82,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -132,12 +138,12 @@ public static void tearDownClass() throws IOException private SpecificSegmentsQuerySegmentWalker walker = null; private DruidSchema schema = null; private SegmentManager segmentManager; - private Set dataSourceNames; + private Set segmentDataSourceNames; @Before public void setUp() throws Exception { - dataSourceNames = Sets.newConcurrentHashSet(); + segmentDataSourceNames = Sets.newConcurrentHashSet(); final File tmpDir = temporaryFolder.newFolder(); final QueryableIndex index1 = IndexBuilder.create() .tmpDir(new File(tmpDir, "1")) @@ -173,7 +179,7 @@ public void setUp() throws Exception public Set getDataSourceNames() { getDatasourcesLatch.countDown(); - return dataSourceNames; + return segmentDataSourceNames; } }; @@ -222,10 +228,29 @@ public Set getDataSourceNames() serverView = new TestServerInventoryView(walker.getSegments(), realtimeSegments); druidServers = serverView.getDruidServers(); + final JoinableFactory globalTableJoinable = new JoinableFactory() + { + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + return dataSource instanceof GlobalTableDataSource && + segmentDataSourceNames.contains(((GlobalTableDataSource) dataSource).getName()); + } + + @Override + public Optional build( + DataSource dataSource, JoinConditionAnalysis condition + ) + { + return Optional.empty(); + } + }; + schema = new DruidSchema( 
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, + new MapJoinableFactory(ImmutableMap.of(GlobalTableDataSource.class, globalTableJoinable)), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() @@ -481,12 +506,12 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedExce 100L, PruneSpecsHolder.DEFAULT ); - dataSourceNames.add("foo"); + segmentDataSourceNames.add("foo"); serverView.addSegment(someNewBrokerSegment, ServerType.BROKER); - // wait for build + // wait for build twice buildTableLatch.await(1, TimeUnit.SECONDS); - buildTableLatch = new CountDownLatch(1); + buildTableLatch = new CountDownLatch(2); buildTableLatch.await(1, TimeUnit.SECONDS); // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) @@ -499,7 +524,7 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedExce Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource); // now remove it - dataSourceNames.remove("foo"); + segmentDataSourceNames.remove("foo"); serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER); // wait for build diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 3e1356af7b98..ace7acaa17f8 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -67,6 +67,7 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import 
org.apache.druid.server.DruidNode; @@ -242,6 +243,7 @@ public Authorizer getAuthorizer(String name) CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments(), realtimeSegments), new SegmentManager(EasyMock.createMock(SegmentLoader.class)), + new MapJoinableFactory(ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 0ca415b8746a..1d23d430ff75 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -73,6 +73,7 @@ import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.DruidNode; @@ -870,6 +871,7 @@ private static DruidSchema createMockSchema( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments()), new SegmentManager(EasyMock.createMock(SegmentLoader.class)), + new MapJoinableFactory(ImmutableMap.of()), plannerConfig, viewManager, TEST_AUTHENTICATOR_ESCALATOR diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java index 1da0fbabf9c9..0490420cdd73 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SpecificSegmentsQuerySegmentWalker.java @@ -120,6 +120,7 @@ public 
SpecificSegmentsQuerySegmentWalker( scheduler ), conglomerate, + joinableFactoryToUse, new ServerConfig() ); } From 01cb2df2999650c0a599d201e15023719fa01be9 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 17 Jun 2020 04:53:48 -0700 Subject: [PATCH 2/7] oops --- .../druid/segment/join/JoinablesTest.java | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java index ae36d175ff16..b9ee1595dd6d 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.IAE; import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.DataSource; import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.QueryContexts; import org.apache.druid.query.extraction.MapLookupExtractor; @@ -155,13 +156,26 @@ public void test_createSegmentMapFn_usableClause() final Function segmentMapFn = Joinables.createSegmentMapFn( ImmutableList.of(clause), - (dataSource, condition) -> { - if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) { - return Optional.of( - LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false)) - ); - } else { - return Optional.empty(); + new JoinableFactory() + { + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + return dataSource.equals(lookupDataSource); + } + + @Override + public Optional build( + DataSource dataSource, JoinConditionAnalysis condition + ) + { + if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) { + return Optional.of( + LookupJoinable.wrap(new MapLookupExtractor(ImmutableMap.of("k", "v"), false)) + ); + } else { + return 
Optional.empty(); + } } }, new AtomicLong(), From 0f50b3ecfe89743cc23abf4bddbf981a0c2ff3af Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 17 Jun 2020 12:34:10 -0700 Subject: [PATCH 3/7] fix style, add more tests --- .../apache/druid/segment/join/JoinablesTest.java | 4 +--- .../segment/join/MapJoinableFactoryTest.java | 16 ++++++++++++++++ .../segment/join/InlineJoinableFactory.java | 5 +++-- .../segment/join/LookupJoinableFactory.java | 3 ++- .../segment/join/InlineJoinableFactoryTest.java | 7 +++++++ .../segment/join/LookupJoinableFactoryTest.java | 7 +++++++ 6 files changed, 36 insertions(+), 6 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java index b9ee1595dd6d..f7c4f4b415b8 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/JoinablesTest.java @@ -165,9 +165,7 @@ public boolean isDirectlyJoinable(DataSource dataSource) } @Override - public Optional build( - DataSource dataSource, JoinConditionAnalysis condition - ) + public Optional build(DataSource dataSource, JoinConditionAnalysis condition) { if (dataSource.equals(lookupDataSource) && condition.equals(conditionAnalysis)) { return Optional.of( diff --git a/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java index cf0336061409..1d00e711c190 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java @@ -65,6 +65,8 @@ public void setUp() target = new MapJoinableFactory( ImmutableMap.of(NoopDataSource.class, noopJoinableFactory)); } + + @Test public void testBuildDataSourceNotRegisteredShouldReturnAbsent() { @@ -89,4 +91,18 @@ public void 
testBuildDataSourceIsRegisteredShouldReturnJoinableFromFactory() Optional joinable = target.build(noopDataSource, condition); Assert.assertEquals(mockJoinable, joinable.get()); } + + @Test + public void testIsDirectShouldBeFalseForNotRegistered() + { + Assert.assertFalse(target.isDirectlyJoinable(inlineDataSource)); + } + + @Test + public void testIsDirectlyJoinableShouldBeTrueForRegisteredThatIsJoinable() + { + EasyMock.expect(noopJoinableFactory.isDirectlyJoinable(noopDataSource)).andReturn(true).anyTimes(); + EasyMock.replay(noopJoinableFactory); + Assert.assertTrue(target.isDirectlyJoinable(noopDataSource)); + } } diff --git a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java index 720c20238cf8..4eee53fde5dc 100644 --- a/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java +++ b/server/src/main/java/org/apache/druid/segment/join/InlineJoinableFactory.java @@ -39,9 +39,10 @@ public class InlineJoinableFactory implements JoinableFactory @Override public boolean isDirectlyJoinable(DataSource dataSource) { - // i don't believe this should ever be legitimately called, because this method is used to avoid subquery joins + // this should always be true if this is access through MapJoinableFactory, but check just in case... 
+ // further, this should not ever be legitimately called, because this method is used to avoid subquery joins // which use the InlineJoinableFactory - return true; + return dataSource instanceof InlineDataSource; } @Override diff --git a/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java index c7b0bb4d551c..2dab0a616639 100644 --- a/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java +++ b/server/src/main/java/org/apache/druid/segment/join/LookupJoinableFactory.java @@ -45,7 +45,8 @@ public LookupJoinableFactory(LookupExtractorFactoryContainerProvider lookupProvi @Override public boolean isDirectlyJoinable(DataSource dataSource) { - return true; + // this should always be true if this is access through MapJoinableFactory, but check just in case... + return dataSource instanceof LookupDataSource; } @Override diff --git a/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java b/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java index d1be69840430..2a5bf3e5c7ce 100644 --- a/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/join/InlineJoinableFactoryTest.java @@ -80,6 +80,13 @@ public void testBuild() Assert.assertEquals(3, joinable.getCardinality("long")); } + @Test + public void testIsDirectlyJoinable() + { + Assert.assertTrue(factory.isDirectlyJoinable(inlineDataSource)); + Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo"))); + } + private static JoinConditionAnalysis makeCondition(final String condition) { return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil()); diff --git a/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java b/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java index 
44ed4b2810af..6e0e737db9ed 100644 --- a/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/join/LookupJoinableFactoryTest.java @@ -125,6 +125,13 @@ public void testBuild() Assert.assertEquals(Joinable.CARDINALITY_UNKNOWN, joinable.getCardinality("v")); } + @Test + public void testIsDirectlyJoinable() + { + Assert.assertTrue(factory.isDirectlyJoinable(lookupDataSource)); + Assert.assertFalse(factory.isDirectlyJoinable(new TableDataSource("foo"))); + } + private static JoinConditionAnalysis makeCondition(final String condition) { return JoinConditionAnalysis.forExpression(condition, PREFIX, ExprMacroTable.nil()); From 2ad4ba4c56d2fcfc956dd13989f6c471e85bbe0b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 17 Jun 2020 13:47:18 -0700 Subject: [PATCH 4/7] Update sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java --- .../org/apache/druid/sql/calcite/schema/DruidSchemaTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index c8e6679af5ff..438b985b7356 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -239,7 +239,8 @@ public boolean isDirectlyJoinable(DataSource dataSource) @Override public Optional build( - DataSource dataSource, JoinConditionAnalysis condition + DataSource dataSource, + JoinConditionAnalysis condition ) { return Optional.empty(); From cce3fcef864bdef2da9c3e219fbd57e5f98639ab Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 17 Jun 2020 23:30:28 -0700 Subject: [PATCH 5/7] better information schema columns, distinguish broadcast from joinable --- .../org/apache/druid/query/DataSource.java | 9 ++ .../query/planning/DataSourceAnalysis.java | 2 +- 
.../sql/calcite/rel/DruidJoinQueryRel.java | 2 + .../druid/sql/calcite/schema/DruidSchema.java | 17 ++-- .../sql/calcite/schema/InformationSchema.java | 26 ++++-- .../sql/calcite/schema/LookupSchema.java | 4 +- .../druid/sql/calcite/table/DruidTable.java | 18 +++- .../druid/sql/calcite/CalciteQueryTest.java | 70 ++++++++-------- .../sql/calcite/schema/DruidSchemaTest.java | 84 ++++++++++++++++++- 9 files changed, 174 insertions(+), 58 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index c3edd1c79647..c376a553bb87 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -71,6 +71,15 @@ public interface DataSource /** * Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or * for queries of those. + * + * Currently this is couple with joinability - if this returns true then the query engine expects there exists a + * {@link org.apache.druid.segment.join.JoinableFactory} which might build a + * {@link org.apache.druid.segment.join.Joinable} for this datasource directly. If a subquery 'inline' join is + * required to join this datasource on the right hand side, then this value must be false for now. + * + * In the future, instead of directly using this method, the query planner and engine should consider + * {@link org.apache.druid.segment.join.JoinableFactory#isDirectlyJoinable(DataSource)} when determining if the + * right hand side is directly joinable, which would allow decoupling this property from joins. 
*/ boolean isGlobal(); diff --git a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java index 4237e50dc473..5b34f7623fed 100644 --- a/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java +++ b/processing/src/main/java/org/apache/druid/query/planning/DataSourceAnalysis.java @@ -205,7 +205,7 @@ public List getPreJoinableClauses() /** * Returns true if all servers have the ability to compute this datasource. These datasources depend only on - * globally broadcast data, like lookups or inline data. + * globally broadcast data, like lookups or inline data or broadcast segments. */ public boolean isGlobal() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java index 1655d73e5445..6faf2198627a 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidJoinQueryRel.java @@ -337,6 +337,8 @@ private static boolean computeLeftRequiresSubquery(final DruidRel left) private static boolean computeRightRequiresSubquery(final DruidRel right) { // Right requires a subquery unless it's a scan or mapping on top of a global datasource. 
+ // ideally this would involve JoinableFactory.isDirectlyJoinable to check that the global datasources + // are in fact possibly joinable, but for now isGlobal is coupled to joinability return !(DruidRels.isScanOrMapping(right, false) && DruidRels.dataSourceIfLeafRel(right).filter(DataSource::isGlobal).isPresent()); } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 582be76481be..b264749308c6 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -633,19 +633,20 @@ protected DruidTable buildDruidTable(final String dataSource) final TableDataSource tableDataSource; - // to be a GlobalTableDataSource instead of a TableDataSource, it must both: - // * appear on all servers (inferred by existing in the segment cache, which in this case belongs to the broker - // meaning only broadcast segments live here) - // * be joinable, join analysis checks this, 'global' datasources are distributed everywhere and may be joined - // against in an optimized manner - if the implementation exists. - // if neither of these criteria are true, assume a regular TableDataSource, backed by distributed Druid segments + // to be a GlobalTableDataSource instead of a TableDataSource, it must appear on all servers (inferred by existing + // in the segment cache, which in this case belongs to the broker meaning only broadcast segments live here) + // to be joinable, it must be possibly joinable according to the factory. 
we only consider broadcast datasources + // at this time, and isGlobal is currently strongly coupled with joinable, so only make a global table datasource + // if also joinable final GlobalTableDataSource maybeGlobal = new GlobalTableDataSource(dataSource); - if (segmentManager.getDataSourceNames().contains(dataSource) && joinableFactory.isDirectlyJoinable(maybeGlobal)) { + final boolean isJoinable = joinableFactory.isDirectlyJoinable(maybeGlobal); + final boolean isBroadcast = segmentManager.getDataSourceNames().contains(dataSource); + if (isBroadcast && isJoinable) { tableDataSource = maybeGlobal; } else { tableDataSource = new TableDataSource(dataSource); } - return new DruidTable(tableDataSource, builder.build()); + return new DruidTable(tableDataSource, builder.build(), isJoinable, isBroadcast); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java index 106ed86067dd..8ee93a21e631 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/InformationSchema.java @@ -84,7 +84,8 @@ public class InformationSchema extends AbstractSchema .add("TABLE_SCHEMA", ValueType.STRING) .add("TABLE_NAME", ValueType.STRING) .add("TABLE_TYPE", ValueType.STRING) - .add("TABLE_AVAILABILITY", ValueType.STRING) + .add("IS_JOINABLE", ValueType.STRING) + .add("IS_BROADCAST", ValueType.STRING) .build(); private static final RowSignature COLUMNS_SIGNATURE = RowSignature .builder() @@ -111,6 +112,9 @@ public class InformationSchema extends AbstractSchema return Collections.singletonList(AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(datasourceName)); }; + private static final String INFO_TRUE = "YES"; + private static final String INFO_FALSE = "NO"; + private final SchemaPlus rootSchema; private final Map tableMap; private final AuthorizerMapper authorizerMapper; @@ -221,13 
+225,15 @@ public Iterable apply(final String schemaName) FluentIterable.from(authorizedTableNames).transform( tableName -> { final Table table = subSchema.getTable(tableName); - final String availability; + final boolean isJoinable; + final boolean isBroadcast; if (table instanceof DruidTable) { - availability = ((DruidTable) table).getDataSource().isGlobal() - ? "GLOBAL" - : "DISTRIBUTED"; + DruidTable druidTable = (DruidTable) table; + isJoinable = druidTable.isJoinable(); + isBroadcast = druidTable.isBroadcast(); } else { - availability = "LOCAL"; + isJoinable = false; + isBroadcast = false; } return new Object[]{ @@ -235,7 +241,8 @@ public Iterable apply(final String schemaName) schemaName, // TABLE_SCHEMA tableName, // TABLE_NAME table.getJdbcTableType().toString(), // TABLE_TYPE - availability // TABLE_AVAILABILITY + isJoinable ? INFO_TRUE : INFO_FALSE, // IS_JOINABLE + isBroadcast ? INFO_TRUE : INFO_FALSE // IS_BROADCAST }; } ), @@ -251,7 +258,8 @@ public Object[] apply(final String functionName) schemaName, // TABLE_SCHEMA functionName, // TABLE_NAME "VIEW", // TABLE_TYPE - "VIRTUAL" // TABLE_AVAILABILITY + INFO_FALSE, // IS_JOINABLE + INFO_FALSE // IS_BROADCAST }; } else { return null; @@ -415,7 +423,7 @@ public Object[] apply(final RelDataTypeField field) field.getName(), // COLUMN_NAME String.valueOf(field.getIndex()), // ORDINAL_POSITION "", // COLUMN_DEFAULT - type.isNullable() ? "YES" : "NO", // IS_NULLABLE + type.isNullable() ? 
INFO_TRUE : INFO_FALSE, // IS_NULLABLE type.getSqlTypeName().toString(), // DATA_TYPE null, // CHARACTER_MAXIMUM_LENGTH null, // CHARACTER_OCTET_LENGTH diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java index b3f331471791..6ddeaabb445d 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/LookupSchema.java @@ -57,7 +57,9 @@ protected Map getTableMap() final ImmutableMap.Builder tableMapBuilder = ImmutableMap.builder(); for (final String lookupName : lookupProvider.getAllLookupNames()) { - tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE)); + // all lookups should be also joinable through lookup joinable factory, and lookups are effectively broadcast + // (if we ignore lookup tiers...) + tableMapBuilder.put(lookupName, new DruidTable(new LookupDataSource(lookupName), ROW_SIGNATURE, true, true)); } return tableMapBuilder.build(); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java index 521a051b7c98..94da5ede16e5 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/table/DruidTable.java @@ -41,14 +41,20 @@ public class DruidTable implements TranslatableTable { private final DataSource dataSource; private final RowSignature rowSignature; + private final boolean joinable; + private final boolean broadcast; public DruidTable( final DataSource dataSource, - final RowSignature rowSignature + final RowSignature rowSignature, + final boolean isJoinable, + final boolean isBroadcast ) { this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature"); + this.joinable = 
isJoinable; + this.broadcast = isBroadcast; } public DataSource getDataSource() @@ -61,6 +67,16 @@ public RowSignature getRowSignature() return rowSignature; } + public boolean isJoinable() + { + return joinable; + } + + public boolean isBroadcast() + { + return broadcast; + } + @Override public Schema.TableType getJdbcTableType() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 4c92d89f5411..52a34be92f33 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -708,55 +708,55 @@ public void testInformationSchemaSchemata() throws Exception public void testInformationSchemaTables() throws Exception { testQuery( - "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_AVAILABILITY\n" + "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, IS_JOINABLE, IS_BROADCAST\n" + "FROM INFORMATION_SCHEMA.TABLES\n" + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')", ImmutableList.of(), ImmutableList.builder() - .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", "aview", "VIEW", "VIRTUAL"}) - .add(new Object[]{"druid", "bview", "VIEW", "VIRTUAL"}) - .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"lookup", "lookyloo", "TABLE", "GLOBAL"}) - .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "LOCAL"}) 
- .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", "aview", "VIEW", "NO", "NO"}) + .add(new Object[]{"druid", "bview", "VIEW", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"lookup", "lookyloo", "TABLE", "YES", "YES"}) + .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "NO", "NO"}) .build() ); testQuery( PLANNER_CONFIG_DEFAULT, - "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, TABLE_AVAILABILITY\n" + "SELECT TABLE_SCHEMA, TABLE_NAME, TABLE_TYPE, IS_JOINABLE, IS_BROADCAST\n" + "FROM INFORMATION_SCHEMA.TABLES\n" + "WHERE TABLE_TYPE IN ('SYSTEM_TABLE', 'TABLE', 'VIEW')", CalciteTests.SUPER_USER_AUTH_RESULT, ImmutableList.of(), ImmutableList.builder() - .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", 
CalciteTests.DATASOURCE4, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "DISTRIBUTED"}) - .add(new Object[]{"druid", "aview", "VIEW", "VIRTUAL"}) - .add(new Object[]{"druid", "bview", "VIEW", "VIRTUAL"}) - .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"lookup", "lookyloo", "TABLE", "GLOBAL"}) - .add(new Object[]{"sys", "segments", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "LOCAL"}) - .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "LOCAL"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE1, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE2, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE4, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.FORBIDDEN_DATASOURCE, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE5, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", CalciteTests.DATASOURCE3, "TABLE", "NO", "NO"}) + .add(new Object[]{"druid", "aview", "VIEW", "NO", "NO"}) + .add(new Object[]{"druid", "bview", "VIEW", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "COLUMNS", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "SCHEMATA", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"INFORMATION_SCHEMA", "TABLES", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"lookup", "lookyloo", "TABLE", "YES", "YES"}) + .add(new Object[]{"sys", "segments", 
"SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "servers", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE", "NO", "NO"}) + .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE", "NO", "NO"}) .build() ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 438b985b7356..04c2e6637b74 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -139,11 +139,14 @@ public static void tearDownClass() throws IOException private DruidSchema schema = null; private SegmentManager segmentManager; private Set segmentDataSourceNames; + private Set joinableDataSourceNames; @Before public void setUp() throws Exception { segmentDataSourceNames = Sets.newConcurrentHashSet(); + joinableDataSourceNames = Sets.newConcurrentHashSet(); + final File tmpDir = temporaryFolder.newFolder(); final QueryableIndex index1 = IndexBuilder.create() .tmpDir(new File(tmpDir, "1")) @@ -234,7 +237,7 @@ public Set getDataSourceNames() public boolean isDirectlyJoinable(DataSource dataSource) { return dataSource instanceof GlobalTableDataSource && - segmentDataSourceNames.contains(((GlobalTableDataSource) dataSource).getName()); + joinableDataSourceNames.contains(((GlobalTableDataSource) dataSource).getName()); } @Override @@ -487,12 +490,16 @@ public void testAvailableSegmentMetadataIsRealtime() } @Test - public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedException + public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws InterruptedException { DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); Assert.assertNotNull(fooTable); Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); 
Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertFalse(fooTable.isJoinable()); + Assert.assertFalse(fooTable.isBroadcast()); + + buildTableLatch.await(1, TimeUnit.SECONDS); final DataSegment someNewBrokerSegment = new DataSegment( "foo", @@ -508,10 +515,10 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedExce PruneSpecsHolder.DEFAULT ); segmentDataSourceNames.add("foo"); + joinableDataSourceNames.add("foo"); serverView.addSegment(someNewBrokerSegment, ServerType.BROKER); // wait for build twice - buildTableLatch.await(1, TimeUnit.SECONDS); buildTableLatch = new CountDownLatch(2); buildTableLatch.await(1, TimeUnit.SECONDS); @@ -523,8 +530,11 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedExce Assert.assertNotNull(fooTable); Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); Assert.assertTrue(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.isJoinable()); + Assert.assertTrue(fooTable.isBroadcast()); // now remove it + joinableDataSourceNames.remove("foo"); segmentDataSourceNames.remove("foo"); serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER); @@ -541,6 +551,74 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobal() throws InterruptedExce Assert.assertNotNull(fooTable); Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertFalse(fooTable.isJoinable()); + Assert.assertFalse(fooTable.isBroadcast()); } + @Test + public void testLocalSegmentCacheSetsDataSourceAsGlobalButNotJoinable() throws InterruptedException + { + DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + 
Assert.assertFalse(fooTable.isJoinable()); + Assert.assertFalse(fooTable.isBroadcast()); + + // wait for build twice + buildTableLatch.await(1, TimeUnit.SECONDS); + + final DataSegment someNewBrokerSegment = new DataSegment( + "foo", + Intervals.of("2012/2013"), + "version1", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + new NumberedShardSpec(2, 3), + null, + 1, + 100L, + PruneSpecsHolder.DEFAULT + ); + segmentDataSourceNames.add("foo"); + serverView.addSegment(someNewBrokerSegment, ServerType.BROKER); + + buildTableLatch = new CountDownLatch(2); + buildTableLatch.await(1, TimeUnit.SECONDS); + + // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) + getDatasourcesLatch = new CountDownLatch(1); + getDatasourcesLatch.await(1, TimeUnit.SECONDS); + + fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + // should not be a GlobalTableDataSource for now, because isGlobal is coupled with joinability.
ideally this will be + // changed in the future and we should then expect a GlobalTableDataSource here + Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertTrue(fooTable.isBroadcast()); + Assert.assertFalse(fooTable.isJoinable()); + + + // now remove it + segmentDataSourceNames.remove("foo"); + serverView.removeSegment(someNewBrokerSegment, ServerType.BROKER); + + // wait for build + buildTableLatch.await(1, TimeUnit.SECONDS); + buildTableLatch = new CountDownLatch(1); + buildTableLatch.await(1, TimeUnit.SECONDS); + + // wait for get again, just to make sure table has been updated (latch counts down just before tables are updated) + getDatasourcesLatch = new CountDownLatch(1); + getDatasourcesLatch.await(1, TimeUnit.SECONDS); + + fooTable = (DruidTable) schema.getTableMap().get("foo"); + Assert.assertNotNull(fooTable); + Assert.assertTrue(fooTable.getDataSource() instanceof TableDataSource); + Assert.assertFalse(fooTable.getDataSource() instanceof GlobalTableDataSource); + Assert.assertFalse(fooTable.isBroadcast()); + Assert.assertFalse(fooTable.isJoinable()); + } } From 1f7876560565e320c022e809bdcc047c457f1a05 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 18 Jun 2020 00:15:02 -0700 Subject: [PATCH 6/7] fix javadoc --- processing/src/main/java/org/apache/druid/query/DataSource.java | 2 +- .../org/apache/druid/sql/calcite/schema/DruidSchemaTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/DataSource.java b/processing/src/main/java/org/apache/druid/query/DataSource.java index c376a553bb87..d12a8eeabab8 100644 --- a/processing/src/main/java/org/apache/druid/query/DataSource.java +++ b/processing/src/main/java/org/apache/druid/query/DataSource.java @@ -72,7 +72,7 @@ public interface DataSource * Returns true if all servers have a full copy of this datasource. True for things like inline, lookup, etc, or * for queries of those.
* - * Currently this is couple with joinability - if this returns true then the query engine expects there exists a + * Currently this is coupled with joinability - if this returns true then the query engine expects there exists a * {@link org.apache.druid.segment.join.JoinableFactory} which might build a * {@link org.apache.druid.segment.join.Joinable} for this datasource directly. If a subquery 'inline' join is * required to join this datasource on the right hand side, then this value must be false for now. diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 04c2e6637b74..adb3626c54af 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -556,7 +556,7 @@ public void testLocalSegmentCacheSetsDataSourceAsGlobalAndJoinable() throws Inte } @Test - public void testLocalSegmentCacheSetsDataSourceAsGlobalButNotJoinable() throws InterruptedException + public void testLocalSegmentCacheSetsDataSourceAsBroadcastButNotJoinable() throws InterruptedException { DruidTable fooTable = (DruidTable) schema.getTableMap().get("foo"); Assert.assertNotNull(fooTable); From 33c65a2339dacd49dfc88930877a06df3312ab2c Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 18 Jun 2020 16:15:10 -0700 Subject: [PATCH 7/7] fix mistake --- .../java/org/apache/druid/server/ClientQuerySegmentWalker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 5f284f720c6a..c7318a60f7f6 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -203,7 +203,7 @@ public QueryRunner 
getQueryRunnerForSegments(Query query, Iterable( - decorateClusterRunner(freeTradeQuery, clusterClient.getQueryRunnerForSegments(query, specs)), + decorateClusterRunner(freeTradeQuery, clusterClient.getQueryRunnerForSegments(freeTradeQuery, specs)), query, freeTradeQuery );