Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.SubstringDimExtractionFn;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.lookup.LookupReferencesManager;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
Expand Down Expand Up @@ -99,7 +99,7 @@ public class BloomFilterSqlAggregatorTest
private static final Injector injector = Guice.createInjector(
binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
binder.bind(LookupReferencesManager.class).toInstance(
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
ImmutableMap.of(
"a", "xa",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.apache.druid.query.filter.BloomKFilterHolder;
import org.apache.druid.query.filter.ExpressionDimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.lookup.LookupReferencesManager;
import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.server.security.AuthenticationResult;
Expand All @@ -67,7 +67,7 @@ public class BloomDimFilterSqlTest extends BaseCalciteQueryTest
private static final Injector injector = Guice.createInjector(
binder -> {
binder.bind(Key.get(ObjectMapper.class, Json.class)).toInstance(TestHelper.makeJsonMapper());
binder.bind(LookupReferencesManager.class).toInstance(
binder.bind(LookupExtractorFactoryContainerProvider.class).toInstance(
LookupEnabledTestExprMacroTable.createTestLookupReferencesManager(
ImmutableMap.of(
"a", "xa",
Expand Down
11 changes: 5 additions & 6 deletions integration-tests/docker/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,19 @@ command=java
-Xms512m
-XX:NewSize=256m
-XX:MaxNewSize=256m
-XX:+UseConcMarkSweepGC
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Ddruid.host=%(ENV_HOST_IP)s
-Ddruid.zk.service.host=druid-zookeeper-kafka
-Ddruid.processing.buffer.sizeBytes=25000000
-Ddruid.server.http.numThreads=100
-Ddruid.server.http.numThreads=40
-Ddruid.processing.numThreads=1
-Ddruid.broker.http.numConnections=30
-Ddruid.broker.http.numConnections=20
-Ddruid.broker.http.readTimeout=PT5M
-Ddruid.broker.cache.useCache=true
-Ddruid.broker.cache.populateCache=true
-Ddruid.cache.type=local
-Ddruid.lookup.namespace.cache.type=onHeap
-Ddruid.cache.sizeInBytes=40000000
-Ddruid.lookup.numLookupLoadingThreads=1
-Ddruid.auth.authenticatorChain="[\"basic\"]"
Expand Down Expand Up @@ -63,3 +61,4 @@ redirect_stderr=true
autorestart=false
priority=100
stdout_logfile=/shared/logs/broker.log
environment=AWS_REGION=us-east-1
10 changes: 6 additions & 4 deletions integration-tests/docker/coordinator.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@ command=java
-server
-Xmx128m
-Xms128m
-XX:+UseConcMarkSweepGC
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Ddruid.host=%(ENV_HOST_IP)s
-Ddruid.server.http.numThreads=100
-Ddruid.server.http.numThreads=20
-Ddruid.metadata.storage.type=mysql
-Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://druid-metadata-storage/druid
-Ddruid.metadata.storage.connector.user=druid
-Ddruid.metadata.storage.connector.password=diurd
-Ddruid.zk.service.host=druid-zookeeper-kafka
-Ddruid.coordinator.startDelay=PT5S
-Ddruid.lookup.numLookupLoadingThreads=1
-Ddruid.manager.lookups.hostUpdateTimeout=PT30S
-Ddruid.manager.lookups.period=10000
-Ddruid.manager.lookups.threadPoolSize=2
-Ddruid.auth.authenticatorChain="[\"basic\"]"
-Ddruid.auth.authenticator.basic.type=basic
-Ddruid.auth.authenticator.basic.initialAdminPassword=priest
Expand Down Expand Up @@ -57,3 +58,4 @@ redirect_stderr=true
priority=100
autorestart=false
stdout_logfile=/shared/logs/coordinator.log
environment=AWS_REGION=us-east-1
6 changes: 2 additions & 4 deletions integration-tests/docker/historical.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ command=java
-Xms512m
-XX:NewSize=256m
-XX:MaxNewSize=256m
-XX:+UseConcMarkSweepGC
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Ddruid.host=%(ENV_HOST_IP)s
Expand All @@ -16,7 +14,7 @@ command=java
-Ddruid.s3.secretKey=OBaLISDFjKLajSTrJ53JoTtzTZLjPlRePcwa+Pjv
-Ddruid.processing.buffer.sizeBytes=25000000
-Ddruid.processing.numThreads=2
-Ddruid.server.http.numThreads=100
-Ddruid.server.http.numThreads=20
-Ddruid.segmentCache.locations="[{\"path\":\"/shared/druid/indexCache\",\"maxSize\":5000000000}]"
-Ddruid.server.maxSize=5000000000
-Ddruid.lookup.numLookupLoadingThreads=1
Expand Down
8 changes: 3 additions & 5 deletions integration-tests/docker/middlemanager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ command=java
-server
-Xmx64m
-Xms64m
-XX:+UseConcMarkSweepGC
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Ddruid.host=%(ENV_HOST_IP)s
Expand All @@ -14,10 +12,10 @@ command=java
-Ddruid.worker.capacity=3
-Ddruid.indexer.logs.directory=/shared/tasklogs
-Ddruid.storage.storageDirectory=/shared/storage
-Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml"
-Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml"
-Ddruid.indexer.fork.property.druid.processing.buffer.sizeBytes=25000000
-Ddruid.indexer.fork.property.druid.processing.numThreads=1
-Ddruid.indexer.fork.server.http.numThreads=100
-Ddruid.indexer.fork.server.http.numThreads=20
-Ddruid.s3.accessKey=AKIAJI7DG7CDECGBQ6NA
-Ddruid.s3.secretKey=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
-Ddruid.worker.ip=%(ENV_HOST_IP)s
Expand Down
7 changes: 3 additions & 4 deletions integration-tests/docker/overlord.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ command=java
-server
-Xmx128m
-Xms128m
-XX:+UseConcMarkSweepGC
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Ddruid.host=%(ENV_HOST_IP)s
-Ddruid.server.http.numThreads=100
-Ddruid.server.http.numThreads=20
-Ddruid.metadata.storage.type=mysql
-Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://druid-metadata-storage/druid
-Ddruid.metadata.storage.connector.user=druid
Expand Down Expand Up @@ -58,3 +56,4 @@ redirect_stderr=true
priority=100
autorestart=false
stdout_logfile=/shared/logs/overlord.log
environment=AWS_REGION=us-east-1
7 changes: 3 additions & 4 deletions integration-tests/docker/router.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
command=java
-server
-Xmx128m
-XX:+UseConcMarkSweepGC
-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps
-XX:+UseG1GC
-Duser.timezone=UTC
-Dfile.encoding=UTF-8
-Ddruid.host=%(ENV_HOST_IP)s
-Ddruid.zk.service.host=druid-zookeeper-kafka
-Ddruid.server.http.numThreads=100
-Ddruid.server.http.numThreads=20
-Ddruid.lookup.numLookupLoadingThreads=1
-Ddruid.auth.authenticatorChain="[\"basic\"]"
-Ddruid.auth.authenticator.basic.type=basic
Expand Down Expand Up @@ -52,3 +50,4 @@ redirect_stderr=true
priority=100
autorestart=false
stdout_logfile=/shared/logs/router.log
environment=AWS_REGION=us-east-1
1 change: 1 addition & 0 deletions integration-tests/docker/wiki-simple-lookup.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"Wikipedia:Vandalismusmeldung":"lookup!"}
5 changes: 5 additions & 0 deletions integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@
<artifactId>druid-basic-security</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-lookups-cached-global</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>simple-client-sslcontext</artifactId>
Expand Down
1 change: 1 addition & 0 deletions integration-tests/run_cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib
# one of the integration tests needs the wikiticker sample data
mkdir -p $SHARED_DIR/wikiticker-it
cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz
cp docker/wiki-simple-lookup.json $SHARED_DIR/wikiticker-it/wiki-simple-lookup.json

docker network create --subnet=172.172.172.0/24 druid-it-net

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.HostAndPort;
import com.google.inject.Inject;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
Expand All @@ -29,6 +31,8 @@
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.lookup.LookupsState;
import org.apache.druid.server.lookup.cache.LookupExtractorFactoryMapContainer;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -218,6 +222,90 @@ public HttpResponseStatus getProxiedOverlordScalingResponseStatus()
}
}

/**
 * Initializes lookups on the coordinator: first POSTs an empty lookup config map to
 * {@code /druid/coordinator/v1/lookups/config} (required bootstrap step before any lookup
 * can be registered), then POSTs the lookup spec read from the given classpath resource.
 *
 * @param filePath classpath resource containing the lookup config JSON (e.g. a tiered map of lookup specs)
 * @return the coordinator's response to the second (real) config POST, deserialized as a map
 * @throws ISE if either POST is not answered with HTTP 202 ACCEPTED
 * @throws Exception on I/O or JSON (de)serialization failure
 */
public Map<String, Object> initializeLookups(String filePath) throws Exception
{
  String url = StringUtils.format("%slookups/config", getCoordinatorURL());

  // Bootstrap call: an empty map initializes the lookup config state on the coordinator.
  StatusResponseHolder response = httpClient.go(
      new Request(HttpMethod.POST, new URL(url)).setContent(
          "application/json",
          jsonMapper.writeValueAsBytes(ImmutableMap.of())
      ), responseHandler
  ).get();

  if (!response.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
    throw new ISE(
        "Error while querying[%s] status[%s] content[%s]",
        url,
        response.getStatus(),
        response.getContent()
    );
  }

  // Real call: submit the lookup spec loaded from the test resource.
  StatusResponseHolder response2 = httpClient.go(
      new Request(HttpMethod.POST, new URL(url)).setContent(
          "application/json",
          jsonMapper.writeValueAsBytes(
              jsonMapper.readValue(
                  CoordinatorResourceTestClient.class.getResourceAsStream(filePath),
                  new TypeReference<Map<Object, Object>>(){}
              )
          )
      ), responseHandler
  ).get();

  if (!response2.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
    throw new ISE(
        "Error while querying[%s] status[%s] content[%s]",
        url,
        response2.getStatus(),
        response2.getContent()
    );
  }

  // BUG FIX: previously parsed response.getContent() (the bootstrap response) here,
  // silently returning the wrong payload. Parse the second response instead.
  return jsonMapper.readValue(
      response2.getContent(),
      new TypeReference<Map<String, Object>>()
      {
      }
  );
}

/**
 * Fetches the per-node lookup load status from the coordinator's
 * {@code lookups/nodeStatus} endpoint.
 *
 * @return tier name -> (host -> lookup state) as reported by the coordinator
 * @throws RuntimeException wrapping any request or deserialization failure
 */
private Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> getLookupLoadStatus()
{
  final String url = StringUtils.format("%slookups/nodeStatus", getCoordinatorURL());
  try {
    final StatusResponseHolder response = makeRequest(HttpMethod.GET, url);
    return jsonMapper.readValue(
        response.getContent(),
        new TypeReference<Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>>>()
        {
        }
    );
  }
  catch (Exception e) {
    throw new RuntimeException(e);
  }
}

/**
 * Checks whether the given lookup is loaded ("current") on every node in the
 * {@code __default} lookup tier, according to the coordinator's nodeStatus endpoint.
 *
 * @param lookup name of the lookup to check
 * @return true iff the default tier is present and every host in it reports the lookup as current;
 *         false while the tier has not been registered yet (instead of throwing NPE)
 */
public boolean areLookupsLoaded(String lookup)
{
  final Map<String, Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>>> status = getLookupLoadStatus();

  final Map<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> defaultTier = status.get("__default");

  // BUG FIX: the "__default" tier may be absent before the lookup config has propagated;
  // the old code dereferenced null here. Report "not loaded" so retry loops keep polling.
  if (defaultTier == null) {
    return false;
  }

  for (Map.Entry<HostAndPort, LookupsState<LookupExtractorFactoryMapContainer>> host : defaultTier.entrySet()) {
    if (!host.getValue().getCurrent().containsKey(lookup)) {
      return false;
    }
  }

  return true;
}

private StatusResponseHolder makeRequest(HttpMethod method, String url)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,31 +28,30 @@
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

import java.util.concurrent.Callable;

@Guice(moduleFactory = DruidTestModuleFactory.class)
public class ITWikipediaQueryTest
{
private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream";
private static final String WIKI_LOOKUP = "wiki-simple";
private static final String WIKIPEDIA_QUERIES_RESOURCE = "/queries/wikipedia_editstream_queries.json";
private static final String WIKIPEDIA_LOOKUP_RESOURCE = "/queries/wiki-lookup-config.json";

@Inject
private CoordinatorResourceTestClient coordinatorClient;
@Inject
private TestQueryHelper queryHelper;

@BeforeMethod
public void before()
public void before() throws Exception
{

// ensure that wikipedia segments are loaded completely
RetryUtil.retryUntilTrue(
new Callable<Boolean>()
{
@Override
public Boolean call()
{
return coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE);
}
}, "wikipedia segment load"
() -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load"
);
coordinatorClient.initializeLookups(WIKIPEDIA_LOOKUP_RESOURCE);
RetryUtil.retryUntilTrue(
() -> coordinatorClient.areLookupsLoaded(WIKI_LOOKUP), "wikipedia lookup load"
);
}

Expand All @@ -61,5 +60,4 @@ public void testWikipediaQueriesFromFile() throws Exception
{
queryHelper.testQueriesFromFile(WIKIPEDIA_QUERIES_RESOURCE, 2);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"__default": {
"wiki-simple": {
"version": "v1",
"lookupExtractorFactory": {
"type": "cachedNamespace",
"extractionNamespace": {
"type": "uri",
"uri": "file:/shared/wikiticker-it/wiki-simple-lookup.json",
"namespaceParseSpec": {
"format": "simpleJson"
},
"pollPeriod": "PT10S"
},
"firstCacheTimeout": 0
}
}
}
}
Loading