diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 23dc65f5235c..64c0e07b2ff3 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.inject.Injector; import com.google.inject.Module; @@ -379,7 +380,7 @@ public void emit(Event event) baseClient, null /* local client; unused in this test, so pass in null */, warehouse, - new MapJoinableFactory(ImmutableMap.of()), + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), retryConfig, jsonMapper, serverConfig, diff --git a/integration-tests/docker/environment-configs/broker b/integration-tests/docker/environment-configs/broker index ae2d5611f241..6f564b917760 100644 --- a/integration-tests/docker/environment-configs/broker +++ b/integration-tests/docker/environment-configs/broker @@ -38,4 +38,7 @@ druid_auth_basic_common_cacheDirectory=/tmp/authCache/broker druid_sql_avatica_enable=true druid_server_https_crlPath=/tls/revocations.crl druid_query_scheduler_laning_strategy=manual -druid_query_scheduler_laning_lanes_one=1 \ No newline at end of file +druid_query_scheduler_laning_lanes_one=1 +druid_segmentCache_locations=[{"path":"/shared/druid/brokerIndexCache","maxSize":1000000000}] +druid_server_maxSize=1000000000 +druid_sql_planner_metadataRefreshPeriod=PT15S \ No newline at end of file diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java index c0e04b9ab21b..1b96f3fd04e8 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java @@ -33,6 +33,7 @@ import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.query.lookup.LookupsState; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; +import org.apache.druid.server.coordinator.rules.Rule; import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.TestClient; @@ -376,6 +377,26 @@ public void postDynamicConfig(CoordinatorDynamicConfig coordinatorDynamicConfig) } } + public void postLoadRules(String datasourceName, List<Rule> rules) throws Exception + { + String url = StringUtils.format("%srules/%s", getCoordinatorURL(), datasourceName); + StatusResponseHolder response = httpClient.go( + new Request(HttpMethod.POST, new URL(url)).setContent( + "application/json", + jsonMapper.writeValueAsBytes(rules) + ), responseHandler + ).get(); + + if (!response.getStatus().equals(HttpResponseStatus.OK)) { + throw new ISE( + "Error while setting load rules[%s] status[%s] content[%s]", + url, + response.getStatus(), + response.getContent() + ); + } + } + public CoordinatorDynamicConfig getDynamicConfig() { String url = StringUtils.format("%sconfig", getCoordinatorURL()); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java new file mode 100644 index 000000000000..b7e2910a98d3 --- /dev/null +++ 
b/integration-tests/src/test/java/org/apache/druid/tests/query/ITBroadcastJoinQueryTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.query; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import org.apache.druid.curator.discovery.ServerDiscoveryFactory; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.testing.guice.TestClient; +import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.testing.utils.SqlTestQueryHelper; +import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractIndexerTest; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +@Test(groups = 
TestNGGroup.QUERY) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITBroadcastJoinQueryTest extends AbstractIndexerTest +{ + private static final Logger LOG = new Logger(ITBroadcastJoinQueryTest.class); + private static final String BROADCAST_JOIN_TASK = "/indexer/broadcast_join_index_task.json"; + private static final String BROADCAST_JOIN_METADATA_QUERIES_RESOURCE = "/queries/broadcast_join_metadata_queries.json"; + private static final String BROADCAST_JOIN_QUERIES_RESOURCE = "/queries/broadcast_join_queries.json"; + private static final String BROADCAST_JOIN_DATASOURCE = "broadcast_join_wikipedia_test"; + + + @Inject + ServerDiscoveryFactory factory; + + @Inject + CoordinatorResourceTestClient coordinatorClient; + + @Inject + SqlTestQueryHelper queryHelper; + + @Inject + @TestClient + HttpClient httpClient; + + @Inject + IntegrationTestingConfig config; + + @Test + public void testBroadcastJoin() throws Exception + { + final Closer closer = Closer.create(); + try { + closer.register(unloader(BROADCAST_JOIN_DATASOURCE)); + + // prepare for broadcast + coordinatorClient.postLoadRules( + BROADCAST_JOIN_DATASOURCE, + ImmutableList.of(new ForeverBroadcastDistributionRule()) + ); + + // load the data + String taskJson = replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_TASK), BROADCAST_JOIN_DATASOURCE); + String taskId = indexer.submitTask(taskJson); + + ITRetryUtil.retryUntilTrue( + () -> coordinatorClient.areSegmentsLoaded(BROADCAST_JOIN_DATASOURCE), "broadcast segment load" + ); + + // query metadata until the druid schema is refreshed and the datasource is available as joinable + ITRetryUtil.retryUntilTrue( + () -> { + try { + queryHelper.testQueriesFromString( + queryHelper.getQueryURL(config.getRouterUrl()), + replaceJoinTemplate( + getResourceAsString(BROADCAST_JOIN_METADATA_QUERIES_RESOURCE), + BROADCAST_JOIN_DATASOURCE + ), + 1 + ); + return true; + } + catch (Exception ex) { + return false; + } + }, + "waiting for SQL metadata refresh" + ); + 
+ // now do some queries + queryHelper.testQueriesFromString( + queryHelper.getQueryURL(config.getRouterUrl()), + replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_QUERIES_RESOURCE), BROADCAST_JOIN_DATASOURCE), + 1 + ); + } + finally { + closer.close(); + } + } + + private static String replaceJoinTemplate(String template, String joinDataSource) + { + return StringUtils.replace( + StringUtils.replace(template, "%%JOIN_DATASOURCE%%", joinDataSource), + "%%REGULAR_DATASOURCE%%", + ITWikipediaQueryTest.WIKIPEDIA_DATA_SOURCE + ); + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java index 858b72d38bc4..a80abbb6b395 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java @@ -47,7 +47,7 @@ @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITWikipediaQueryTest { - private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream"; + public static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream"; private static final String WIKI_LOOKUP = "wiki-simple"; private static final String WIKIPEDIA_QUERIES_RESOURCE = "/queries/wikipedia_editstream_queries.json"; private static final String WIKIPEDIA_LOOKUP_RESOURCE = "/queries/wiki-lookup-config.json"; diff --git a/integration-tests/src/test/resources/indexer/broadcast_join_index_task.json b/integration-tests/src/test/resources/indexer/broadcast_join_index_task.json new file mode 100644 index 000000000000..20c3b162ea6e --- /dev/null +++ b/integration-tests/src/test/resources/indexer/broadcast_join_index_task.json @@ -0,0 +1,82 @@ +{ + "type": "index_parallel", + "spec": { + "dataSchema": { + "dataSource": "%%JOIN_DATASOURCE%%", + "timestampSpec": { + "column": "timestamp", + "format": "iso" + }, + "dimensionsSpec": { + 
"dimensions": [ + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city", + { + "type": "long", + "name": "added" + }, + { + "type": "long", + "name": "deleted" + } + ] + }, + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "sum_added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "sum_deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + } + ], + "granularitySpec": { + "segmentGranularity": "YEAR", + "queryGranularity": "second" + } + }, + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "local", + "baseDir": "/resources/data/union_query/", + "filter": "wikipedia_index_data*" + }, + "appendToExisting": false, + "inputFormat": { + "type": "json" + } + }, + "tuningConfig": { + "type": "index_parallel", + "indexSpec": { + "segmentLoader": { + "type": "broadcastJoinableMMapSegmentFactory", + "keyColumns": ["user", "language", "added", "deleted"] + } + } + } + } +} \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json b/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json index 53e063a9c8c1..9819ef212290 100644 --- a/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json +++ b/integration-tests/src/test/resources/indexer/sys_segment_batch_index_queries.json @@ -16,6 +16,9 @@ "expectedResults": [ { "server_type":"historical" + }, + { + "server_type":"broker" } ] }, diff --git a/integration-tests/src/test/resources/queries/broadcast_join_metadata_queries.json b/integration-tests/src/test/resources/queries/broadcast_join_metadata_queries.json new file mode 100644 index 000000000000..c2c32b31832a --- /dev/null +++ b/integration-tests/src/test/resources/queries/broadcast_join_metadata_queries.json @@ -0,0 
+1,26 @@ +[ + { + "description": "query information schema to make sure datasource is joinable and broadcast", + "query": { + "query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM INFORMATION_SCHEMA.TABLES WHERE IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'" + }, + "expectedResults": [ + { + "TABLE_NAME": "%%JOIN_DATASOURCE%%", + "IS_JOINABLE": "YES", + "IS_BROADCAST": "YES" + } + ] + }, + { + "description": "query information schema to make sure druid schema is refreshed", + "query": { + "query": "SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '%%JOIN_DATASOURCE%%'" + }, + "expectedResults": [ + { + "EXPR$0": 19 + } + ] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/queries/broadcast_join_queries.json b/integration-tests/src/test/resources/queries/broadcast_join_queries.json new file mode 100644 index 000000000000..ee46b415ad9c --- /dev/null +++ b/integration-tests/src/test/resources/queries/broadcast_join_queries.json @@ -0,0 +1,29 @@ +[ + { + "description": "query broadcast join segment directly", + "query": { + "query": "SELECT \"%%JOIN_DATASOURCE%%\".\"user\", SUM(\"%%JOIN_DATASOURCE%%\".\"added\") FROM druid.\"%%JOIN_DATASOURCE%%\" GROUP BY 1 ORDER BY 2", + "resultFormat": "OBJECT" + }, + "expectedResults": [ + {"user":"stringer","EXPR$1":2}, + {"user":"nuclear","EXPR$1":114}, + {"user":"masterYi","EXPR$1":246}, + {"user":"speed","EXPR$1":918}, + {"user":"triplets","EXPR$1":1810} + ] + }, + { + "description": "regular datasource is lhs, broadcast join datasource is rhs", + "query": { + "query": "SELECT \"%%JOIN_DATASOURCE%%\".\"language\" as l1, \"%%REGULAR_DATASOURCE%%\".\"language\" as l2, SUM(\"%%JOIN_DATASOURCE%%\".\"sum_added\"), SUM(\"%%REGULAR_DATASOURCE%%\".\"added\") FROM druid.\"%%REGULAR_DATASOURCE%%\" INNER JOIN druid.\"%%JOIN_DATASOURCE%%\" ON \"%%REGULAR_DATASOURCE%%\".\"language\" = \"%%JOIN_DATASOURCE%%\".\"language\" GROUP BY 1, 2 ORDER BY 3 DESC", + 
"resultFormat": "OBJECT" + }, + "expectedResults": [ + {"l1":"en","l2":"en","EXPR$2":1.372562064E9,"EXPR$3":2.191945776E9}, + {"l1":"zh","l2":"zh","EXPR$2":2.0833281E8,"EXPR$3":9.6017292E7}, + {"l1":"ru","l2":"ru","EXPR$2":6.6673872E7,"EXPR$3":2.19902506E8}, + {"l1":"ja","l2":"ja","EXPR$2":249728.0,"EXPR$3":8.3520802E7} + ] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/queries/sys_segment_queries.json b/integration-tests/src/test/resources/queries/sys_segment_queries.json index 48c702a369e9..3eef61f5a406 100644 --- a/integration-tests/src/test/resources/queries/sys_segment_queries.json +++ b/integration-tests/src/test/resources/queries/sys_segment_queries.json @@ -21,6 +21,9 @@ "expectedResults": [ { "server_type":"historical" + }, + { + "server_type":"broker" } ] } diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_servers.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_servers.json index bf7c681af6e6..05091318b585 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_servers.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_servers.json @@ -8,5 +8,15 @@ "tier": "_default_tier", "curr_size": 2208932412, "max_size": 5000000000 + }, + { + "server": "172.172.172.8:8282", + "host": "172.172.172.8", + "plaintext_port": 8082, + "tls_port": 8282, + "server_type": "broker", + "tier": "_default_tier", + "curr_size": 0, + "max_size": 1000000000 } ] diff --git a/processing/src/main/java/org/apache/druid/jackson/SegmentizerModule.java b/processing/src/main/java/org/apache/druid/jackson/SegmentizerModule.java index 82e96ccb87c5..b0cd6cbb8475 100644 --- a/processing/src/main/java/org/apache/druid/jackson/SegmentizerModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/SegmentizerModule.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; 
+import org.apache.druid.segment.loading.BroadcastJoinableMMappedQueryableSegmentizerFactory; import org.apache.druid.segment.loading.MMappedQueryableSegmentizerFactory; public class SegmentizerModule extends SimpleModule @@ -29,5 +30,8 @@ public SegmentizerModule() { super("SegmentizerModule"); registerSubtypes(new NamedType(MMappedQueryableSegmentizerFactory.class, "mMapSegmentFactory")); + registerSubtypes( + new NamedType(BroadcastJoinableMMappedQueryableSegmentizerFactory.class, "broadcastJoinableMMapSegmentFactory") + ); } } diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java index 49175a06873b..82f8151129d9 100644 --- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexColumnSelectorFactory.java @@ -39,7 +39,7 @@ * It's counterpart for incremental index is {@link * org.apache.druid.segment.incremental.IncrementalIndexColumnSelectorFactory}. 
*/ -class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactory +public class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactory { private final QueryableIndex index; private final VirtualColumns virtualColumns; @@ -55,7 +55,7 @@ class QueryableIndexColumnSelectorFactory implements ColumnSelectorFactory private final Map dimensionSelectorCache; private final Map valueSelectorCache; - QueryableIndexColumnSelectorFactory( + public QueryableIndexColumnSelectorFactory( QueryableIndex index, VirtualColumns virtualColumns, boolean descending, diff --git a/processing/src/main/java/org/apache/druid/segment/SimpleAscendingOffset.java b/processing/src/main/java/org/apache/druid/segment/SimpleAscendingOffset.java index e730e96b368e..b4c6111804eb 100644 --- a/processing/src/main/java/org/apache/druid/segment/SimpleAscendingOffset.java +++ b/processing/src/main/java/org/apache/druid/segment/SimpleAscendingOffset.java @@ -23,7 +23,7 @@ import org.apache.druid.segment.data.Offset; import org.apache.druid.segment.data.ReadableOffset; -public class SimpleAscendingOffset extends Offset +public class SimpleAscendingOffset extends SimpleSettableOffset { private final int rowCount; private final int initialOffset; @@ -53,7 +53,8 @@ public boolean withinBounds() return currentOffset < rowCount; } - void setCurrentOffset(int currentOffset) + @Override + public void setCurrentOffset(int currentOffset) { this.currentOffset = currentOffset; } diff --git a/processing/src/main/java/org/apache/druid/segment/SimpleDescendingOffset.java b/processing/src/main/java/org/apache/druid/segment/SimpleDescendingOffset.java index f19e36cc6821..fe8aba36fb99 100644 --- a/processing/src/main/java/org/apache/druid/segment/SimpleDescendingOffset.java +++ b/processing/src/main/java/org/apache/druid/segment/SimpleDescendingOffset.java @@ -23,13 +23,13 @@ import org.apache.druid.segment.data.Offset; import org.apache.druid.segment.data.ReadableOffset; -public class 
SimpleDescendingOffset extends Offset +public class SimpleDescendingOffset extends SimpleSettableOffset { private final int rowCount; private final int initialOffset; private int currentOffset; - SimpleDescendingOffset(int rowCount) + public SimpleDescendingOffset(int rowCount) { this(rowCount - 1, rowCount); } @@ -77,6 +77,12 @@ public int getOffset() return currentOffset; } + @Override + public void setCurrentOffset(int currentOffset) + { + this.currentOffset = currentOffset; + } + @Override public String toString() { diff --git a/processing/src/main/java/org/apache/druid/segment/SimpleSettableOffset.java b/processing/src/main/java/org/apache/druid/segment/SimpleSettableOffset.java new file mode 100644 index 000000000000..68afcfe6b59f --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/SimpleSettableOffset.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment; + +import org.apache.druid.segment.data.Offset; + +public abstract class SimpleSettableOffset extends Offset +{ + public abstract void setCurrentOffset(int currentOffset); +} diff --git a/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java b/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java index 767f45fdc138..bb708b58368b 100644 --- a/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java +++ b/processing/src/main/java/org/apache/druid/segment/column/RowSignature.java @@ -143,6 +143,11 @@ public boolean contains(final String columnName) return columnPositions.containsKey(columnName); } + public boolean contains(final int columnNumber) + { + return 0 <= columnNumber && columnNumber < columnNames.size(); + } + /** * Returns the first position of {@code columnName} in this row signature, or -1 if it does not appear. * diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinEngine.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinEngine.java index 32f811596a89..40582eee77af 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinEngine.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinEngine.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.join; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.BaseQuery; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.segment.ColumnSelectorFactory; @@ -51,14 +52,21 @@ private HashJoinEngine() * not be queryable through the returned Cursor. This happens even if the right-hand joinable doesn't actually have a * column with this name. 
*/ - public static Cursor makeJoinCursor(final Cursor leftCursor, final JoinableClause joinableClause) + public static Cursor makeJoinCursor( + final Cursor leftCursor, + final JoinableClause joinableClause, + final boolean descending, + final Closer closer + ) { final ColumnSelectorFactory leftColumnSelectorFactory = leftCursor.getColumnSelectorFactory(); final JoinMatcher joinMatcher = joinableClause.getJoinable() .makeJoinMatcher( leftColumnSelectorFactory, joinableClause.getCondition(), - joinableClause.getJoinType().isRighty() + joinableClause.getJoinType().isRighty(), + descending, + closer ); class JoinColumnSelectorFactory implements ColumnSelectorFactory diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java index b5bef656773b..03f3f946d461 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java +++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentStorageAdapter.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.filter.Filter; import org.apache.druid.segment.Cursor; @@ -254,14 +255,15 @@ public Sequence makeCursors( queryMetrics ); - return Sequences.map( + Closer joinablesCloser = Closer.create(); + return Sequences.map( baseCursorSequence, cursor -> { assert cursor != null; Cursor retVal = cursor; for (JoinableClause clause : clauses) { - retVal = HashJoinEngine.makeJoinCursor(retVal, clause); + retVal = HashJoinEngine.makeJoinCursor(retVal, clause, descending, joinablesCloser); } return PostJoinCursor.wrap( @@ -270,7 +272,7 @@ public Sequence makeCursors( 
joinFilterSplit.getJoinTableFilter().isPresent() ? joinFilterSplit.getJoinTableFilter().get() : null ); } - ); + ).withBaggage(joinablesCloser); } /** diff --git a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java index 7ad7799a1099..f22134bc0c28 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/Joinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/Joinable.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.join; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ReferenceCountedObject; import org.apache.druid.segment.column.ColumnCapabilities; @@ -71,12 +72,17 @@ public interface Joinable extends ReferenceCountedObject * @param remainderNeeded whether or not {@link JoinMatcher#matchRemainder()} will ever be called on the * matcher. If we know it will not, additional optimizations are often possible. * + * @param descending true if join cursor is iterated in descending order + * @param closer closer that will run after join cursor has completed to clean up any per query + * resources the joinable uses * @return the matcher */ JoinMatcher makeJoinMatcher( ColumnSelectorFactory leftColumnSelectorFactory, JoinConditionAnalysis condition, - boolean remainderNeeded + boolean remainderNeeded, + boolean descending, + Closer closer ); /** diff --git a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java index 723aba57faa9..3912ebfa98b3 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/join/JoinableFactory.java @@ -26,7 +26,8 @@ /** * Utility for creating {@link Joinable} objects. 
* - * @see org.apache.druid.guice.DruidBinders#joinableFactoryBinder to register factories + * @see org.apache.druid.guice.DruidBinders#joinableFactoryMultiBinder to register factories + * @see org.apache.druid.guice.DruidBinders#joinableMappingBinder to register factory types with datasource types */ public interface JoinableFactory { diff --git a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java index abf4b6ae4d06..904433f8b4c5 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/join/MapJoinableFactory.java @@ -19,49 +19,65 @@ package org.apache.druid.segment.join; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.SetMultimap; import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.query.DataSource; -import java.util.IdentityHashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; /** - * A {@link JoinableFactory} that delegates to the appropriate factory based on the type of the datasource. + * A {@link JoinableFactory} that delegates to the appropriate factory based on the datasource. * - * Datasources can register a factory via a DruidBinder + * Any number of {@link JoinableFactory} may be associated to the same class of {@link DataSource}, but for a specific + * datasource only a single {@link JoinableFactory} should be able to create a {@link Joinable} in the {@link #build} + * method. 
*/ public class MapJoinableFactory implements JoinableFactory { - private final Map<Class<? extends DataSource>, JoinableFactory> joinableFactories; + private final SetMultimap<Class<? extends DataSource>, JoinableFactory> joinableFactories; @Inject - public MapJoinableFactory(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories) + public MapJoinableFactory( + Set<JoinableFactory> factories, + Map<Class<? extends JoinableFactory>, Class<? extends DataSource>> factoryToDataSource + ) { - // Accesses to IdentityHashMap should be faster than to HashMap or ImmutableMap. - // Class doesn't override Object.equals(). - this.joinableFactories = new IdentityHashMap<>(joinableFactories); + this.joinableFactories = HashMultimap.create(); + factories.forEach(joinableFactory -> { + joinableFactories.put(factoryToDataSource.get(joinableFactory.getClass()), joinableFactory); + }); } @Override public boolean isDirectlyJoinable(DataSource dataSource) { - JoinableFactory factory = joinableFactories.get(dataSource.getClass()); - if (factory == null) { - return false; - } else { - return factory.isDirectlyJoinable(dataSource); + Set<JoinableFactory> factories = joinableFactories.get(dataSource.getClass()); + for (JoinableFactory factory : factories) { + if (factory.isDirectlyJoinable(dataSource)) { + return true; + } } + return false; } @Override public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition) { - JoinableFactory factory = joinableFactories.get(dataSource.getClass()); - if (factory == null) { - return Optional.empty(); - } else { - return factory.build(dataSource, condition); + Set<JoinableFactory> factories = joinableFactories.get(dataSource.getClass()); + Optional<Joinable> maybeJoinable = Optional.empty(); + for (JoinableFactory factory : factories) { + Optional<Joinable> candidate = factory.build(dataSource, condition); + if (candidate.isPresent()) { + if (maybeJoinable.isPresent()) { + throw new ISE("Multiple joinable factories are valid for table[%s]", dataSource); + } + maybeJoinable = candidate; + } } + return maybeJoinable; } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java 
b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java index 9321a184ebbc..109da85ab460 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/lookup/LookupJoinable.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.lookup.LookupExtractor; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnCapabilities; @@ -83,7 +84,9 @@ public ColumnCapabilities getColumnCapabilities(String columnName) public JoinMatcher makeJoinMatcher( final ColumnSelectorFactory leftSelectorFactory, final JoinConditionAnalysis condition, - final boolean remainderNeeded + final boolean remainderNeeded, + boolean descending, + Closer closer ) { return LookupJoinMatcher.create(extractor, leftSelectorFactory, condition, remainderNeeded);
diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
new file mode 100644
index 000000000000..113bf3d5a212
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTable.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.table;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.ints.IntList;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.segment.BaseObjectColumnValueSelector;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.NilColumnValueSelector;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.QueryableIndexColumnSelectorFactory;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.SimpleAscendingOffset;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.BaseColumn;
+import org.apache.druid.segment.column.ColumnHolder;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.segment.data.ReadableOffset;
+import org.apache.druid.segment.filter.Filters;
+import org.joda.time.chrono.ISOChronology;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link IndexedTable} backed by the columns of a {@link QueryableIndexSegment}. The constructor scans the segment
+ * once and builds an in-memory value-to-row-numbers index for every key column; any column of the row signature can
+ * be read through {@link #columnReader} or {@link #makeColumnSelectorFactory}.
+ */
+public class BroadcastSegmentIndexedTable implements IndexedTable
+{
+  private static final Logger LOG = new Logger(BroadcastSegmentIndexedTable.class);
+
+  private final QueryableIndexSegment segment;
+  private final QueryableIndexStorageAdapter adapter;
+  private final QueryableIndex queryableIndex;
+  private final Set<String> keyColumns;
+  private final RowSignature rowSignature;
+  private final String version;
+  // one entry per rowSignature position: value -> row numbers for key columns, null for non-key columns
+  private final List<Map<Object, IntList>> keyColumnsIndex;
+
+  /**
+   * Wraps {@code theSegment} and eagerly indexes every row of it on the given key columns.
+   *
+   * @param theSegment segment to expose as a table; must be non-null and provide a queryable index
+   * @param keyColumns names of the columns to build lookup indexes for; names absent from the segment are ignored
+   * @param version    version string returned from {@link #version()}
+   */
+  public BroadcastSegmentIndexedTable(
+      final QueryableIndexSegment theSegment,
+      final Set<String> keyColumns,
+      final String version
+  )
+  {
+    this.keyColumns = keyColumns;
+    this.version = version;
+    this.segment = Preconditions.checkNotNull(theSegment, "Segment must not be null");
+    this.adapter = Preconditions.checkNotNull(
+        (QueryableIndexStorageAdapter) segment.asStorageAdapter(),
+        "Segment[%s] must have a QueryableIndexStorageAdapter",
+        segment.getId()
+    );
+    this.queryableIndex = Preconditions.checkNotNull(
+        segment.asQueryableIndex(),
+        "Segment[%s] must have a QueryableIndex",
+        segment.getId()
+    );
+
+    RowSignature.Builder sigBuilder = RowSignature.builder();
+    sigBuilder.add(ColumnHolder.TIME_COLUMN_NAME, ValueType.LONG);
+    for (String column : queryableIndex.getColumnNames()) {
+      sigBuilder.add(column, adapter.getColumnCapabilities(column).getType());
+    }
+    this.rowSignature = sigBuilder.build();
+
+    // initialize keycolumn index maps; keyColumnMaps.get(i) is the index map of keyColumnNames.get(i), kept so the
+    // row loop below does not have to look the map up by column name for every row
+    this.keyColumnsIndex = new ArrayList<>(rowSignature.size());
+    final List<String> keyColumnNames = new ArrayList<>(keyColumns.size());
+    final List<Map<Object, IntList>> keyColumnMaps = new ArrayList<>(keyColumns.size());
+    for (int i = 0; i < rowSignature.size(); i++) {
+      final Map<Object, IntList> m;
+      final String columnName = rowSignature.getColumnName(i);
+      if (keyColumns.contains(columnName)) {
+        m = new HashMap<>();
+        keyColumnNames.add(columnName);
+        keyColumnMaps.add(m);
+      } else {
+        m = null;
+      }
+      keyColumnsIndex.add(m);
+    }
+
+    // sort of like the dump segment tool, but build key column indexes when reading the segment
+    final Sequence<Cursor> cursors = adapter.makeCursors(
+        Filters.toFilter(null),
+        queryableIndex.getDataInterval().withChronology(ISOChronology.getInstanceUTC()),
+        VirtualColumns.EMPTY,
+        Granularities.ALL,
+        false,
+        null
+    );
+
+    final Sequence<Integer> sequence = Sequences.map(
+        cursors,
+        cursor -> {
+          if (cursor == null) {
+            return 0;
+          }
+          int rowNumber = 0;
+          ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
+
+          // this should really be optimized to use dimension selectors where possible to populate indexes from bitmap
+          // indexes, but, an optimization for another day
+          final List<BaseObjectColumnValueSelector> selectors = keyColumnNames
+              .stream()
+              .map(columnName -> {
+                // multi-value dimensions are not currently supported
+                if (adapter.getColumnCapabilities(columnName).hasMultipleValues().isMaybeTrue()) {
+                  return NilColumnValueSelector.instance();
+                }
+                return columnSelectorFactory.makeColumnValueSelector(columnName);
+              })
+              .collect(Collectors.toList());
+
+          while (!cursor.isDone()) {
+            for (int keyColumnSelectorIndex = 0; keyColumnSelectorIndex < selectors.size(); keyColumnSelectorIndex++) {
+              final Object key = selectors.get(keyColumnSelectorIndex).getObject();
+              // null keys are not indexed
+              if (key != null) {
+                final IntList array =
+                    keyColumnMaps.get(keyColumnSelectorIndex).computeIfAbsent(key, k -> new IntArrayList());
+                array.add(rowNumber);
+              }
+            }
+
+            if (rowNumber % 100_000 == 0) {
+              if (rowNumber == 0) {
+                LOG.debug("Indexed first row for table %s", theSegment.getId());
+              } else {
+                LOG.debug("Indexed row %s for table %s", rowNumber, theSegment.getId());
+              }
+            }
+            rowNumber++;
+            cursor.advance();
+          }
+          return rowNumber;
+        }
+    );
+
+    final Integer totalRows = sequence.accumulate(0, (accumulated, in) -> accumulated + in);
+    LOG.info("Created BroadcastSegmentIndexedTable with %s rows.", totalRows);
+  }
+
+  @Override
+  public String version()
+  {
+    return version;
+  }
+
+  @Override
+  public Set<String> keyColumns()
+  {
+    return keyColumns;
+  }
+
+  @Override
+  public RowSignature rowSignature()
+  {
+    return rowSignature;
+  }
+
+  @Override
+  public int numRows()
+  {
+    return adapter.getNumRows();
+  }
+
+  @Override
+  public Index columnIndex(int column)
+  {
+    return RowBasedIndexedTable.getKeyColumnIndex(column, keyColumnsIndex, rowSignature);
+  }
+
+  /**
+   * Returns a reader over a single column of the backing segment. The reader's {@code close()} closes the underlying
+   * column, so callers are expected to close it when done.
+   *
+   * @throws IAE if {@code column} is not a position in {@link #rowSignature()}
+   */
+  @Override
+  public Reader columnReader(int column)
+  {
+    if (!rowSignature.contains(column)) {
+      throw new IAE("Column[%d] is not a valid column for segment[%s]", column, segment.getId());
+    }
+    final SimpleAscendingOffset offset = new SimpleAscendingOffset(adapter.getNumRows());
+    final BaseColumn baseColumn = queryableIndex.getColumnHolder(rowSignature.getColumnName(column)).getColumn();
+    final BaseObjectColumnValueSelector selector = baseColumn.makeColumnValueSelector(offset);
+
+    return new Reader()
+    {
+      @Nullable
+      @Override
+      public Object read(int row)
+      {
+        // reposition the offset the selector was created with, then read through that selector
+        offset.setCurrentOffset(row);
+        return selector.getObject();
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        baseColumn.close();
+      }
+    };
+  }
+
+  /**
+   * Provides a selector factory that reads directly from the segment's {@link QueryableIndex} at {@code offset}.
+   */
+  @Nullable
+  @Override
+  public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, boolean descending, Closer closer)
+  {
+    return new QueryableIndexColumnSelectorFactory(
+        queryableIndex,
+        VirtualColumns.EMPTY,
+        descending,
+        closer,
+        offset,
+        new HashMap<>()
+    );
+  }
+
+  @Override
+  public void close()
+  {
+    // the segment will close itself when it is dropped, no need to do it here
+  }
+
+  @Override
+  public Optional<Closeable> acquireReferences()
+  {
+    return Optional.empty();
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java index b47214487299..62c767739ffc 
100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTable.java @@ -20,11 +20,15 @@ package org.apache.druid.segment.join.table; import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ReferenceCountedObject; import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.data.ReadableOffset; import javax.annotation.Nullable; import java.io.Closeable; +import java.io.IOException; import java.util.Set; /** @@ -65,10 +69,24 @@ public interface IndexedTable extends ReferenceCountedObject, Closeable /** * Returns a reader for a particular column. The provided column number must be that column's position in - * {@link #rowSignature()}. + * {@link #rowSignature()}. Don't forget to close your {@link Reader} when finished reading, to clean up any + * resources. */ Reader columnReader(int column); + /** + * This method allows a table to directly provide an optimized {@link ColumnSelectorFactory} for + * {@link IndexedTableJoinMatcher} to create selectors. If this method returns null, the default + * {@link IndexedTableColumnSelectorFactory}, which creates {@link IndexedTableDimensionSelector} or + * {@link IndexedTableColumnValueSelector} as appropriate, both backed with a {@link #columnReader}, will be used + * instead. + */ + @Nullable + default ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, boolean descending, Closer closer) + { + return null; + } + /** * Indexes support fast lookups on key columns. */ @@ -83,7 +101,7 @@ interface Index /** * Readers support reading values out of any column. */ - interface Reader + interface Reader extends Closeable { /** * Read the value at a particular row number. 
Throws an exception if the row is out of bounds (must be between zero @@ -91,5 +109,11 @@ interface Reader */ @Nullable Object read(int row); + + @Override + default void close() throws IOException + { + // nothing to close + } } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnSelectorFactory.java index 00cb51ffffff..26fe2a9c1c5e 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnSelectorFactory.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnSelectorFactory.java @@ -19,6 +19,7 @@ package org.apache.druid.segment.join.table; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.dimension.DimensionSpec; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.ColumnValueSelector; @@ -36,11 +37,13 @@ public class IndexedTableColumnSelectorFactory implements ColumnSelectorFactory { private final IndexedTable table; private final IntSupplier currentRow; + private final Closer closer; - IndexedTableColumnSelectorFactory(IndexedTable table, IntSupplier currentRow) + IndexedTableColumnSelectorFactory(IndexedTable table, IntSupplier currentRow, Closer closer) { this.table = table; this.currentRow = currentRow; + this.closer = closer; } @Nullable @@ -79,7 +82,8 @@ public DimensionSelector makeDimensionSelector(final DimensionSpec dimensionSpec table, currentRow, columnNumber, - dimensionSpec.getExtractionFn() + dimensionSpec.getExtractionFn(), + closer ); return dimensionSpec.decorate(undecoratedSelector); @@ -95,7 +99,7 @@ public ColumnValueSelector makeColumnValueSelector(final String columnName) if (columnNumber < 0) { return NilColumnValueSelector.instance(); } else { - return new IndexedTableColumnValueSelector(table, currentRow, columnNumber); + return new 
IndexedTableColumnValueSelector(table, currentRow, columnNumber, closer); } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnValueSelector.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnValueSelector.java index 5f6b07adbfdc..e59b412a6247 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnValueSelector.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableColumnValueSelector.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.join.table; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; import org.apache.druid.segment.ColumnValueSelector; @@ -31,10 +32,11 @@ public class IndexedTableColumnValueSelector implements ColumnValueSelector IntIterators.fromTo(0, table.numRows())); @@ -106,7 +118,10 @@ public class IndexedTableJoinMatcher implements JoinMatcher } this.currentMatchedRows = new IntIterator[conditionMatchers.size()]; - this.selectorFactory = new IndexedTableColumnSelectorFactory(table, () -> currentRow); + ColumnSelectorFactory selectorFactory = table.makeColumnSelectorFactory(joinableOffset, descending, closer); + this.selectorFactory = selectorFactory != null + ? 
selectorFactory + : new IndexedTableColumnSelectorFactory(table, () -> currentRow, closer); if (remainderNeeded) { this.matchedRows = new IntRBTreeSet(); @@ -243,6 +258,7 @@ public void reset() currentIterator = null; currentRow = UNINITIALIZED_CURRENT_ROW; matchingRemainder = false; + joinableOffset.reset(); } private void advanceCurrentRow() @@ -252,6 +268,7 @@ private void advanceCurrentRow() } else { currentIterator = null; currentRow = UNINITIALIZED_CURRENT_ROW; + joinableOffset.setCurrentOffset(currentRow); } } diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java index 47166793ed20..4faaf549cd0b 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/IndexedTableJoinable.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.join.table; import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.column.ColumnCapabilities; import org.apache.druid.segment.join.JoinConditionAnalysis; @@ -28,6 +29,7 @@ import javax.annotation.Nullable; import java.io.Closeable; +import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Objects; @@ -71,14 +73,18 @@ public ColumnCapabilities getColumnCapabilities(String columnName) public JoinMatcher makeJoinMatcher( final ColumnSelectorFactory leftColumnSelectorFactory, final JoinConditionAnalysis condition, - final boolean remainderNeeded + final boolean remainderNeeded, + boolean descending, + Closer closer ) { return new IndexedTableJoinMatcher( table, leftColumnSelectorFactory, condition, - remainderNeeded + remainderNeeded, + descending, + closer ); } @@ -97,41 +103,48 @@ public Optional> getCorrelatedColumnValues( if 
(filterColumnPosition < 0 || correlatedColumnPosition < 0) { return Optional.empty(); } + try (final Closer closer = Closer.create()) { + Set correlatedValues = new HashSet<>(); + if (table.keyColumns().contains(searchColumnName)) { + IndexedTable.Index index = table.columnIndex(filterColumnPosition); + IndexedTable.Reader reader = table.columnReader(correlatedColumnPosition); + closer.register(reader); + IntList rowIndex = index.find(searchColumnValue); + for (int i = 0; i < rowIndex.size(); i++) { + int rowNum = rowIndex.getInt(i); + String correlatedDimVal = Objects.toString(reader.read(rowNum), null); + correlatedValues.add(correlatedDimVal); - Set correlatedValues = new HashSet<>(); - if (table.keyColumns().contains(searchColumnName)) { - IndexedTable.Index index = table.columnIndex(filterColumnPosition); - IndexedTable.Reader reader = table.columnReader(correlatedColumnPosition); - IntList rowIndex = index.find(searchColumnValue); - for (int i = 0; i < rowIndex.size(); i++) { - int rowNum = rowIndex.getInt(i); - String correlatedDimVal = Objects.toString(reader.read(rowNum), null); - correlatedValues.add(correlatedDimVal); - - if (correlatedValues.size() > maxCorrelationSetSize) { + if (correlatedValues.size() > maxCorrelationSetSize) { + return Optional.empty(); + } + } + return Optional.of(correlatedValues); + } else { + if (!allowNonKeyColumnSearch) { return Optional.empty(); } - } - return Optional.of(correlatedValues); - } else { - if (!allowNonKeyColumnSearch) { - return Optional.empty(); - } - IndexedTable.Reader dimNameReader = table.columnReader(filterColumnPosition); - IndexedTable.Reader correlatedColumnReader = table.columnReader(correlatedColumnPosition); - for (int i = 0; i < table.numRows(); i++) { - String dimVal = Objects.toString(dimNameReader.read(i), null); - if (searchColumnValue.equals(dimVal)) { - String correlatedDimVal = Objects.toString(correlatedColumnReader.read(i), null); - correlatedValues.add(correlatedDimVal); - if 
(correlatedValues.size() > maxCorrelationSetSize) { - return Optional.empty(); + IndexedTable.Reader dimNameReader = table.columnReader(filterColumnPosition); + IndexedTable.Reader correlatedColumnReader = table.columnReader(correlatedColumnPosition); + closer.register(dimNameReader); + closer.register(correlatedColumnReader); + for (int i = 0; i < table.numRows(); i++) { + String dimVal = Objects.toString(dimNameReader.read(i), null); + if (searchColumnValue.equals(dimVal)) { + String correlatedDimVal = Objects.toString(correlatedColumnReader.read(i), null); + correlatedValues.add(correlatedDimVal); + if (correlatedValues.size() > maxCorrelationSetSize) { + return Optional.empty(); + } } } - } - return Optional.of(correlatedValues); + return Optional.of(correlatedValues); + } + } + catch (IOException e) { + throw new RuntimeException(e); } }
diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/ReferenceCountingIndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/ReferenceCountingIndexedTable.java
new file mode 100644
index 000000000000..43e9ec0b39b1
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/join/table/ReferenceCountingIndexedTable.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.join.table;
+
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ReferenceCountingCloseableObject;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.data.ReadableOffset;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Reference-counting wrapper around an {@link IndexedTable}: the table accessors below are forwarded to the wrapped
+ * table, and {@link #acquireReferences()} hands out a reference obtained from the
+ * {@link ReferenceCountingCloseableObject} superclass.
+ */
+public class ReferenceCountingIndexedTable extends ReferenceCountingCloseableObject<IndexedTable>
+    implements IndexedTable
+{
+  public ReferenceCountingIndexedTable(IndexedTable indexedTable)
+  {
+    super(indexedTable);
+  }
+
+  @Override
+  public String version()
+  {
+    return baseObject.version();
+  }
+
+  @Override
+  public Set<String> keyColumns()
+  {
+    return baseObject.keyColumns();
+  }
+
+  @Override
+  public RowSignature rowSignature()
+  {
+    return baseObject.rowSignature();
+  }
+
+  @Override
+  public int numRows()
+  {
+    return baseObject.numRows();
+  }
+
+  @Override
+  public Index columnIndex(int column)
+  {
+    return baseObject.columnIndex(column);
+  }
+
+  @Override
+  public Reader columnReader(int column)
+  {
+    return baseObject.columnReader(column);
+  }
+
+  /**
+   * Forwards to the wrapped table so that a table-provided selector factory is still used when the table is reached
+   * through this wrapper; without this override the interface default of {@code null} would always be returned.
+   */
+  @Nullable
+  @Override
+  public ColumnSelectorFactory makeColumnSelectorFactory(ReadableOffset offset, boolean descending, Closer closer)
+  {
+    return baseObject.makeColumnSelectorFactory(offset, descending, closer);
+  }
+
+  @Override
+  public Optional<Closeable> acquireReferences()
+  {
+    return incrementReferenceAndDecrementOnceCloseable();
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java b/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java index c18ec6a44b7f..c570017f4013 100644 --- a/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java +++ b/processing/src/main/java/org/apache/druid/segment/join/table/RowBasedIndexedTable.java @@ -33,7 +33,6 @@ import java.io.Closeable; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -69,10 +68,6 @@ public RowBasedIndexedTable( this.keyColumns = keyColumns; this.version = version; - if (new HashSet<>(keyColumns).size() != 
keyColumns.size()) { - throw new ISE("keyColumns[%s] must not contain duplicates", keyColumns); - } - if (!ImmutableSet.copyOf(rowSignature.getColumnNames()).containsAll(keyColumns)) { throw new ISE( "keyColumns[%s] must all be contained in rowSignature[%s]", @@ -132,29 +127,7 @@ public RowSignature rowSignature() @Override public Index columnIndex(int column) { - final Map indexMap = index.get(column); - - if (indexMap == null) { - throw new IAE("Column[%d] is not a key column", column); - } - - final ValueType columnType = - rowSignature.getColumnType(column).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE); - - return key -> { - final Object convertedKey = DimensionHandlerUtils.convertObjectToType(key, columnType, false); - - if (convertedKey != null) { - final IntList found = indexMap.get(convertedKey); - if (found != null) { - return found; - } else { - return IntLists.EMPTY_LIST; - } - } else { - return IntLists.EMPTY_LIST; - } - }; + return getKeyColumnIndex(column, index, rowSignature); } @Override @@ -187,4 +160,31 @@ public void close() { // nothing to close } + + static Index getKeyColumnIndex(int column, List> keyColumnsIndex, RowSignature rowSignature) + { + final Map indexMap = keyColumnsIndex.get(column); + + if (indexMap == null) { + throw new IAE("Column[%d] is not a key column", column); + } + + final ValueType columnType = + rowSignature.getColumnType(column).orElse(IndexedTableJoinMatcher.DEFAULT_KEY_TYPE); + + return key -> { + final Object convertedKey = DimensionHandlerUtils.convertObjectToType(key, columnType, false); + + if (convertedKey != null) { + final IntList found = indexMap.get(convertedKey); + if (found != null) { + return found; + } else { + return IntLists.EMPTY_LIST; + } + } else { + return IntLists.EMPTY_LIST; + } + }; + } } diff --git a/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java 
b/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java
new file mode 100644
index 000000000000..c675ea5ff7bb
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactory.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable;
+import org.apache.druid.segment.join.table.IndexedTable;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * {@link SegmentizerFactory} that loads a segment through {@link IndexIO} as a {@link QueryableIndexSegment} and
+ * additionally lets that segment be viewed as an {@link IndexedTable} over the configured key columns.
+ */
+public class BroadcastJoinableMMappedQueryableSegmentizerFactory implements SegmentizerFactory
+{
+  private final IndexIO indexIO;
+  private final Set<String> keyColumns;
+
+  @JsonCreator
+  public BroadcastJoinableMMappedQueryableSegmentizerFactory(
+      @JacksonInject IndexIO indexIO,
+      @JsonProperty("keyColumns") Set<String> keyColumns
+  )
+  {
+    this.indexIO = indexIO;
+    this.keyColumns = keyColumns;
+  }
+
+  @JsonProperty
+  public Set<String> getKeyColumns()
+  {
+    return keyColumns;
+  }
+
+  /**
+   * Loads the index found in {@code parentDir} and wraps it in a {@link QueryableIndexSegment} whose
+   * {@code as(IndexedTable.class)} builds a {@link BroadcastSegmentIndexedTable} over that segment.
+   *
+   * @throws SegmentLoadingException if reading the index fails with an {@link IOException}
+   */
+  @Override
+  public Segment factorize(DataSegment dataSegment, File parentDir, boolean lazy) throws SegmentLoadingException
+  {
+    try {
+      return new QueryableIndexSegment(indexIO.loadIndex(parentDir, lazy), dataSegment.getId()) {
+        @Nullable
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T> T as(Class<T> clazz)
+        {
+          if (clazz.equals(IndexedTable.class)) {
+            // the cast is safe because T is IndexedTable in this branch; a new table is built on every call
+            return (T) new BroadcastSegmentIndexedTable(this, keyColumns, dataSegment.getVersion());
+          }
+          return super.as(clazz);
+        }
+      };
+    }
+    catch (IOException e) {
+      throw new SegmentLoadingException(e, "%s", e.getMessage());
+    }
+  }
+
+  /**
+   * Two factories are equal when their key columns are equal; the injected {@link IndexIO} is not compared.
+   */
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    BroadcastJoinableMMappedQueryableSegmentizerFactory that = (BroadcastJoinableMMappedQueryableSegmentizerFactory) o;
+    return Objects.equals(keyColumns, that.keyColumns);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(keyColumns);
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java index 1d00e711c190..0b67e882f828 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/MapJoinableFactoryTest.java @@ -20,7 +20,7 @@ package org.apache.druid.segment.join; import com.google.common.collect.ImmutableMap; -import org.apache.druid.query.DataSource; +import com.google.common.collect.ImmutableSet; import org.apache.druid.query.InlineDataSource; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -31,20 +31,11 @@ import org.junit.Test; import org.junit.runner.RunWith; -import java.util.Map; import java.util.Optional; @RunWith(EasyMockRunner.class) public class MapJoinableFactoryTest { - /** - * A utility to create a {@link MapJoinableFactory} to be used by tests. 
- */ - public static MapJoinableFactory fromMap(Map, JoinableFactory> map) - { - return new MapJoinableFactory(map); - } - @Mock private InlineDataSource inlineDataSource; @Mock(MockType.NICE) @@ -63,7 +54,9 @@ public void setUp() noopDataSource = new NoopDataSource(); target = new MapJoinableFactory( - ImmutableMap.of(NoopDataSource.class, noopJoinableFactory)); + ImmutableSet.of(noopJoinableFactory), + ImmutableMap.of(noopJoinableFactory.getClass(), NoopDataSource.class) + ); } diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java new file mode 100644 index 000000000000..6d46e4299de7 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/join/table/BroadcastSegmentIndexedTableTest.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.join.table; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import it.unimi.dsi.fastutil.ints.IntList; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.jackson.SegmentizerModule; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMerger; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.QueryableIndexSegment; +import org.apache.druid.segment.SimpleAscendingOffset; +import org.apache.druid.segment.TestIndex; +import org.apache.druid.segment.column.BaseColumn; +import org.apache.druid.segment.column.ColumnHolder; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.loading.MMappedQueryableSegmentizerFactory; +import org.apache.druid.segment.loading.SegmentLoadingException; +import org.apache.druid.segment.loading.SegmentizerFactory; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Before; 
+import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public class BroadcastSegmentIndexedTableTest extends InitializedNullHandlingTest +{ + private static final String STRING_COL_1 = "market"; + private static final String LONG_COL_1 = "longNumericNull"; + private static final String DOUBLE_COL_1 = "doubleNumericNull"; + private static final String FLOAT_COL_1 = "floatNumericNull"; + private static final String STRING_COL_2 = "partial_null_column"; + private static final String MULTI_VALUE_COLUMN = "placementish"; + private static final String DIM_NOT_EXISTS = "DIM_NOT_EXISTS"; + private static final String DATASOURCE = "DATASOURCE"; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private QueryableIndexSegment backingSegment; + private BroadcastSegmentIndexedTable broadcastTable; + private List columnNames; + private final Set keyColumns = ImmutableSet.builder() + .add(STRING_COL_1) + .add(STRING_COL_2) + .add(LONG_COL_1) + .add(DOUBLE_COL_1) + .add(FLOAT_COL_1) + .add(MULTI_VALUE_COLUMN) + .add(DIM_NOT_EXISTS) + .build(); + + @Before + public void setup() throws IOException, SegmentLoadingException + { + final ObjectMapper mapper = new DefaultObjectMapper(); + mapper.registerModule(new SegmentizerModule()); + final IndexIO indexIO = new IndexIO(mapper, () -> 0); + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) + .addValue(ObjectMapper.class.getName(), mapper) + .addValue(IndexIO.class, indexIO) + .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT) + ); + + final IndexMerger indexMerger = + new IndexMergerV9(mapper, indexIO, 
OffHeapMemorySegmentWriteOutMediumFactory.instance()); + Interval testInterval = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"); + IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv"); + File segment = new File(temporaryFolder.newFolder(), "segment"); + File persisted = indexMerger.persist( + data, + testInterval, + segment, + new IndexSpec(), + null + ); + File factoryJson = new File(persisted, "factory.json"); + Assert.assertTrue(factoryJson.exists()); + SegmentizerFactory factory = mapper.readValue(factoryJson, SegmentizerFactory.class); + Assert.assertTrue(factory instanceof MMappedQueryableSegmentizerFactory); + + DataSegment dataSegment = new DataSegment( + DATASOURCE, + testInterval, + DateTimes.nowUtc().toString(), + ImmutableMap.of(), + columnNames, + ImmutableList.of(), + null, + null, + segment.getTotalSpace() + ); + backingSegment = (QueryableIndexSegment) factory.factorize(dataSegment, segment, false); + + columnNames = ImmutableList.builder().add(ColumnHolder.TIME_COLUMN_NAME) + .addAll(backingSegment.asQueryableIndex().getColumnNames()).build(); + broadcastTable = new BroadcastSegmentIndexedTable(backingSegment, keyColumns, dataSegment.getVersion()); + } + + @Test + public void testInitShouldGenerateCorrectTable() + { + Assert.assertEquals(1209, broadcastTable.numRows()); + } + + @Test + public void testStringKeyColumn() + { + // lets try a few values out + final String[] vals = new String[] {"spot", "total_market", "upfront"}; + checkIndexAndReader(STRING_COL_1, vals); + } + + @Test + public void testNullableStringKeyColumn() + { + final String[] vals = new String[] {null, "value"}; + checkIndexAndReader(STRING_COL_2, vals); + } + + @Test + public void testMultiValueStringKeyColumn() + { + final Object[] nonMatchingVals = new Object[] {ImmutableList.of("a", "preferred")}; + checkIndexAndReader(MULTI_VALUE_COLUMN, new Object[0], nonMatchingVals); + } + + @Test + public void testLongKeyColumn() + { + final 
Long[] vals = new Long[] {NullHandling.replaceWithDefault() ? 0L : null, 10L, 20L}; + checkIndexAndReader(LONG_COL_1, vals); + } + + @Test + public void testFloatKeyColumn() + { + final Float[] vals = new Float[] {NullHandling.replaceWithDefault() ? 0.0f : null, 10.0f, 20.0f}; + checkIndexAndReader(FLOAT_COL_1, vals); + } + + @Test + public void testDoubleKeyColumn() + { + final Double[] vals = new Double[] {NullHandling.replaceWithDefault() ? 0.0 : null, 10.0, 20.0}; + checkIndexAndReader(DOUBLE_COL_1, vals); + } + + @Test + public void testTimestampColumn() + { + checkNonIndexedReader(ColumnHolder.TIME_COLUMN_NAME); + } + + @Test + public void testStringNonKeyColumn() + { + checkNonIndexedReader("qualityNumericString"); + } + + @Test + public void testLongNonKeyColumn() + { + checkNonIndexedReader("qualityLong"); + } + + @Test + public void testFloatNonKeyColumn() + { + checkNonIndexedReader("qualityFloat"); + } + + @Test + public void testDoubleNonKeyColumn() + { + checkNonIndexedReader("qualityDouble"); + } + + @Test + public void testNonexistentColumn() + { + expectedException.expect(IAE.class); + expectedException.expectMessage("Column[-1] is not a valid column"); + broadcastTable.columnReader(columnNames.indexOf(DIM_NOT_EXISTS)); + } + + @Test + public void testNonexistentColumnOutOfRange() + { + final int non = columnNames.size(); + expectedException.expect(IAE.class); + expectedException.expectMessage(StringUtils.format("Column[%s] is not a valid column", non)); + broadcastTable.columnReader(non); + } + + private void checkIndexAndReader(String columnName, Object[] vals) + { + checkIndexAndReader(columnName, vals, new Object[0]); + } + + private void checkIndexAndReader(String columnName, Object[] vals, Object[] nonmatchingVals) + { + checkColumnSelectorFactory(columnName); + try (final Closer closer = Closer.create()) { + final int columnIndex = columnNames.indexOf(columnName); + final IndexedTable.Reader reader = broadcastTable.columnReader(columnIndex); 
+ closer.register(reader); + final IndexedTable.Index valueIndex = broadcastTable.columnIndex(columnIndex); + + // lets try a few values out + for (Object val : vals) { + final IntList valIndex = valueIndex.find(val); + if (val == null) { + Assert.assertEquals(0, valIndex.size()); + } else { + Assert.assertTrue(valIndex.size() > 0); + for (int i = 0; i < valIndex.size(); i++) { + Assert.assertEquals(val, reader.read(valIndex.getInt(i))); + } + } + } + for (Object val : nonmatchingVals) { + final IntList valIndex = valueIndex.find(val); + Assert.assertEquals(0, valIndex.size()); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void checkNonIndexedReader(String columnName) + { + checkColumnSelectorFactory(columnName); + try (final Closer closer = Closer.create()) { + final int columnIndex = columnNames.indexOf(columnName); + final int numRows = backingSegment.asStorageAdapter().getNumRows(); + final IndexedTable.Reader reader = broadcastTable.columnReader(columnIndex); + closer.register(reader); + final SimpleAscendingOffset offset = new SimpleAscendingOffset(numRows); + final BaseColumn theColumn = backingSegment.asQueryableIndex() + .getColumnHolder(columnName) + .getColumn(); + closer.register(theColumn); + final BaseObjectColumnValueSelector selector = theColumn.makeColumnValueSelector(offset); + // compare with selector make sure reader can read correct values + for (int row = 0; row < numRows; row++) { + offset.setCurrentOffset(row); + Assert.assertEquals(selector.getObject(), reader.read(row)); + } + // make sure it doesn't have an index since it isn't a key column + try { + Assert.assertEquals(null, broadcastTable.columnIndex(columnIndex)); + } + catch (IAE iae) { + Assert.assertEquals(StringUtils.format("Column[%d] is not a key column", columnIndex), iae.getMessage()); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void checkColumnSelectorFactory(String columnName) + { + try (final 
Closer closer = Closer.create()) { + final int numRows = backingSegment.asStorageAdapter().getNumRows(); + + final SimpleAscendingOffset offset = new SimpleAscendingOffset(numRows); + final BaseColumn theColumn = backingSegment.asQueryableIndex() + .getColumnHolder(columnName) + .getColumn(); + closer.register(theColumn); + final BaseObjectColumnValueSelector selector = theColumn.makeColumnValueSelector(offset); + + ColumnSelectorFactory tableFactory = broadcastTable.makeColumnSelectorFactory(offset, false, closer); + final BaseObjectColumnValueSelector tableSelector = tableFactory.makeColumnValueSelector(columnName); + + // compare with base segment selector to make sure tables selector can read correct values + for (int row = 0; row < numRows; row++) { + offset.setCurrentOffset(row); + Assert.assertEquals(selector.getObject(), tableSelector.getObject()); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java index 61b56377b5d9..5f54aa24e56c 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/table/IndexedTableJoinableTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.common.config.NullHandling; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.dimension.DefaultDimensionSpec; @@ -108,6 +109,7 @@ public void setUp() { target = new IndexedTableJoinable(indexedTable); } + @Test public void getAvailableColumns() { @@ -169,7 +171,13 @@ public void makeJoinMatcherWithDimensionSelectorOnString() PREFIX, 
ExprMacroTable.nil() ); - final JoinMatcher joinMatcher = target.makeJoinMatcher(dummyColumnSelectorFactory, condition, false); + final JoinMatcher joinMatcher = target.makeJoinMatcher( + dummyColumnSelectorFactory, + condition, + false, + false, + Closer.create() + ); final DimensionSelector selector = joinMatcher.getColumnSelectorFactory() .makeDimensionSelector(DefaultDimensionSpec.of("str")); diff --git a/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java new file mode 100644 index 000000000000..ab4c45339cdc --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/loading/BroadcastJoinableMMappedQueryableSegmentizerFactoryTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.segment.loading; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.jackson.SegmentizerModule; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMerger; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.Segment; +import org.apache.druid.segment.TestIndex; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.join.table.BroadcastSegmentIndexedTable; +import org.apache.druid.segment.join.table.IndexedTable; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.DataSegment; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public class BroadcastJoinableMMappedQueryableSegmentizerFactoryTest extends InitializedNullHandlingTest +{ + private static final String TABLE_NAME = "test"; + private static final Set KEY_COLUMNS = + ImmutableSet.of("market", "longNumericNull", "doubleNumericNull", "floatNumericNull", "partial_null_column"); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Test + public void testSegmentizer() throws IOException, 
SegmentLoadingException + { + final ObjectMapper mapper = new DefaultObjectMapper(); + mapper.registerModule(new SegmentizerModule()); + final IndexIO indexIO = new IndexIO(mapper, () -> 0); + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) + .addValue(ObjectMapper.class.getName(), mapper) + .addValue(IndexIO.class, indexIO) + .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT) + ); + + IndexMerger indexMerger = new IndexMergerV9(mapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); + + SegmentizerFactory expectedFactory = new BroadcastJoinableMMappedQueryableSegmentizerFactory( + indexIO, + KEY_COLUMNS + ); + Interval testInterval = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"); + IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv"); + + List columnNames = data.getColumnNames(); + File segment = new File(temporaryFolder.newFolder(), "segment"); + File persistedSegmentRoot = indexMerger.persist( + data, + testInterval, + segment, + new IndexSpec( + null, + null, + null, + null, + expectedFactory + ), + null + ); + + File factoryJson = new File(persistedSegmentRoot, "factory.json"); + Assert.assertTrue(factoryJson.exists()); + SegmentizerFactory factory = mapper.readValue(factoryJson, SegmentizerFactory.class); + Assert.assertTrue(factory instanceof BroadcastJoinableMMappedQueryableSegmentizerFactory); + Assert.assertEquals(expectedFactory, factory); + + // load a segment + final DataSegment dataSegment = new DataSegment( + TABLE_NAME, + testInterval, + DateTimes.nowUtc().toString(), + ImmutableMap.of(), + columnNames, + ImmutableList.of(), + null, + null, + persistedSegmentRoot.getTotalSpace() + ); + final Segment loaded = factory.factorize(dataSegment, persistedSegmentRoot, false); + + final BroadcastSegmentIndexedTable table = (BroadcastSegmentIndexedTable) loaded.as(IndexedTable.class); 
+ Assert.assertNotNull(table); + } +} diff --git a/server/src/main/java/org/apache/druid/guice/DruidBinders.java b/server/src/main/java/org/apache/druid/guice/DruidBinders.java index 258ee58be575..88f5e65fc0a9 100644 --- a/server/src/main/java/org/apache/druid/guice/DruidBinders.java +++ b/server/src/main/java/org/apache/druid/guice/DruidBinders.java @@ -71,12 +71,20 @@ public static MapBinder, SegmentWrangler> segmentWra ); } - public static MapBinder, JoinableFactory> joinableFactoryBinder(Binder binder) + public static Multibinder joinableFactoryMultiBinder(Binder binder) { - return MapBinder.newMapBinder( + return Multibinder.newSetBinder( binder, - new TypeLiteral>() {}, new TypeLiteral() {} ); } + + public static MapBinder, Class> joinableMappingBinder(Binder binder) + { + return MapBinder.newMapBinder( + binder, + new TypeLiteral>() {}, + new TypeLiteral>() {} + ); + } } diff --git a/server/src/main/java/org/apache/druid/guice/JoinableFactoryModule.java b/server/src/main/java/org/apache/druid/guice/JoinableFactoryModule.java index 73e5610d847e..e8f803dc7730 100644 --- a/server/src/main/java/org/apache/druid/guice/JoinableFactoryModule.java +++ b/server/src/main/java/org/apache/druid/guice/JoinableFactoryModule.java @@ -25,9 +25,12 @@ import com.google.inject.Module; import com.google.inject.Scopes; import com.google.inject.multibindings.MapBinder; +import com.google.inject.multibindings.Multibinder; import org.apache.druid.query.DataSource; +import org.apache.druid.query.GlobalTableDataSource; import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.LookupDataSource; +import org.apache.druid.segment.join.BroadcastTableJoinableFactory; import org.apache.druid.segment.join.InlineJoinableFactory; import org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.LookupJoinableFactory; @@ -47,17 +50,22 @@ public class JoinableFactoryModule implements Module static final Map, Class> FACTORY_MAPPINGS = ImmutableMap.of( 
InlineDataSource.class, InlineJoinableFactory.class, - LookupDataSource.class, LookupJoinableFactory.class + LookupDataSource.class, LookupJoinableFactory.class, + GlobalTableDataSource.class, BroadcastTableJoinableFactory.class ); @Override public void configure(Binder binder) { - MapBinder, JoinableFactory> joinableFactories = - DruidBinders.joinableFactoryBinder(binder); + // this binder maps JoinableFactory implementations to the type of DataSource they can handle + MapBinder, Class> joinableFactoryMappingBinder = + DruidBinders.joinableMappingBinder(binder); + + Multibinder joinableFactoryMultibinder = DruidBinders.joinableFactoryMultiBinder(binder); FACTORY_MAPPINGS.forEach((ds, factory) -> { - joinableFactories.addBinding(ds).to(factory); + joinableFactoryMultibinder.addBinding().to(factory); + joinableFactoryMappingBinder.addBinding(factory).toInstance(ds); binder.bind(factory).in(LazySingleton.class); }); diff --git a/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java b/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java new file mode 100644 index 000000000000..6c3289f8b735 --- /dev/null +++ b/server/src/main/java/org/apache/druid/segment/join/BroadcastTableJoinableFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.join; + +import com.google.common.collect.Iterators; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.GlobalTableDataSource; +import org.apache.druid.query.planning.DataSourceAnalysis; +import org.apache.druid.segment.join.table.IndexedTableJoinable; +import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable; +import org.apache.druid.server.SegmentManager; + +import java.util.Iterator; +import java.util.Optional; + +public class BroadcastTableJoinableFactory implements JoinableFactory +{ + private final SegmentManager segmentManager; + + @Inject + public BroadcastTableJoinableFactory(SegmentManager segmentManager) + { + this.segmentManager = segmentManager; + } + + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + GlobalTableDataSource broadcastDatasource = (GlobalTableDataSource) dataSource; + return broadcastDatasource != null && segmentManager.hasIndexedTables(broadcastDatasource.getName()); + } + + @Override + public Optional build( + DataSource dataSource, + JoinConditionAnalysis condition + ) + { + GlobalTableDataSource broadcastDatasource = (GlobalTableDataSource) dataSource; + if (condition.canHashJoin()) { + DataSourceAnalysis analysis = DataSourceAnalysis.forDataSource(broadcastDatasource); + return segmentManager.getIndexedTables(analysis).map(tables -> { + Iterator tableIterator = tables.iterator(); + if (!tableIterator.hasNext()) { + return null; + } + try { + return new IndexedTableJoinable(Iterators.getOnlyElement(tableIterator)); + } + catch (IllegalArgumentException iae) { + throw new ISE( + "Currently only single segment datasources are supported for broadcast joins, dataSource[%s] has multiple segments. 
Reingest the data so that it is entirely contained within a single segment to use in JOIN queries.", + broadcastDatasource.getName() + ); + } + }); + } + return Optional.empty(); + } +} diff --git a/server/src/main/java/org/apache/druid/server/SegmentManager.java b/server/src/main/java/org/apache/druid/server/SegmentManager.java index a5484f21c9a7..3b3d891596bb 100644 --- a/server/src/main/java/org/apache/druid/server/SegmentManager.java +++ b/server/src/main/java/org/apache/druid/server/SegmentManager.java @@ -24,24 +24,33 @@ import com.google.inject.Inject; import org.apache.druid.common.guava.SettableSupplier; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.query.TableDataSource; import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.join.table.IndexedTable; +import org.apache.druid.segment.join.table.ReferenceCountingIndexedTable; import org.apache.druid.segment.loading.SegmentLoader; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.PartitionChunk; import org.apache.druid.timeline.partition.PartitionHolder; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.utils.CollectionUtils; +import java.io.IOException; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; /** * This class is responsible for managing data sources and their states like timeline, 
total segment size, and number of @@ -61,6 +70,8 @@ public static class DataSourceState { private final VersionedIntervalTimeline timeline = new VersionedIntervalTimeline<>(Ordering.natural()); + + private final ConcurrentHashMap tablesLookup = new ConcurrentHashMap<>(); private long totalSegmentSize; private long numSegments; @@ -81,6 +92,11 @@ public VersionedIntervalTimeline getTimeline() return timeline; } + public ConcurrentHashMap getTablesLookup() + { + return tablesLookup; + } + public long getTotalSegmentSize() { return totalSegmentSize; @@ -155,13 +171,44 @@ public boolean isSegmentCached(final DataSegment segment) */ public Optional> getTimeline(DataSourceAnalysis analysis) { - final TableDataSource tableDataSource = - analysis.getBaseTableDataSource() - .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); - + final TableDataSource tableDataSource = getTableDataSource(analysis); return Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTimeline); } + /** + * Returns the collection of {@link IndexedTable} for the entire timeline (since join conditions do not currently + * consider the queries intervals), if the timeline exists for each of its segments that are joinable. 
+ */ + public Optional> getIndexedTables(DataSourceAnalysis analysis) + { + return getTimeline(analysis).map(timeline -> { + // join doesn't currently consider intervals, so just consider all segments + final Stream segments = + timeline.lookup(Intervals.ETERNITY) + .stream() + .flatMap(x -> StreamSupport.stream(x.getObject().payloads().spliterator(), false)); + final TableDataSource tableDataSource = getTableDataSource(analysis); + ConcurrentHashMap tables = + Optional.ofNullable(dataSources.get(tableDataSource.getName())).map(DataSourceState::getTablesLookup) + .orElseThrow(() -> new ISE("Datasource %s does not have IndexedTables", tableDataSource.getName())); + return segments.map(segment -> tables.get(segment.getId())).filter(Objects::nonNull); + }); + } + + public boolean hasIndexedTables(String dataSourceName) + { + if (dataSources.containsKey(dataSourceName)) { + return dataSources.get(dataSourceName).tablesLookup.size() > 0; + } + return false; + } + + private TableDataSource getTableDataSource(DataSourceAnalysis analysis) + { + return analysis.getBaseTableDataSource() + .orElseThrow(() -> new ISE("Cannot handle datasource: %s", analysis.getDataSource())); + } + /** * Load a single segment. 
* @@ -194,6 +241,17 @@ public boolean loadSegment(final DataSegment segment, boolean lazy) throws Segme log.warn("Told to load an adapter for segment[%s] that already exists", segment.getId()); resultSupplier.set(false); } else { + + IndexedTable table = adapter.as(IndexedTable.class); + if (table != null) { + if (dataSourceState.isEmpty() || dataSourceState.numSegments == dataSourceState.tablesLookup.size()) { + dataSourceState.tablesLookup.put(segment.getId(), new ReferenceCountingIndexedTable(table)); + } else { + log.error("Cannot load segment[%s] with IndexedTable, no existing segments are joinable", segment.getId()); + } + } else if (dataSourceState.tablesLookup.size() > 0) { + log.error("Cannot load segment[%s] without IndexedTable, all existing segments are joinable", segment.getId()); + } loadedIntervals.add( segment.getInterval(), segment.getVersion(), @@ -203,7 +261,9 @@ public boolean loadSegment(final DataSegment segment, boolean lazy) throws Segme ); dataSourceState.addSegment(segment); resultSupplier.set(true); + } + return dataSourceState; } ); @@ -254,10 +314,18 @@ public void dropSegment(final DataSegment segment) final ReferenceCountingSegment oldQueryable = (removed == null) ? 
null : removed.getObject(); if (oldQueryable != null) { - dataSourceState.removeSegment(segment); - - log.info("Attempting to close segment %s", segment.getId()); - oldQueryable.close(); + try (final Closer closer = Closer.create()) { + dataSourceState.removeSegment(segment); + closer.register(oldQueryable); + log.info("Attempting to close segment %s", segment.getId()); + final ReferenceCountingIndexedTable oldTable = dataSourceState.tablesLookup.remove(segment.getId()); + if (oldTable != null) { + closer.register(oldTable); + } + } + catch (IOException e) { + throw new RuntimeException(e); + } } else { log.info( "Told to delete a queryable on dataSource[%s] for interval[%s] and version[%s] that I don't have.", diff --git a/server/src/test/java/org/apache/druid/guice/JoinableFactoryModuleTest.java b/server/src/test/java/org/apache/druid/guice/JoinableFactoryModuleTest.java index 5478a62deb93..4c2727f07d2d 100644 --- a/server/src/test/java/org/apache/druid/guice/JoinableFactoryModuleTest.java +++ b/server/src/test/java/org/apache/druid/guice/JoinableFactoryModuleTest.java @@ -27,13 +27,20 @@ import com.google.inject.Scopes; import com.google.inject.TypeLiteral; import org.apache.druid.query.DataSource; +import org.apache.druid.query.GlobalTableDataSource; +import org.apache.druid.query.InlineDataSource; +import org.apache.druid.query.LookupDataSource; import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; +import org.apache.druid.segment.join.BroadcastTableJoinableFactory; +import org.apache.druid.segment.join.InlineJoinableFactory; import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.segment.join.LookupJoinableFactory; import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.segment.join.NoopDataSource; import org.apache.druid.segment.join.NoopJoinableFactory; -import org.hamcrest.CoreMatchers; +import 
org.apache.druid.server.SegmentManager; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -64,28 +71,44 @@ public void testInjectJoinableFactoryIsSingleton() @Test public void testInjectDefaultBindingsShouldBeInjected() { - Map, JoinableFactory> joinableFactories = - injector.getInstance(Key.get(new TypeLiteral, JoinableFactory>>() {})); - Assert.assertEquals(JoinableFactoryModule.FACTORY_MAPPINGS.size(), joinableFactories.size()); - - final Set, Class>> expectedEntries = - JoinableFactoryModule.FACTORY_MAPPINGS.entrySet(); - - for (Map.Entry, Class> entry : expectedEntries) { - Assert.assertThat(joinableFactories.get(entry.getKey()), CoreMatchers.instanceOf(entry.getValue())); - } + final Set factories = + injector.getInstance(Key.get(new TypeLiteral>() {})); + Assert.assertEquals(JoinableFactoryModule.FACTORY_MAPPINGS.size(), factories.size()); + Map, Class> joinableFactoriesMappings = injector.getInstance( + Key.get(new TypeLiteral, Class>>() {}) + ); + assertDefaultFactories(joinableFactoriesMappings); } @Test public void testJoinableFactoryCanBind() { injector = makeInjectorWithProperties( - binder -> DruidBinders - .joinableFactoryBinder(binder).addBinding(NoopDataSource.class).toInstance(NoopJoinableFactory.INSTANCE)); - Map, JoinableFactory> joinableFactories = - injector.getInstance(Key.get(new TypeLiteral, JoinableFactory>>() {})); - Assert.assertEquals(JoinableFactoryModule.FACTORY_MAPPINGS.size() + 1, joinableFactories.size()); - Assert.assertEquals(NoopJoinableFactory.INSTANCE, joinableFactories.get(NoopDataSource.class)); + binder -> { + DruidBinders.joinableFactoryMultiBinder(binder).addBinding().toInstance(NoopJoinableFactory.INSTANCE); + DruidBinders.joinableMappingBinder(binder).addBinding(NoopJoinableFactory.class).toInstance(NoopDataSource.class); + } + ); + Map, Class> joinableFactoriesMappings = injector.getInstance( + Key.get(new TypeLiteral, Class>>() {}) + ); + Set factories = 
injector.getInstance(Key.get(new TypeLiteral>() {})); + + Assert.assertEquals(JoinableFactoryModule.FACTORY_MAPPINGS.size() + 1, factories.size()); + Assert.assertEquals(NoopDataSource.class, joinableFactoriesMappings.get(NoopJoinableFactory.class)); + assertDefaultFactories(joinableFactoriesMappings); + } + + private void assertDefaultFactories( + Map, Class> joinableFactoriesMappings + ) + { + Assert.assertEquals(LookupDataSource.class, joinableFactoriesMappings.get(LookupJoinableFactory.class)); + Assert.assertEquals(InlineDataSource.class, joinableFactoriesMappings.get(InlineJoinableFactory.class)); + Assert.assertEquals( + GlobalTableDataSource.class, + joinableFactoriesMappings.get(BroadcastTableJoinableFactory.class) + ); } private Injector makeInjectorWithProperties(Module... otherModules) @@ -97,6 +120,7 @@ private Injector makeInjectorWithProperties(Module... otherModules) ImmutableList.builder() .add(new JoinableFactoryModule()) .add(binder -> binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(lookupProvider)) + .add(binder -> binder.bind(SegmentManager.class).toInstance(EasyMock.createMock(SegmentManager.class))) .add(binder -> binder.bindScope(LazySingleton.class, Scopes.SINGLETON)); for (final Module otherModule : otherModules) { diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 2fe75008d93c..2d2128edb4f3 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.client.DirectDruidClient; import org.apache.druid.java.util.common.Intervals; import 
org.apache.druid.java.util.common.granularity.Granularities; @@ -656,23 +657,26 @@ private void initWalker(final Map serverProperties, QuerySchedul .build() ); + final JoinableFactory globalFactory = new JoinableFactory() + { + @Override + public boolean isDirectlyJoinable(DataSource dataSource) + { + return ((GlobalTableDataSource) dataSource).getName().equals(GLOBAL); + } + + @Override + public Optional build(DataSource dataSource, JoinConditionAnalysis condition) + { + return Optional.empty(); + } + }; + final JoinableFactory joinableFactory = new MapJoinableFactory( - ImmutableMap., JoinableFactory>builder() - .put(InlineDataSource.class, new InlineJoinableFactory()) - .put(GlobalTableDataSource.class, new JoinableFactory() - { - @Override - public boolean isDirectlyJoinable(DataSource dataSource) - { - return ((GlobalTableDataSource) dataSource).getName().equals(GLOBAL); - } - - @Override - public Optional build(DataSource dataSource, JoinConditionAnalysis condition) - { - return Optional.empty(); - } - }) + ImmutableSet.of(globalFactory, new InlineJoinableFactory()), + ImmutableMap., Class>builder() + .put(InlineJoinableFactory.class, InlineDataSource.class) + .put(globalFactory.getClass(), GlobalTableDataSource.class) .build() ); diff --git a/server/src/test/java/org/apache/druid/server/QueryStackTests.java b/server/src/test/java/org/apache/druid/server/QueryStackTests.java index 867e3e208bbf..c70f7a4dec96 100644 --- a/server/src/test/java/org/apache/druid/server/QueryStackTests.java +++ b/server/src/test/java/org/apache/druid/server/QueryStackTests.java @@ -20,6 +20,7 @@ package org.apache.druid.server; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.client.cache.CacheConfig; import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.java.util.common.Pair; @@ -68,7 +69,7 @@ import org.apache.druid.segment.join.InlineJoinableFactory; import 
org.apache.druid.segment.join.JoinableFactory; import org.apache.druid.segment.join.LookupJoinableFactory; -import org.apache.druid.segment.join.MapJoinableFactoryTest; +import org.apache.druid.segment.join.MapJoinableFactory; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.scheduling.ManualQueryPrioritizationStrategy; @@ -78,6 +79,7 @@ import javax.annotation.Nullable; import java.nio.ByteBuffer; import java.util.Map; +import java.util.Set; /** * Utilities for creating query-stack objects for tests. @@ -303,22 +305,31 @@ public static JoinableFactory makeJoinableFactoryForLookup( LookupExtractorFactoryContainerProvider lookupProvider ) { - return makeJoinableFactoryFromDefault(lookupProvider, null); + return makeJoinableFactoryFromDefault(lookupProvider, null, null); } public static JoinableFactory makeJoinableFactoryFromDefault( @Nullable LookupExtractorFactoryContainerProvider lookupProvider, - @Nullable Map, JoinableFactory> custom + @Nullable Set customFactories, + @Nullable Map, Class> customMappings ) { - ImmutableMap.Builder, JoinableFactory> builder = ImmutableMap.builder(); - builder.put(InlineDataSource.class, new InlineJoinableFactory()); + ImmutableSet.Builder setBuilder = ImmutableSet.builder(); + ImmutableMap.Builder, Class> mapBuilder = + ImmutableMap.builder(); + setBuilder.add(new InlineJoinableFactory()); + mapBuilder.put(InlineJoinableFactory.class, InlineDataSource.class); if (lookupProvider != null) { - builder.put(LookupDataSource.class, new LookupJoinableFactory(lookupProvider)); + setBuilder.add(new LookupJoinableFactory(lookupProvider)); + mapBuilder.put(LookupJoinableFactory.class, LookupDataSource.class); } - if (custom != null) { - builder.putAll(custom); + if (customFactories != null) { + setBuilder.addAll(customFactories); } - return MapJoinableFactoryTest.fromMap(builder.build()); + if (customMappings != null) { + 
mapBuilder.putAll(customMappings); + } + + return new MapJoinableFactory(setBuilder.build(), mapBuilder.build()); } } diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java new file mode 100644 index 000000000000..7074434ba891 --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerBroadcastJoinIndexedTableTest.java @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.server; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.jackson.SegmentizerModule; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.DataSource; +import org.apache.druid.query.GlobalTableDataSource; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMerger; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.TestIndex; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.join.BroadcastTableJoinableFactory; +import org.apache.druid.segment.join.JoinConditionAnalysis; +import org.apache.druid.segment.join.Joinable; +import org.apache.druid.segment.loading.BroadcastJoinableMMappedQueryableSegmentizerFactory; +import org.apache.druid.segment.loading.DataSegmentPusher; +import org.apache.druid.segment.loading.LocalDataSegmentPuller; +import org.apache.druid.segment.loading.LocalLoadSpec; +import org.apache.druid.segment.loading.SegmentLoaderConfig; +import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager; +import org.apache.druid.segment.loading.SegmentLoadingException; +import 
org.apache.druid.segment.loading.SegmentizerFactory; +import org.apache.druid.segment.loading.StorageLocationConfig; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.metrics.NoopServiceEmitter; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +public class SegmentManagerBroadcastJoinIndexedTableTest extends InitializedNullHandlingTest +{ + private static final String TABLE_NAME = "test"; + private static final String PREFIX = "j0"; + private static final Set KEY_COLUMNS = + ImmutableSet.of("market", "longNumericNull", "doubleNumericNull", "floatNumericNull", "partial_null_column"); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private LocalDataSegmentPuller segmentPuller; + private ObjectMapper objectMapper; + private IndexIO indexIO; + private File segmentCacheDir; + private File segmentDeepStorageDir; + private SegmentLoaderLocalCacheManager segmentLoader; + private SegmentManager segmentManager; + private BroadcastTableJoinableFactory joinableFactory; + + @Before + public void setup() throws IOException + { + segmentPuller = new LocalDataSegmentPuller(); + objectMapper = new DefaultObjectMapper() + .registerModule(new SegmentizerModule()) + .registerModule( + new SimpleModule().registerSubtypes(new NamedType(LocalLoadSpec.class, "local")) + ); + + indexIO = new IndexIO(objectMapper, () -> 0); + 
objectMapper.setInjectableValues( + new InjectableValues.Std().addValue(LocalDataSegmentPuller.class, segmentPuller) + .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) + .addValue(ObjectMapper.class.getName(), objectMapper) + .addValue(IndexIO.class, indexIO) + ); + segmentCacheDir = temporaryFolder.newFolder(); + segmentDeepStorageDir = temporaryFolder.newFolder(); + segmentLoader = new SegmentLoaderLocalCacheManager( + indexIO, + new SegmentLoaderConfig() + { + @Override + public List getLocations() + { + return Collections.singletonList( + new StorageLocationConfig(segmentCacheDir, null, null) + ); + } + }, + objectMapper + ); + segmentManager = new SegmentManager(segmentLoader); + joinableFactory = new BroadcastTableJoinableFactory(segmentManager); + EmittingLogger.registerEmitter(new NoopServiceEmitter()); + } + + @After + public void teardown() throws IOException + { + FileUtils.deleteDirectory(segmentCacheDir); + } + + @Test + public void testLoadIndexedTable() throws IOException, SegmentLoadingException + { + final DataSource dataSource = new GlobalTableDataSource(TABLE_NAME); + Assert.assertFalse(joinableFactory.isDirectlyJoinable(dataSource)); + + final String version = DateTimes.nowUtc().toString(); + IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv"); + final String interval = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"; + DataSegment segment = createSegment(data, interval, version); + Assert.assertTrue(segmentManager.loadSegment(segment, false)); + + Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource)); + Optional maybeJoinable = makeJoinable(dataSource); + Assert.assertTrue(maybeJoinable.isPresent()); + Joinable joinable = maybeJoinable.get(); + // cardinality currently tied to number of rows, + Assert.assertEquals(1210, joinable.getCardinality("market")); + Assert.assertEquals(1210, joinable.getCardinality("placement")); + Assert.assertEquals( + 
Optional.of(ImmutableSet.of("preferred")), + joinable.getCorrelatedColumnValues( + "market", + "spot", + "placement", + Long.MAX_VALUE, + false + ) + ); + + // dropping the segment should make the table no longer available + segmentManager.dropSegment(segment); + + maybeJoinable = makeJoinable(dataSource); + + Assert.assertFalse(maybeJoinable.isPresent()); + } + + @Test + public void testLoadMultipleIndexedTableOverwrite() throws IOException, SegmentLoadingException + { + final DataSource dataSource = new GlobalTableDataSource(TABLE_NAME); + Assert.assertFalse(joinableFactory.isDirectlyJoinable(dataSource)); + + // larger interval overwrites smaller interval + final String version = DateTimes.nowUtc().toString(); + final String version2 = DateTimes.nowUtc().plus(1000L).toString(); + final String interval = "2011-01-12T00:00:00.000Z/2011-03-28T00:00:00.000Z"; + final String interval2 = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"; + IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.top"); + IncrementalIndex data2 = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.bottom"); + DataSegment segment1 = createSegment(data, interval, version); + DataSegment segment2 = createSegment(data2, interval2, version2); + Assert.assertTrue(segmentManager.loadSegment(segment1, false)); + Assert.assertTrue(segmentManager.loadSegment(segment2, false)); + + Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource)); + Optional maybeJoinable = makeJoinable(dataSource); + Assert.assertTrue(maybeJoinable.isPresent()); + + Joinable joinable = maybeJoinable.get(); + // cardinality currently tied to number of rows, + Assert.assertEquals(733, joinable.getCardinality("market")); + Assert.assertEquals(733, joinable.getCardinality("placement")); + Assert.assertEquals( + Optional.of(ImmutableSet.of("preferred")), + joinable.getCorrelatedColumnValues( + "market", + "spot", + "placement", + Long.MAX_VALUE, + false + ) + ); + + 
segmentManager.dropSegment(segment2); + + // if new segment is dropped for some reason that probably never happens, old table should still exist.. + maybeJoinable = makeJoinable(dataSource); + Assert.assertTrue(maybeJoinable.isPresent()); + + joinable = maybeJoinable.get(); + // cardinality currently tied to number of rows, + Assert.assertEquals(478, joinable.getCardinality("market")); + Assert.assertEquals(478, joinable.getCardinality("placement")); + Assert.assertEquals( + Optional.of(ImmutableSet.of("preferred")), + joinable.getCorrelatedColumnValues( + "market", + "spot", + "placement", + Long.MAX_VALUE, + false + ) + ); + } + + + @Test + public void testLoadMultipleIndexedTable() throws IOException, SegmentLoadingException + { + final DataSource dataSource = new GlobalTableDataSource(TABLE_NAME); + Assert.assertFalse(joinableFactory.isDirectlyJoinable(dataSource)); + + final String version = DateTimes.nowUtc().toString(); + final String version2 = DateTimes.nowUtc().plus(1000L).toString(); + final String interval = "2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z"; + final String interval2 = "2011-01-12T00:00:00.000Z/2011-03-28T00:00:00.000Z"; + IncrementalIndex data = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.bottom"); + IncrementalIndex data2 = TestIndex.makeRealtimeIndex("druid.sample.numeric.tsv.top"); + Assert.assertTrue(segmentManager.loadSegment(createSegment(data, interval, version), false)); + Assert.assertTrue(joinableFactory.isDirectlyJoinable(dataSource)); + + Optional maybeJoinable = makeJoinable(dataSource); + Assert.assertTrue(maybeJoinable.isPresent()); + + Joinable joinable = maybeJoinable.get(); + // cardinality currently tied to number of rows, + Assert.assertEquals(733, joinable.getCardinality("market")); + Assert.assertEquals(733, joinable.getCardinality("placement")); + Assert.assertEquals( + Optional.of(ImmutableSet.of("preferred")), + joinable.getCorrelatedColumnValues( + "market", + "spot", + "placement", + 
Long.MAX_VALUE, + false + ) + ); + + // add another segment with smaller interval, only partially overshadows so there will be 2 segments in timeline + Assert.assertTrue(segmentManager.loadSegment(createSegment(data2, interval2, version2), false)); + + + expectedException.expect(ISE.class); + expectedException.expectMessage( + StringUtils.format( + "Currently only single segment datasources are supported for broadcast joins, dataSource[%s] has multiple segments. Reingest the data so that it is entirely contained within a single segment to use in JOIN queries.", + TABLE_NAME + ) + ); + // this will explode because datasource has multiple segments which is an invalid state for the joinable factory + makeJoinable(dataSource); + } + + private Optional makeJoinable(DataSource dataSource) + { + return joinableFactory.build( + dataSource, + JoinConditionAnalysis.forExpression( + StringUtils.format("market == \"%s.market\"", PREFIX), + PREFIX, + ExprMacroTable.nil() + ) + ); + } + + private DataSegment createSegment(IncrementalIndex data, String interval, String version) throws IOException + { + final DataSegment tmpSegment = new DataSegment( + TABLE_NAME, + Intervals.of(interval), + version, + Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), + new NumberedShardSpec(0, 0), + 9, + 100 + ); + final String storageDir = DataSegmentPusher.getDefaultStorageDir(tmpSegment, false); + final File segmentDir = new File(segmentDeepStorageDir, storageDir); + org.apache.commons.io.FileUtils.forceMkdir(segmentDir); + + IndexMerger indexMerger = + new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); + + SegmentizerFactory factory = new BroadcastJoinableMMappedQueryableSegmentizerFactory(indexIO, KEY_COLUMNS); + + indexMerger.persist( + data, + Intervals.of(interval), + segmentDir, + new IndexSpec( + null, + null, + null, + null, + factory + ), + null + ); + final File factoryJson = new File(segmentDir, 
"factory.json"); + objectMapper.writeValue(factoryJson, factory); + return tmpSegment.withLoadSpec( + ImmutableMap.of("type", "local", "path", segmentDir.getAbsolutePath()) + ); + } +} diff --git a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java index 2e5c2e3d99a9..e5593f131988 100644 --- a/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java +++ b/server/src/test/java/org/apache/druid/server/SegmentManagerThreadSafetyTest.java @@ -254,7 +254,7 @@ public StorageAdapter asStorageAdapter() @Override public T as(Class clazz) { - throw new UnsupportedOperationException(); + return null; } @Override diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java index a5e883176039..c9e8a0d7d0f9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java @@ -105,7 +105,7 @@ public void setUp() binder -> { binder.bind(QueryLifecycleFactory.class).toInstance(queryLifecycleFactory); binder.bind(TimelineServerView.class).toInstance(serverView); - binder.bind(JoinableFactory.class).toInstance(new MapJoinableFactory(ImmutableMap.of())); + binder.bind(JoinableFactory.class).toInstance(new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of())); binder.bind(PlannerConfig.class).toInstance(plannerConfig); binder.bind(ViewManager.class).toInstance(viewManager); binder.bind(Escalator.class).toInstance(escalator); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java index 0ba26ea9d3ea..89f141bc73c6 100644 --- 
a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaNoDataInitTest.java @@ -20,6 +20,7 @@ package org.apache.druid.sql.calcite.schema; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.QueryRunnerFactoryConglomerate; import org.apache.druid.segment.join.MapJoinableFactory; @@ -55,7 +56,7 @@ public void testInitializationWithNoData() throws Exception ), new TestServerInventoryView(Collections.emptyList()), new SegmentManager(EasyMock.createMock(SegmentLoader.class)), - new MapJoinableFactory(ImmutableMap.of()), + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index adb3626c54af..9b633e8a19e1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -254,7 +254,7 @@ public Optional build( CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), serverView, segmentManager, - new MapJoinableFactory(ImmutableMap.of(GlobalTableDataSource.class, globalTableJoinable)), + new MapJoinableFactory(ImmutableSet.of(globalTableJoinable), ImmutableMap.of(globalTableJoinable.getClass(), GlobalTableDataSource.class)), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 08fe2ea5b70a..9bbdc307503c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ 
b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -247,7 +247,7 @@ public Authorizer getAuthorizer(String name) CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate), new TestServerInventoryView(walker.getSegments(), realtimeSegments), new SegmentManager(EasyMock.createMock(SegmentLoader.class)), - new MapJoinableFactory(ImmutableMap.of()), + new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), PLANNER_CONFIG_DEFAULT, new NoopViewManager(), new NoopEscalator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 0c186ed55589..463670736f25 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -730,10 +730,8 @@ public static JoinableFactory createDefaultJoinableFactory() { return QueryStackTests.makeJoinableFactoryFromDefault( INJECTOR.getInstance(LookupExtractorFactoryContainerProvider.class), - ImmutableMap.of( - GlobalTableDataSource.class, - CUSTOM_ROW_TABLE_JOINABLE - ) + ImmutableSet.of(CUSTOM_ROW_TABLE_JOINABLE), + ImmutableMap.of(CUSTOM_ROW_TABLE_JOINABLE.getClass(), GlobalTableDataSource.class) ); }