Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.inject.Injector;
import com.google.inject.Module;
Expand Down Expand Up @@ -379,7 +380,7 @@ public void emit(Event event)
baseClient,
null /* local client; unused in this test, so pass in null */,
warehouse,
new MapJoinableFactory(ImmutableMap.of()),
new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()),
retryConfig,
jsonMapper,
serverConfig,
Expand Down
5 changes: 4 additions & 1 deletion integration-tests/docker/environment-configs/broker
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,7 @@ druid_auth_basic_common_cacheDirectory=/tmp/authCache/broker
druid_sql_avatica_enable=true
druid_server_https_crlPath=/tls/revocations.crl
druid_query_scheduler_laning_strategy=manual
druid_query_scheduler_laning_lanes_one=1
druid_query_scheduler_laning_lanes_one=1
druid_segmentCache_locations=[{"path":"/shared/druid/brokerIndexCache","maxSize":1000000000}]
druid_server_maxSize=1000000000
druid_sql_planner_metadataRefreshPeriod=PT15S
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.lookup.LookupsState;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.apache.druid.server.coordinator.rules.Rule;
import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
Expand Down Expand Up @@ -376,6 +377,26 @@ public void postDynamicConfig(CoordinatorDynamicConfig coordinatorDynamicConfig)
}
}

/**
 * POSTs the given load rules for a datasource to the Coordinator's {@code rules/<datasource>} endpoint.
 *
 * @param datasourceName name of the datasource, appended as-is to the Coordinator URL
 * @param rules          rules to send; serialized to JSON with {@code jsonMapper} as the request body
 * @throws ISE       if the Coordinator responds with any status other than {@code 200 OK}
 * @throws Exception if building the URL, serializing the rules, or executing the request fails
 */
public void postLoadRules(String datasourceName, List<Rule> rules) throws Exception
{
  String url = StringUtils.format("%srules/%s", getCoordinatorURL(), datasourceName);
  StatusResponseHolder response = httpClient.go(
      new Request(HttpMethod.POST, new URL(url)).setContent(
          "application/json",
          jsonMapper.writeValueAsBytes(rules)
      ), responseHandler
  ).get();

  if (!response.getStatus().equals(HttpResponseStatus.OK)) {
    // Name the operation that actually failed; the previous text was copied from the
    // dynamic-config call and pointed whoever read the failure at the wrong request.
    throw new ISE(
        "Error while posting load rules to url[%s] status[%s] content[%s]",
        url,
        response.getStatus(),
        response.getContent()
    );
  }
}

public CoordinatorDynamicConfig getDynamicConfig()
{
String url = StringUtils.format("%sconfig", getCoordinatorURL());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.tests.query;

import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.server.coordinator.rules.ForeverBroadcastDistributionRule;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.SqlTestQueryHelper;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

/**
 * Integration test that ingests a datasource under a {@link ForeverBroadcastDistributionRule}, waits for
 * it to show up in the SQL {@code INFORMATION_SCHEMA} as joinable and broadcast, and then runs SQL queries
 * against it, both directly and as the right-hand side of a join with the regular wikipedia datasource.
 */
@Test(groups = TestNGGroup.QUERY)
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITBroadcastJoinQueryTest extends AbstractIndexerTest
{
  private static final Logger LOG = new Logger(ITBroadcastJoinQueryTest.class);
  private static final String BROADCAST_JOIN_TASK = "/indexer/broadcast_join_index_task.json";
  private static final String BROADCAST_JOIN_METADATA_QUERIES_RESOURCE = "/queries/broadcast_join_metadata_queries.json";
  private static final String BROADCAST_JOIN_QUERIES_RESOURCE = "/queries/broadcast_join_queries.json";
  private static final String BROADCAST_JOIN_DATASOURCE = "broadcast_join_wikipedia_test";

  @Inject
  ServerDiscoveryFactory factory;

  @Inject
  CoordinatorResourceTestClient coordinatorClient;

  @Inject
  SqlTestQueryHelper queryHelper;

  @Inject
  @TestClient
  HttpClient httpClient;

  @Inject
  IntegrationTestingConfig config;

  /**
   * Sets a broadcast rule on the test datasource, ingests it, waits for the segments and the SQL metadata
   * to become available, and then verifies the broadcast join queries.
   *
   * @throws Exception if any step fails, including resource cleanup
   */
  @Test
  public void testBroadcastJoin() throws Exception
  {
    // try-with-resources: if cleanup fails after a test failure, the cleanup exception is attached as
    // suppressed instead of replacing the original failure (which a close() in a finally block would do).
    try (Closer closer = Closer.create()) {
      closer.register(unloader(BROADCAST_JOIN_DATASOURCE));

      // prepare for broadcast
      coordinatorClient.postLoadRules(
          BROADCAST_JOIN_DATASOURCE,
          ImmutableList.of(new ForeverBroadcastDistributionRule())
      );

      // load the data; completion is observed through segment availability below, so the task id is not kept
      String taskJson = replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_TASK), BROADCAST_JOIN_DATASOURCE);
      indexer.submitTask(taskJson);

      ITRetryUtil.retryUntilTrue(
          () -> coordinatorClient.areSegmentsLoaded(BROADCAST_JOIN_DATASOURCE), "broadcast segment load"
      );

      // poll the metadata queries until the SQL schema has been refreshed and the datasource is reported
      // as joinable and broadcast
      ITRetryUtil.retryUntilTrue(
          () -> {
            try {
              // NOTE(review): the trailing 1 is presumably how many times each query is run - confirm
              // against SqlTestQueryHelper
              queryHelper.testQueriesFromString(
                  queryHelper.getQueryURL(config.getRouterUrl()),
                  replaceJoinTemplate(
                      getResourceAsString(BROADCAST_JOIN_METADATA_QUERIES_RESOURCE),
                      BROADCAST_JOIN_DATASOURCE
                  ),
                  1
              );
              return true;
            }
            catch (Exception ex) {
              // Expected while the schema is still refreshing; log it so a query that can never
              // succeed is diagnosable rather than showing up only as a retry timeout.
              LOG.warn(
                  ex,
                  "SQL metadata for datasource[%s] is not in the expected state yet, will retry",
                  BROADCAST_JOIN_DATASOURCE
              );
              return false;
            }
          },
          "waiting for SQL metadata refresh"
      );

      // now do some queries
      queryHelper.testQueriesFromString(
          queryHelper.getQueryURL(config.getRouterUrl()),
          replaceJoinTemplate(getResourceAsString(BROADCAST_JOIN_QUERIES_RESOURCE), BROADCAST_JOIN_DATASOURCE),
          1
      );
    }
  }

  /**
   * Fills in the datasource placeholders of a task or query template.
   *
   * @param template       text containing {@code %%JOIN_DATASOURCE%%} and/or {@code %%REGULAR_DATASOURCE%%}
   * @param joinDataSource value substituted for {@code %%JOIN_DATASOURCE%%}
   * @return the template with {@code %%JOIN_DATASOURCE%%} replaced by {@code joinDataSource} and
   *         {@code %%REGULAR_DATASOURCE%%} replaced by {@link ITWikipediaQueryTest#WIKIPEDIA_DATA_SOURCE}
   */
  private static String replaceJoinTemplate(String template, String joinDataSource)
  {
    return StringUtils.replace(
        StringUtils.replace(template, "%%JOIN_DATASOURCE%%", joinDataSource),
        "%%REGULAR_DATASOURCE%%",
        ITWikipediaQueryTest.WIKIPEDIA_DATA_SOURCE
    );
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITWikipediaQueryTest
{
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
public static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
private static final String WIKI_LOOKUP = "wiki-simple";
private static final String WIKIPEDIA_QUERIES_RESOURCE = "/queries/wikipedia_editstream_queries.json";
private static final String WIKIPEDIA_LOOKUP_RESOURCE = "/queries/wiki-lookup-config.json";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
{
"type": "index_parallel",
"spec": {
"dataSchema": {
"dataSource": "%%JOIN_DATASOURCE%%",
"timestampSpec": {
"column": "timestamp",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"page",
"language",
"user",
"unpatrolled",
"newPage",
"robot",
"anonymous",
"namespace",
"continent",
"country",
"region",
"city",
{
"type": "long",
"name": "added"
},
{
"type": "long",
"name": "deleted"
}
]
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "sum_added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "sum_deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"segmentGranularity": "YEAR",
"queryGranularity": "second"
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "local",
"baseDir": "/resources/data/union_query/",
"filter": "wikipedia_index_data*"
},
"appendToExisting": false,
"inputFormat": {
"type": "json"
}
},
"tuningConfig": {
"type": "index_parallel",
"indexSpec": {
"segmentLoader": {
"type": "broadcastJoinableMMapSegmentFactory",
"keyColumns": ["user", "language", "added", "deleted"]
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
"expectedResults": [
{
"server_type":"historical"
},
{
"server_type":"broker"
}
]
},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[
{
"description": "query information schema to make sure datasource is joinable and broadcast",
"query": {
"query": "SELECT TABLE_NAME, IS_JOINABLE, IS_BROADCAST FROM INFORMATION_SCHEMA.TABLES WHERE IS_JOINABLE = 'YES' AND IS_BROADCAST = 'YES' AND TABLE_SCHEMA = 'druid'"
},
"expectedResults": [
{
"TABLE_NAME": "%%JOIN_DATASOURCE%%",
"IS_JOINABLE": "YES",
"IS_BROADCAST": "YES"
}
]
},
{
"description": "query information schema to make sure druid schema is refreshed",
"query": {
"query": "SELECT COUNT(*) FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = '%%JOIN_DATASOURCE%%'"
},
"expectedResults": [
{
"EXPR$0": 19
}
]
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
[
{
"description": "query broadcast join segment directly",
"query": {
"query": "SELECT \"%%JOIN_DATASOURCE%%\".\"user\", SUM(\"%%JOIN_DATASOURCE%%\".\"added\") FROM druid.\"%%JOIN_DATASOURCE%%\" GROUP BY 1 ORDER BY 2",
"resultFormat": "OBJECT"
},
"expectedResults": [
{"user":"stringer","EXPR$1":2},
{"user":"nuclear","EXPR$1":114},
{"user":"masterYi","EXPR$1":246},
{"user":"speed","EXPR$1":918},
{"user":"triplets","EXPR$1":1810}
]
},
{
"description": "regular datasource is lhs, broadcast join datasource is rhs",
"query": {
"query": "SELECT \"%%JOIN_DATASOURCE%%\".\"language\" as l1, \"%%REGULAR_DATASOURCE%%\".\"language\" as l2, SUM(\"%%JOIN_DATASOURCE%%\".\"sum_added\"), SUM(\"%%REGULAR_DATASOURCE%%\".\"added\") FROM druid.\"%%REGULAR_DATASOURCE%%\" INNER JOIN druid.\"%%JOIN_DATASOURCE%%\" ON \"%%REGULAR_DATASOURCE%%\".\"language\" = \"%%JOIN_DATASOURCE%%\".\"language\" GROUP BY 1, 2 ORDER BY 3 DESC",
"resultFormat": "OBJECT"
},
"expectedResults": [
{"l1":"en","l2":"en","EXPR$2":1.372562064E9,"EXPR$3":2.191945776E9},
{"l1":"zh","l2":"zh","EXPR$2":2.0833281E8,"EXPR$3":9.6017292E7},
{"l1":"ru","l2":"ru","EXPR$2":6.6673872E7,"EXPR$3":2.19902506E8},
{"l1":"ja","l2":"ja","EXPR$2":249728.0,"EXPR$3":8.3520802E7}
]
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
"expectedResults": [
{
"server_type":"historical"
},
{
"server_type":"broker"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,15 @@
"tier": "_default_tier",
"curr_size": 2208932412,
"max_size": 5000000000
},
{
"server": "172.172.172.8:8282",
"host": "172.172.172.8",
"plaintext_port": 8082,
"tls_port": 8282,
"server_type": "broker",
"tier": "_default_tier",
"curr_size": 0,
"max_size": 1000000000
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.druid.segment.loading.BroadcastJoinableMMappedQueryableSegmentizerFactory;
import org.apache.druid.segment.loading.MMappedQueryableSegmentizerFactory;

public class SegmentizerModule extends SimpleModule
Expand All @@ -29,5 +30,8 @@ public SegmentizerModule()
{
super("SegmentizerModule");
registerSubtypes(new NamedType(MMappedQueryableSegmentizerFactory.class, "mMapSegmentFactory"));
registerSubtypes(
new NamedType(BroadcastJoinableMMappedQueryableSegmentizerFactory.class, "broadcastJoinableMMapSegmentFactory")
);
}
}
Loading