From 72a3e67aa4902309acd70daeb2d663fad4689367 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 13 Dec 2018 18:38:44 -0800 Subject: [PATCH 1/6] Fixes and tests for spaces/non-ASCII datasource names --- ci/travis_script_integration_part2.sh | 2 +- .../druid/java/util/common/StringUtils.java | 15 +++- .../java/util/common/StringUtilsTest.java | 10 +++ .../indexing/common/IndexTaskClient.java | 6 +- .../http/security/TaskResourceFilter.java | 3 +- .../indexer_static/js/console-0.0.1.js | 2 +- integration-tests/docker/Dockerfile | 4 +- integration-tests/docker/middlemanager.conf | 2 +- integration-tests/pom.xml | 1 + integration-tests/run_cluster.sh | 26 +++--- .../testing/ConfigFileConfigProvider.java | 6 ++ .../druid/testing/DockerConfigProvider.java | 9 +++ .../testing/IntegrationTestingConfig.java | 2 + .../clients/ClientInfoResourceTestClient.java | 7 +- .../CoordinatorResourceTestClient.java | 8 +- .../clients/OverlordResourceTestClient.java | 5 +- .../druid/testing/utils/TestQueryHelper.java | 2 + .../indexer/AbstractITBatchIndexTest.java | 71 ++++++++++++++-- .../AbstractITRealtimeIndexTaskTest.java | 17 +++- .../tests/indexer/AbstractIndexerTest.java | 4 + .../tests/indexer/ITCompactionTaskTest.java | 80 +++++++++++++++---- .../druid/tests/indexer/ITIndexerTest.java | 6 +- .../indexer/ITKafkaIndexingServiceTest.java | 18 ++++- .../druid/tests/indexer/ITKafkaTest.java | 16 +++- .../indexer/ITNestedQueryPushDownTest.java | 39 ++++++++- .../tests/indexer/ITParallelIndexTest.java | 2 +- .../druid/tests/indexer/ITUnionQueryTest.java | 44 ++++++++-- .../test/resources/indexer/union_queries.json | 32 ++++---- .../indexer/wikipedia_compaction_task.json | 2 +- .../indexer/wikipedia_index_queries.json | 4 +- .../indexer/wikipedia_index_task.json | 2 +- .../wikipedia_parallel_index_queries.json | 4 +- .../wikipedia_parallel_index_task.json | 2 +- ...a_realtime_appenderator_index_queries.json | 6 +- ...edia_realtime_appenderator_index_task.json | 2 +- .../wikipedia_realtime_index_queries.json | 6 +- .../wikipedia_realtime_index_task.json | 2 +- .../indexer/wikipedia_reindex_task.json | 4 +- .../indexer/wikipedia_union_index_task.json | 2 +- .../indexer/wikiticker_index_task.json | 22 +---- .../queries/nestedquerypushdown_queries.json | 10 +-- .../client/coordinator/CoordinatorClient.java | 2 +- .../indexing/HttpIndexingServiceClient.java | 12 ++- .../firehose/ChatHandlerResource.java | 3 +- .../TaskIdResponseHeaderFilterHolder.java | 3 +- 45 files changed, 390 insertions(+), 137 deletions(-) diff --git a/ci/travis_script_integration_part2.sh b/ci/travis_script_integration_part2.sh index 61b3b9b7b6fa..24cd7970ece8 100755 --- a/ci/travis_script_integration_part2.sh +++ b/ci/travis_script_integration_part2.sh @@ -21,6 +21,6 @@ set -e pushd $TRAVIS_BUILD_DIR/integration-tests -mvn verify -P integration-tests -Dit.test=ITUnionQueryTest,ITTwitterQueryTest,ITWikipediaQueryTest,ITBasicAuthConfigurationTest,ITTLSTest +mvn verify -P integration-tests -Dit.test=ITUnionQueryTest,ITNestedQueryPushDownTest,ITTwitterQueryTest,ITWikipediaQueryTest,ITBasicAuthConfigurationTest,ITTLSTest popd diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index 85f71d8a7a76..fe633860ee9c 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -24,6 +24,7 @@ import javax.annotation.Nullable; import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.nio.charset.Charset; @@ -156,7 +157,19 @@ public static String toUpperCase(String s) public static String urlEncode(String s) { try { - return URLEncoder.encode(s, "UTF-8"); + // application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form + // data as well, so replace "+" with "%20". + return StringUtils.replace(URLEncoder.encode(s, "UTF-8"), "+", "%20"); + } + catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + } + + public static String urlDecode(String s) + { + try { + return URLDecoder.decode(s, "UTF-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); diff --git a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java index 53f4942cf244..bd9a95f5aced 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java @@ -148,4 +148,14 @@ public void testReplace() Assert.assertEquals("bb", StringUtils.replace("aaaa", "aa", "b")); Assert.assertEquals("", StringUtils.replace("aaaa", "aa", "")); } + + @Test + public void testURLEncodeSpace() + { + String s1 = StringUtils.urlEncode("aaa bbb"); + Assert.assertEquals(s1, "aaa%20bbb"); + + String s2 = StringUtils.urlEncode("fff+ggg"); + Assert.assertEquals(s2, "fff%2Bggg"); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java index 148dc6c87045..84d923c7de3a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java @@ -256,7 +256,7 @@ private Request createRequest( final Request request = new Request(method, serviceUrl); // used to validate that we are talking to the correct worker - request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId); + request.addHeader(ChatHandlerResource.TASK_ID_HEADER, StringUtils.urlEncode(taskId)); if (content.length > 0) { request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content); } @@ -334,7 +334,9 @@ private FullResponseHolder submitRequest( final Duration delay; if (response != null && response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) { - String headerId = response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER); + String headerId = StringUtils.urlDecode( + response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER) + ); if (headerId != null && !headerId.equals(taskId)) { log.warn( "Expected worker to have taskId [%s] but has taskId [%s], will retry in [%d]s", diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java index 8109961558c3..af1f8222d1f5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/security/TaskResourceFilter.java @@ -64,7 +64,7 @@ public TaskResourceFilter( @Override public ContainerRequest filter(ContainerRequest request) { - final String taskId = Preconditions.checkNotNull( + String taskId = Preconditions.checkNotNull( request.getPathSegments() .get( Iterables.indexOf( @@ -80,6 +80,7 @@ public boolean apply(PathSegment input) ) + 1 ).getPath() ); + taskId = StringUtils.urlDecode(taskId); Optional taskOptional = taskStorageQueryAdapter.getTask(taskId); if (!taskOptional.isPresent()) { diff --git a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js index bda6094e63f1..db64b4f920b2 100644 --- a/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js +++ b/indexing-service/src/main/resources/indexer_static/js/console-0.0.1.js @@ -123,7 +123,7 @@ $(document).ready(function() { 'suspended' : 'running'; data[i] = { - "dataSource" : supervisorId, + "dataSource" : dataList[i].id, "more" : 'payload' + 'status' + diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile index 2b46de7f06fd..e3f3155e5f11 100644 --- a/integration-tests/docker/Dockerfile +++ b/integration-tests/docker/Dockerfile @@ -16,10 +16,12 @@ # Base image is built from integration-tests/docker-base in the Druid repo FROM imply/druiditbase +RUN echo "[mysqld]\ncharacter-set-server=utf8\ncollation-server=utf8_bin\n" >> /etc/mysql/my.cnf + # Setup metadata store # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72. RUN find /var/lib/mysql -type f -exec touch {} \; && /etc/init.d/mysql start \ - && echo "GRANT ALL ON druid.* TO 'druid'@'%' IDENTIFIED BY 'diurd'; CREATE database druid DEFAULT CHARACTER SET utf8;" | mysql -u root \ + && echo "CREATE USER 'druid'@'%' IDENTIFIED BY 'diurd'; GRANT ALL ON druid.* TO 'druid'@'%'; CREATE database druid DEFAULT CHARACTER SET utf8mb4;" | mysql -u root \ && /etc/init.d/mysql stop # Add Druid jars diff --git a/integration-tests/docker/middlemanager.conf b/integration-tests/docker/middlemanager.conf index 173829d135da..40adf19339b6 100644 --- a/integration-tests/docker/middlemanager.conf +++ b/integration-tests/docker/middlemanager.conf @@ -14,7 +14,7 @@ command=java -Ddruid.worker.capacity=3 -Ddruid.indexer.logs.directory=/shared/tasklogs -Ddruid.storage.storageDirectory=/shared/storage - -Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml" + -Ddruid.indexer.runner.javaOpts="-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Dlog4j.configurationFile=/shared/docker/lib/log4j2.xml" -Ddruid.indexer.fork.property.druid.processing.buffer.sizeBytes=25000000 -Ddruid.indexer.fork.property.druid.processing.numThreads=1 -Ddruid.indexer.fork.server.http.numThreads=100 diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index 73390ab537f1..43c23e405a58 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -231,6 +231,7 @@ -Dfile.encoding=UTF-8 -Ddruid.test.config.dockerIp=${env.DOCKER_IP} -Ddruid.test.config.hadoopDir=${env.HADOOP_DIR} + -Ddruid.test.config.extraDatasourceNameSuffix=\ Россия\ 한국\ 中国!? -Ddruid.zk.service.host=${env.DOCKER_IP} -Ddruid.client.https.trustStorePath=client_tls/truststore.jks -Ddruid.client.https.trustStorePassword=druid123 diff --git a/integration-tests/run_cluster.sh b/integration-tests/run_cluster.sh index c08867ab59cb..49d046ac6b97 100755 --- a/integration-tests/run_cluster.sh +++ b/integration-tests/run_cluster.sh @@ -53,40 +53,44 @@ cp src/main/resources/log4j2.xml $SHARED_DIR/docker/lib/log4j2.xml # copy the integration test jar, it provides test-only extension implementations cp target/druid-integration-tests*.jar $SHARED_DIR/docker/lib +# one of the integration tests needs the wikiticker sample data +mkdir -p $SHARED_DIR/wikiticker-it +cp ../examples/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz $SHARED_DIR/wikiticker-it/wikiticker-2015-09-12-sampled.json.gz + docker network create --subnet=172.172.172.0/24 druid-it-net # Build Druid Cluster Image docker build -t druid/cluster $SHARED_DIR/docker # Start zookeeper and kafka -docker run -d --privileged --net druid-it-net --ip 172.172.172.2 --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -p 9093:9093 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.2 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -p 9093:9093 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster # Start MYSQL -docker run -d --privileged --net druid-it-net --ip 172.172.172.3 --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.3 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster # Start Overlord -docker run -d --privileged --net druid-it-net --ip 172.172.172.4 --name druid-overlord -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.4 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-overlord -p 8090:8090 -p 8290:8290 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster # Start Coordinator -docker run -d --privileged --net druid-it-net --ip 172.172.172.5 --name druid-coordinator -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.5 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-coordinator -p 8081:8081 -p 8281:8281 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster # Start Historical -docker run -d --privileged --net druid-it-net --ip 172.172.172.6 --name druid-historical -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.6 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-historical -p 8083:8083 -p 8283:8283 -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster # Start Middlemanger -docker run -d --privileged --net druid-it-net --ip 172.172.172.7 --name druid-middlemanager -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.7 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-middlemanager -p 8091:8091 -p 8291:8291 -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -p 8300:8300 -p 8301:8301 -p 8302:8302 -p 8303:8303 -p 8304:8304 -p 8305:8305 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster # Start Broker -docker run -d --privileged --net druid-it-net --ip 172.172.172.8 --name druid-broker -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.8 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-broker -p 8082:8082 -p 8282:8282 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster # Start Router -docker run -d --privileged --net druid-it-net --ip 172.172.172.9 --name druid-router -p 8888:8888 -p 9088:9088 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.9 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router -p 8888:8888 -p 9088:9088 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster # Start Router with permissive TLS settings (client auth enabled, no hostname verification, no revocation check) -docker run -d --privileged --net druid-it-net --ip 172.172.172.10 --name druid-router-permissive-tls -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-permissive-tls.conf:$SUPERVISORDIR/router-permissive-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.10 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router-permissive-tls -p 8889:8889 -p 9089:9089 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-permissive-tls.conf:$SUPERVISORDIR/router-permissive-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster # Start Router with TLS but no client auth -docker run -d --privileged --net druid-it-net --ip 172.172.172.11 --name druid-router-no-client-auth-tls -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-no-client-auth-tls.conf:$SUPERVISORDIR/router-no-client-auth-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.11 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --name druid-router-no-client-auth-tls -p 8890:8890 -p 9090:9090 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-no-client-auth-tls.conf:$SUPERVISORDIR/router-no-client-auth-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster # Start Router with custom TLS cert checkers -docker run -d --privileged --net druid-it-net --ip 172.172.172.12 --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-custom-check-tls.conf:$SUPERVISORDIR/router-custom-check-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster +docker run -d --privileged --net druid-it-net --ip 172.172.172.12 -e LANG=C.UTF-8 -e LANGUAGE=C.UTF-8 -e LC_ALL=C.UTF-8 --hostname druid-router-custom-check-tls --name druid-router-custom-check-tls -p 8891:8891 -p 9091:9091 -v $SHARED_DIR:/shared -v $DOCKERDIR/router-custom-check-tls.conf:$SUPERVISORDIR/router-custom-check-tls.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster diff --git a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java index 990b91805768..976eb894f99f 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/ConfigFileConfigProvider.java @@ -348,6 +348,12 @@ public boolean manageKafkaTopic() { return Boolean.valueOf(props.getOrDefault("manageKafkaTopic", "true")); } + + @Override + public String getExtraDatasourceNameSuffix() + { + return ""; + } }; } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java index 04d512a15cdf..7cd8d9363b48 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/DockerConfigProvider.java @@ -37,6 +37,9 @@ public class DockerConfigProvider implements IntegrationTestingConfigProvider @NotNull private String hadoopDir; + @JsonProperty + private String extraDatasourceNameSuffix = ""; + @Override public IntegrationTestingConfig get() { @@ -202,6 +205,12 @@ public boolean manageKafkaTopic() { return true; } + + @Override + public String getExtraDatasourceNameSuffix() + { + return extraDatasourceNameSuffix; + } }; } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java index ec321a035f56..f4e745fca946 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/IntegrationTestingConfig.java @@ -82,4 +82,6 @@ default String getKafkaInternalHost() Map getProperties(); boolean manageKafkaTopic(); + + String getExtraDatasourceNameSuffix(); } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java index 63858d3eb35b..8c1b5d173a72 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/ClientInfoResourceTestClient.java @@ -72,7 +72,12 @@ public List getDimensions(String dataSource, String interval) StatusResponseHolder response = httpClient.go( new Request( HttpMethod.GET, - new URL(StringUtils.format("%s/%s/dimensions?interval=%s", getBrokerURL(), dataSource, interval)) + new URL(StringUtils.format( + "%s/%s/dimensions?interval=%s", + getBrokerURL(), + StringUtils.urlEncode(dataSource), + interval + )) ), responseHandler ).get(); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java index c3145aae824b..df6ad20d34d6 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/CoordinatorResourceTestClient.java @@ -72,12 +72,12 @@ private String getCoordinatorURL() private String getMetadataSegmentsURL(String dataSource) { - return StringUtils.format("%smetadata/datasources/%s/segments", getCoordinatorURL(), dataSource); + return StringUtils.format("%smetadata/datasources/%s/segments", getCoordinatorURL(), StringUtils.urlEncode(dataSource)); } private String getIntervalsURL(String dataSource) { - return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), dataSource); + return StringUtils.format("%sdatasources/%s/intervals", getCoordinatorURL(), StringUtils.urlEncode(dataSource)); } private String getLoadStatusURL() @@ -150,7 +150,7 @@ public boolean areSegmentsLoaded(String dataSource) public void unloadSegmentsForDataSource(String dataSource) { try { - makeRequest(HttpMethod.DELETE, StringUtils.format("%sdatasources/%s", getCoordinatorURL(), dataSource)); + makeRequest(HttpMethod.DELETE, StringUtils.format("%sdatasources/%s", getCoordinatorURL(), StringUtils.urlEncode(dataSource))); } catch (Exception e) { throw Throwables.propagate(e); @@ -165,7 +165,7 @@ public void deleteSegmentsDataSource(String dataSource, Interval interval) StringUtils.format( "%sdatasources/%s/intervals/%s", getCoordinatorURL(), - dataSource, + StringUtils.urlEncode(dataSource), interval.toString().replace('/', '_') ) ); diff --git a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java index 45f55d6f1657..5cee2fb28866 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/clients/OverlordResourceTestClient.java @@ -42,7 +42,6 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.net.URL; -import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; @@ -121,7 +120,7 @@ public TaskState getTaskStatus(String taskID) StringUtils.format( "%stask/%s/status", getIndexerURL(), - URLEncoder.encode(taskID, "UTF-8") + StringUtils.urlEncode(taskID) ) ); @@ -234,7 +233,7 @@ public void shutdownSupervisor(String id) { try { StatusResponseHolder response = httpClient.go( - new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), id))), + new Request(HttpMethod.POST, new URL(StringUtils.format("%ssupervisor/%s/shutdown", getIndexerURL(), StringUtils.urlEncode(id)))), responseHandler ).get(); if (!response.getStatus().equals(HttpResponseStatus.OK)) { diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java index 649557d043ab..888979d3dee8 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/TestQueryHelper.java @@ -45,6 +45,7 @@ public class TestQueryHelper private final String brokerTLS; private final String router; private final String routerTLS; + private final IntegrationTestingConfig config; @Inject TestQueryHelper( @@ -59,6 +60,7 @@ public class TestQueryHelper this.brokerTLS = config.getBrokerTLSUrl(); this.router = config.getRouterUrl(); this.routerTLS = config.getRouterTLSUrl(); + this.config = config; } public void testQueriesFromFile(String filePath, int timesToRun) throws Exception diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index f417c149d062..452bf9b2f346 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -20,6 +20,9 @@ package org.apache.druid.tests.indexer; import com.google.inject.Inject; +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.ClientInfoResourceTestClient; @@ -27,6 +30,7 @@ import org.junit.Assert; import java.io.IOException; +import java.io.InputStream; import java.util.List; public class AbstractITBatchIndexTest extends AbstractIndexerTest @@ -45,9 +49,31 @@ void doIndexTestTest( String queryFilePath ) throws IOException { - submitTaskAndWait(indexTaskFilePath, dataSource); + final String fullDatasourceName = dataSource + config.getExtraDatasourceNameSuffix(); + final String taskSpec = StringUtils.replace( + getTaskAsString(indexTaskFilePath), + "%%DATASOURCE%%", + fullDatasourceName + ); + + submitTaskAndWait(taskSpec, fullDatasourceName); try { - queryHelper.testQueriesFromFile(queryFilePath, 2); + + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", queryFilePath); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + queryHelper.testQueriesFromString(queryResponseTemplate, 2); } catch (Exception e) { @@ -57,17 +83,48 @@ void doIndexTestTest( } void doReindexTest( + String baseDataSource, String reindexDataSource, String reindexTaskFilePath, String queryFilePath ) throws IOException { - submitTaskAndWait(reindexTaskFilePath, reindexDataSource); + final String fullBaseDatasourceName = baseDataSource + config.getExtraDatasourceNameSuffix(); + final String fullReindexDatasourceName = reindexDataSource + config.getExtraDatasourceNameSuffix(); + + String taskSpec = StringUtils.replace( + getTaskAsString(reindexTaskFilePath), + "%%DATASOURCE%%", + fullBaseDatasourceName + ); + + taskSpec = StringUtils.replace( + taskSpec, + "%%REINDEX_DATASOURCE%%", + fullReindexDatasourceName + ); + + submitTaskAndWait(taskSpec, fullReindexDatasourceName); try { - queryHelper.testQueriesFromFile(queryFilePath, 2); + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", queryFilePath); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullBaseDatasourceName + ); + + queryHelper.testQueriesFromString(queryResponseTemplate, 2); // verify excluded dimension is not reIndexed final List dimensions = clientInfoResourceTestClient.getDimensions( - reindexDataSource, + fullReindexDatasourceName, "2013-08-31T00:00:00.000Z/2013-09-10T00:00:00.000Z" ); Assert.assertFalse("dimensions : " + dimensions, dimensions.contains("robot")); @@ -78,9 +135,9 @@ void doReindexTest( } } - private void submitTaskAndWait(String indexTaskFilePath, String dataSourceName) throws IOException + private void submitTaskAndWait(String taskSpec, String dataSourceName) throws IOException { - final String taskID = indexer.submitTask(getTaskAsString(indexTaskFilePath)); + final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java index 176ea112590b..b972006ff27f 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java @@ -34,6 +34,7 @@ import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; +import org.testng.annotations.BeforeSuite; import java.io.Closeable; import java.io.InputStream; @@ -79,15 +80,26 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes @Inject IntegrationTestingConfig config; + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix(); + } + + void doTest() { LOG.info("Starting test: ITRealtimeIndexTaskTest"); - try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { + try (final Closeable closeable = unloader(fullDatasourceName)) { // the task will run for 3 minutes and then shutdown itself String task = setShutOffTime( getTaskAsString(getTaskResource()), DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3)) ); + task = StringUtils.replace(task, "%%DATASOURCE%%", fullDatasourceName); + LOG.info("indexerSpec: [%s]\n", task); taskID = indexer.submitTask(task); @@ -119,6 +131,7 @@ void doTest() queryStr = StringUtils.replace(queryStr, "%%POST_AG_REQUEST_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2))); String postAgResponseTimestamp = TIMESTAMP_FMT.print(dtGroupBy.withSecondOfMinute(0)); queryStr = StringUtils.replace(queryStr, "%%POST_AG_RESPONSE_TIMESTAMP%%", postAgResponseTimestamp); + queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName); // should hit the queries all on realtime task or some on realtime task // and some on historical. Which it is depends on where in the minute we were @@ -140,7 +153,7 @@ void doTest() @Override public Boolean call() { - return coordinator.areSegmentsLoaded(INDEX_DATASOURCE); + return coordinator.areSegmentsLoaded(fullDatasourceName); } }, true, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java index 7f7819265d57..b739d79ddddb 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractIndexerTest.java @@ -25,6 +25,7 @@ import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Smile; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.clients.OverlordResourceTestClient; import org.apache.druid.testing.utils.RetryUtil; @@ -54,6 +55,9 @@ public abstract class AbstractIndexerTest @Inject protected TestQueryHelper queryHelper; + @Inject + private IntegrationTestingConfig config; + protected Closeable unloader(final String dataSource) { return () -> unloadAndKillData(dataSource); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java index 5ae32fa2b37d..db6ebff48abb 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITCompactionTaskTest.java @@ -19,15 +19,21 @@ package org.apache.druid.tests.indexer; +import com.google.inject.Inject; +import org.apache.commons.io.IOUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.utils.RetryUtil; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; import java.util.List; @Guice(moduleFactory = DruidTestModuleFactory.class) @@ -39,23 +45,49 @@ public class ITCompactionTaskTest extends AbstractIndexerTest private static String INDEX_DATASOURCE = "wikipedia_index_test"; private static String COMPACTION_TASK = "/indexer/wikipedia_compaction_task.json"; + @Inject + private IntegrationTestingConfig config; + + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix(); + } + @Test public void testCompactionWithoutKeepSegmentGranularity() throws Exception { loadData(); - final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE); + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); intervalsBeforeCompaction.sort(null); final String compactedInterval = "2013-08-31T00:00:00.000Z/2013-09-02T00:00:00.000Z"; if (intervalsBeforeCompaction.contains(compactedInterval)) { throw new ISE("Containing a segment for the compacted interval[%s] before compaction", compactedInterval); } - try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { - queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2); + try (final Closeable closeable = unloader(fullDatasourceName)) { + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + + queryHelper.testQueriesFromString(queryResponseTemplate, 2); compactData(false); // 4 segments across 2 days, compacted into 1 new segment (5 total) checkCompactionFinished(5); - queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2); + queryHelper.testQueriesFromString(queryResponseTemplate, 2); intervalsBeforeCompaction.add(compactedInterval); intervalsBeforeCompaction.sort(null); @@ -67,15 +99,31 @@ public void testCompactionWithoutKeepSegmentGranularity() throws Exception public void testCompactionWithKeepSegmentGranularity() throws Exception { loadData(); - final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE); + final List intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName); intervalsBeforeCompaction.sort(null); - try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { - queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2); + try (final Closeable closeable = unloader(fullDatasourceName)) { + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(INDEX_QUERIES_RESOURCE); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", INDEX_QUERIES_RESOURCE); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + + + queryHelper.testQueriesFromString(queryResponseTemplate, 2); compactData(true); // 4 segments across 2 days, compacted into 2 new segments (6 total) checkCompactionFinished(6); - queryHelper.testQueriesFromFile(INDEX_QUERIES_RESOURCE, 2); + queryHelper.testQueriesFromString(queryResponseTemplate, 2); checkCompactionIntervals(intervalsBeforeCompaction); } @@ -83,12 +131,14 @@ public void testCompactionWithKeepSegmentGranularity() throws Exception private void loadData() throws Exception { - final String taskID = indexer.submitTask(getTaskAsString(INDEX_TASK)); + String taskSpec = getTaskAsString(INDEX_TASK); + taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName); + final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); RetryUtil.retryUntilTrue( - () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE), + () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Load" ); } @@ -96,14 +146,16 @@ private void loadData() throws Exception private void compactData(boolean keepSegmentGranularity) throws Exception { final String template = getTaskAsString(COMPACTION_TASK); - final String taskSpec = + String taskSpec = StringUtils.replace(template, "${KEEP_SEGMENT_GRANULARITY}", Boolean.toString(keepSegmentGranularity)); + taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName); + final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for compaction task %s", taskID); indexer.waitUntilTaskCompletes(taskID); RetryUtil.retryUntilTrue( - () -> coordinator.areSegmentsLoaded(INDEX_DATASOURCE), + () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Compaction" ); } @@ -112,7 +164,7 @@ private void checkCompactionFinished(int numExpectedSegments) { RetryUtil.retryUntilTrue( () -> { - int metadataSegmentCount = coordinator.getMetadataSegments(INDEX_DATASOURCE).size(); + int metadataSegmentCount = coordinator.getMetadataSegments(fullDatasourceName).size(); LOG.info("Current metadata segment count: %d, expected: %d", metadataSegmentCount, numExpectedSegments); return metadataSegmentCount == numExpectedSegments; }, @@ -124,7 +176,7 @@ private void checkCompactionIntervals(List expectedIntervals) { RetryUtil.retryUntilTrue( () -> { - final List intervalsAfterCompaction = coordinator.getSegmentIntervals(INDEX_DATASOURCE); + final List intervalsAfterCompaction = coordinator.getSegmentIntervals(fullDatasourceName); intervalsAfterCompaction.sort(null); System.out.println("AFTER: " + intervalsAfterCompaction); System.out.println("EXPECTED: " + expectedIntervals); diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index 63681e207b66..8412e49c17f4 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -31,6 +31,7 @@ public class ITIndexerTest extends AbstractITBatchIndexTest private static String INDEX_TASK = "/indexer/wikipedia_index_task.json"; private static String INDEX_QUERIES_RESOURCE = "/indexer/wikipedia_index_queries.json"; private static String INDEX_DATASOURCE = "wikipedia_index_test"; + private static String REINDEX_TASK = "/indexer/wikipedia_reindex_task.json"; private static String REINDEX_DATASOURCE = "wikipedia_reindex_test"; @@ -38,8 +39,8 @@ public class ITIndexerTest extends AbstractITBatchIndexTest public void testIndexData() throws Exception { try ( - final Closeable indexCloseable = unloader(INDEX_DATASOURCE); - final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE) + final Closeable indexCloseable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()); + final Closeable reindexCloseable = unloader(REINDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()); ) { doIndexTestTest( INDEX_DATASOURCE, @@ -47,6 +48,7 @@ public void testIndexData() throws Exception INDEX_QUERIES_RESOURCE ); doReindexTest( + INDEX_DATASOURCE, REINDEX_DATASOURCE, REINDEX_TASK, INDEX_QUERIES_RESOURCE diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java index 3306da96b338..247fd7e55b2d 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaIndexingServiceTest.java @@ -44,6 +44,7 @@ import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -64,6 +65,7 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json"; private static final String DATASOURCE = "kafka_indexing_service_test"; private static final String TOPIC_NAME = "kafka_indexing_service_topic"; + private static final int NUM_EVENTS_TO_SEND = 60; private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L; public static final String testPropertyPrefix = "kafka.test.property."; @@ -105,6 +107,14 @@ public class ITKafkaIndexingServiceTest extends AbstractIndexerTest @Inject private IntegrationTestingConfig config; + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix(); + } + @Test public void testKafka() { @@ -143,7 +153,7 @@ public void testKafka() addFilteredProperties(consumerProperties); spec = getTaskAsString(INDEXER_FILE); - spec = StringUtils.replace(spec, "%%DATASOURCE%%", DATASOURCE); + spec = StringUtils.replace(spec, "%%DATASOURCE%%", fullDatasourceName); spec = StringUtils.replace(spec, "%%TOPIC%%", TOPIC_NAME); spec = StringUtils.replace(spec, "%%CONSUMER_PROPERTIES%%", jsonMapper.writeValueAsString(consumerProperties)); LOG.info("supervisorSpec: [%s]\n", spec); @@ -228,7 +238,7 @@ public void testKafka() } String queryStr = query_response_template; - queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", DATASOURCE); + queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName); queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast)); queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst)); @@ -271,7 +281,7 @@ public Boolean call() @Override public Boolean call() { - return coordinator.areSegmentsLoaded(DATASOURCE); + return coordinator.areSegmentsLoaded(fullDatasourceName); } }, true, @@ -306,7 +316,7 @@ public void afterClass() // remove segments if (segmentsExist) { - unloadAndKillData(DATASOURCE); + unloadAndKillData(fullDatasourceName); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java index 3388efe28670..af7e82902f51 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITKafkaTest.java @@ -44,6 +44,7 @@ import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; @@ -106,6 +107,13 @@ public class ITKafkaTest extends AbstractIndexerTest @Inject private IntegrationTestingConfig config; + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = DATASOURCE + config.getExtraDatasourceNameSuffix(); + } @Test public void testKafka() { @@ -204,7 +212,7 @@ public void testKafka() addFilteredProperties(consumerProperties); indexerSpec = getTaskAsString(INDEXER_FILE); - indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", DATASOURCE); + indexerSpec = StringUtils.replace(indexerSpec, "%%DATASOURCE%%", fullDatasourceName); indexerSpec = StringUtils.replace(indexerSpec, "%%TOPIC%%", TOPIC_NAME); indexerSpec = StringUtils.replace(indexerSpec, "%%COUNT%%", Integer.toString(num_events)); String consumerPropertiesJson = jsonMapper.writeValueAsString(consumerProperties); @@ -233,7 +241,7 @@ public void testKafka() @Override public Boolean call() { - return coordinator.areSegmentsLoaded(DATASOURCE); + return coordinator.areSegmentsLoaded(fullDatasourceName); } }, true, @@ -263,7 +271,7 @@ public Boolean call() } String queryStr = queryResponseTemplate; - queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", DATASOURCE); + queryStr = StringUtils.replace(queryStr, "%%DATASOURCE%%", fullDatasourceName); // time boundary queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst)); queryStr = StringUtils.replace(queryStr, "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast)); @@ -296,7 +304,7 @@ public void afterClass() // remove segments if (segmentsExist) { - unloadAndKillData(DATASOURCE); + unloadAndKillData(fullDatasourceName); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java index 90ba30419c2b..350e2ab34411 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITNestedQueryPushDownTest.java @@ -21,6 +21,9 @@ import com.google.common.base.Throwables; import com.google.inject.Inject; +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.ClientInfoResourceTestClient; @@ -28,9 +31,13 @@ import org.apache.druid.testing.guice.DruidTestModuleFactory; import org.apache.druid.testing.utils.RetryUtil; import org.apache.druid.testing.utils.TestQueryHelper; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; +import java.io.IOException; +import java.io.InputStream; + @Guice(moduleFactory = DruidTestModuleFactory.class) public class ITNestedQueryPushDownTest extends AbstractIndexerTest { @@ -51,12 +58,36 @@ public class ITNestedQueryPushDownTest extends AbstractIndexerTest @Inject ClientInfoResourceTestClient clientInfoResourceTestClient; + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = WIKITICKER_DATA_SOURCE + config.getExtraDatasourceNameSuffix(); + } + @Test public void testIndexData() { try { loadData(); - queryHelper.testQueriesFromFile(WIKITICKER_QUERIES_RESOURCE, 2); + + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(WIKITICKER_QUERIES_RESOURCE); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", WIKITICKER_QUERIES_RESOURCE); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + + queryHelper.testQueriesFromString(queryResponseTemplate, 2); } catch (Exception e) { LOG.error(e, "Error while testing"); @@ -66,11 +97,13 @@ public void testIndexData() private void loadData() throws Exception { - final String taskID = indexer.submitTask(getTaskAsString(WIKITICKER_INDEX_TASK)); + String taskSpec = getTaskAsString(WIKITICKER_INDEX_TASK); + taskSpec = StringUtils.replace(taskSpec, "%%DATASOURCE%%", fullDatasourceName); + final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); indexer.waitUntilTaskCompletes(taskID); RetryUtil.retryUntilTrue( - () -> coordinator.areSegmentsLoaded(WIKITICKER_DATA_SOURCE), "Segment Load" + () -> coordinator.areSegmentsLoaded(fullDatasourceName), "Segment Load" ); } } diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java index b844acd3e681..80ca6e104873 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITParallelIndexTest.java @@ -35,7 +35,7 @@ public class ITParallelIndexTest extends AbstractITBatchIndexTest @Test public void testIndexData() throws Exception { - try (final Closeable closeable = unloader(INDEX_DATASOURCE)) { + try (final Closeable closeable = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix())) { doIndexTestTest( INDEX_DATASOURCE, INDEX_TASK, diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java index 38adae8063d6..8b65c40db3d5 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITUnionQueryTest.java @@ -20,9 +20,11 @@ package org.apache.druid.tests.indexer; import com.google.inject.Inject; +import org.apache.commons.io.IOUtils; import org.apache.druid.curator.discovery.ServerDiscoveryFactory; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; @@ -39,10 +41,12 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; +import org.testng.annotations.BeforeSuite; import org.testng.annotations.Guice; import org.testng.annotations.Test; import java.io.IOException; +import java.io.InputStream; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -70,13 +74,21 @@ public class ITUnionQueryTest extends AbstractIndexerTest @Inject IntegrationTestingConfig config; + private String fullDatasourceName; + + @BeforeSuite + public void setFullDatasourceName() + { + fullDatasourceName = UNION_DATASOURCE + config.getExtraDatasourceNameSuffix(); + } + @Test public void testUnionQuery() throws IOException { final int numTasks = 3; final Closer closer = Closer.create(); for (int i = 0; i < numTasks; i++) { - closer.register(unloader(UNION_DATASOURCE + i)); + closer.register(unloader(fullDatasourceName + i)); } try { // Load 4 datasources with same dimensions @@ -89,7 +101,7 @@ public void testUnionQuery() throws IOException taskIDs.add( indexer.submitTask( withServiceName( - withDataSource(task, UNION_DATASOURCE + i), + withDataSource(task, fullDatasourceName + i), EVENT_RECEIVER_SERVICE_PREFIX + i ) ) @@ -103,9 +115,9 @@ public void testUnionQuery() throws IOException RetryUtil.retryUntil( () -> { for (int i = 0; i < numTasks; i++) { - final int countRows = queryHelper.countRows(UNION_DATASOURCE + i, "2013-08-31/2013-09-01"); + final int countRows = queryHelper.countRows(fullDatasourceName + i, "2013-08-31/2013-09-01"); if (countRows < 5) { - LOG.warn("%d events have been ingested to %s so far", countRows, UNION_DATASOURCE + i); + LOG.warn("%d events have been ingested to %s so far", countRows, fullDatasourceName + i); return false; } } @@ -119,7 +131,23 @@ public void testUnionQuery() throws IOException // should hit the queries on realtime task LOG.info("Running Union Queries.."); - this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2); + + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(UNION_QUERIES_RESOURCE); + queryResponseTemplate = IOUtils.toString(is, "UTF-8"); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", UNION_QUERIES_RESOURCE); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + fullDatasourceName + ); + + this.queryHelper.testQueriesFromString(queryResponseTemplate, 2); // wait for the task to complete for (int i = 0; i < numTasks; i++) { @@ -134,7 +162,7 @@ public void testUnionQuery() throws IOException @Override public Boolean call() { - return coordinator.areSegmentsLoaded(UNION_DATASOURCE + taskNum); + return coordinator.areSegmentsLoaded(fullDatasourceName + taskNum); } }, true, @@ -144,7 +172,7 @@ public Boolean call() ); } // run queries on historical nodes - this.queryHelper.testQueriesFromFile(UNION_QUERIES_RESOURCE, 2); + this.queryHelper.testQueriesFromString(queryResponseTemplate, 2); } catch (Throwable e) { @@ -162,7 +190,7 @@ private String setShutOffTime(String taskAsString, DateTime time) private String withDataSource(String taskAsString, String dataSource) { - return StringUtils.replace(taskAsString, UNION_DATASOURCE, dataSource); + return StringUtils.replace(taskAsString, "%%DATASOURCE%%", dataSource); } private String withServiceName(String taskAsString, String serviceName) diff --git a/integration-tests/src/test/resources/indexer/union_queries.json b/integration-tests/src/test/resources/indexer/union_queries.json index fa63e8404d49..627af04edc9a 100644 --- a/integration-tests/src/test/resources/indexer/union_queries.json +++ b/integration-tests/src/test/resources/indexer/union_queries.json @@ -6,8 +6,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -69,8 +69,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -149,8 +149,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -263,8 +263,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -344,8 +344,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -417,8 +417,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "intervals": ["2013-08-31/2013-09-01"], @@ -508,8 +508,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] }, "granularity": "all", @@ -548,8 +548,8 @@ "dataSource": { "type": "union", "dataSources": [ - "wikipedia_index_test1", "wikipedia_index_test2", "wikipedia_index_test3", - "wikipedia_index_test0" + "%%DATASOURCE%%1", "%%DATASOURCE%%2", "%%DATASOURCE%%3", + "%%DATASOURCE%%0" ] } }, diff --git a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json index 3fdad69ff5df..1f7cb4486c9b 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_compaction_task.json @@ -1,6 +1,6 @@ { "type" : "compact", - "dataSource" : "wikipedia_index_test", + "dataSource" : "%%DATASOURCE%%", "interval" : "2013-08-31/2013-09-02", "keepSegmentGranularity" : ${KEEP_SEGMENT_GRANULARITY} } \ No newline at end of file diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json index 04565bd83866..9618ba9e9b6d 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_queries.json @@ -3,7 +3,7 @@ "description": "timeseries, 1 agg, all", "query":{ "queryType" : "timeBoundary", - "dataSource": "wikipedia_index_test" + "dataSource": "%%DATASOURCE%%" }, "expectedResults":[ { @@ -20,7 +20,7 @@ "description":"having spec on post aggregation", "query":{ "queryType":"groupBy", - "dataSource":"wikipedia_index_test", + "dataSource":"%%DATASOURCE%%", "granularity":"day", "dimensions":[ "page" diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json index 8b3eab89fb23..a9ae6e2c4670 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_task.json @@ -2,7 +2,7 @@ "type": "index", "spec": { "dataSchema": { - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "metricsSpec": [ { "type": "count", diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json index 76ecb5cd1a36..9618ba9e9b6d 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_queries.json @@ -3,7 +3,7 @@ "description": "timeseries, 1 agg, all", "query":{ "queryType" : "timeBoundary", - "dataSource": "wikipedia_parallel_index_test" + "dataSource": "%%DATASOURCE%%" }, "expectedResults":[ { @@ -20,7 +20,7 @@ "description":"having spec on post aggregation", "query":{ "queryType":"groupBy", - "dataSource":"wikipedia_parallel_index_test", + "dataSource":"%%DATASOURCE%%", "granularity":"day", "dimensions":[ "page" diff --git a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json index 911adbd9f277..f317c538f6b6 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_parallel_index_task.json @@ -2,7 +2,7 @@ "type": "index_parallel", "spec": { "dataSchema": { - "dataSource": "wikipedia_parallel_index_test", + "dataSource": "%%DATASOURCE%%", "metricsSpec": [ { "type": "count", diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json index acd88ca893e0..46d5ec4395ac 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_queries.json @@ -3,7 +3,7 @@ "description": "timeBoundary", "query": { "queryType":"timeBoundary", - "dataSource":"wikipedia_index_test" + "dataSource":"%%DATASOURCE%%" }, "expectedResults":[ { @@ -19,7 +19,7 @@ "description": "timeseries", "query": { "queryType": "timeseries", - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ], "granularity": "all", "aggregations": [ @@ -41,7 +41,7 @@ "description":"having spec on post aggregation", "query":{ "queryType":"groupBy", - "dataSource":"wikipedia_index_test", + "dataSource":"%%DATASOURCE%%", "granularity":"minute", "dimensions":[ "page" diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json index 765914b62adc..9e773609cb70 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_appenderator_index_task.json @@ -2,7 +2,7 @@ "type": "index_realtime_appenderator", "spec": { "dataSchema": { - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "metricsSpec": [ { "type": "count", diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json index b579b347c348..bb6759595e0f 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_queries.json @@ -3,7 +3,7 @@ "description": "timeBoundary", "query": { "queryType":"timeBoundary", - "dataSource":"wikipedia_index_test" + "dataSource":"%%DATASOURCE%%" }, "expectedResults":[ { @@ -19,7 +19,7 @@ "description": "timeseries", "query": { "queryType": "timeseries", - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ], "granularity": "all", "aggregations": [ @@ -41,7 +41,7 @@ "description":"having spec on post aggregation", "query":{ "queryType":"groupBy", - "dataSource":"wikipedia_index_test", + "dataSource":"%%DATASOURCE%%", "granularity":"minute", "dimensions":[ "page" diff --git a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json index ecfff579716b..5f48162c488c 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_realtime_index_task.json @@ -2,7 +2,7 @@ "type": "index_realtime", "spec": { "dataSchema": { - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "metricsSpec": [ { "type": "count", diff --git a/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json b/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json index b63f9f184cc2..e277a9127f49 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_reindex_task.json @@ -2,7 +2,7 @@ "type": "index", "spec": { "dataSchema": { - "dataSource": "wikipedia_reindex_test", + "dataSource": "%%REINDEX_DATASOURCE%%", "metricsSpec": [ { "type": "doubleSum", @@ -42,7 +42,7 @@ "type": "index", "firehose": { "type": "ingestSegment", - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "interval": "2013-08-31/2013-09-01" } }, diff --git a/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json b/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json index 09af36efe28c..75c1281fcd2d 100644 --- a/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikipedia_union_index_task.json @@ -2,7 +2,7 @@ "type": "index_realtime", "spec": { "dataSchema": { - "dataSource": "wikipedia_index_test", + "dataSource": "%%DATASOURCE%%", "metricsSpec": [ { "type": "count", diff --git a/integration-tests/src/test/resources/indexer/wikiticker_index_task.json b/integration-tests/src/test/resources/indexer/wikiticker_index_task.json index 5af8ae8cc24e..d450c7b9458b 100644 --- a/integration-tests/src/test/resources/indexer/wikiticker_index_task.json +++ b/integration-tests/src/test/resources/indexer/wikiticker_index_task.json @@ -2,7 +2,7 @@ "type": "index", "spec": { "dataSchema": { - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "granularitySpec": { "type": "uniform", "segmentGranularity": "day", @@ -18,20 +18,7 @@ "dimensionsSpec": { "dimensions": [ "channel", - "cityName", - "comment", - "countryIsoCode", - "countryName", - "isAnonymous", - "isMinor", - "isNew", - "isRobot", - "isUnpatrolled", - "metroCode", - "namespace", "page", - "regionIsoCode", - "regionName", "user" ] }, @@ -60,11 +47,6 @@ "name": "delta", "type": "longSum", "fieldName": "delta" - }, - { - "name": "user_unique", - "type": "hyperUnique", - "fieldName": "user" } ] }, @@ -72,7 +54,7 @@ "type": "index", "firehose": { "type": "local", - "baseDir": "/examples/quickstart/tutorial", + "baseDir": "/shared/wikiticker-it", "filter": "wikiticker-2015-09-12-sampled.json.gz" } }, diff --git a/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json b/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json index 4c0350c9585d..c7a062c02bbf 100644 --- a/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json +++ b/integration-tests/src/test/resources/queries/nestedquerypushdown_queries.json @@ -7,7 +7,7 @@ "type": "query", "query": { "queryType": "groupBy", - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "intervals": [ "2015-09-12/2015-09-13" ], @@ -60,7 +60,7 @@ "type": "query", "query": { "queryType": "groupBy", - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "intervals": [ "2015-09-12/2015-09-13" ], @@ -113,7 +113,7 @@ "type": "query", "query": { "queryType": "groupBy", - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "intervals": [ "2015-09-12/2015-09-13" ], @@ -191,7 +191,7 @@ "type": "query", "query": { "queryType": "groupBy", - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "intervals": [ "2015-09-12/2015-09-13" ], @@ -253,7 +253,7 @@ "type": "query", "query": { "queryType": "groupBy", - "dataSource": "wikiticker", + "dataSource": "%%DATASOURCE%%", "intervals": [ "2015-09-12/2015-09-13" ], diff --git a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java index 4f59d87a8490..d259271db70c 100644 --- a/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java +++ b/server/src/main/java/org/apache/druid/client/coordinator/CoordinatorClient.java @@ -57,7 +57,7 @@ public List fetchServerView(String dataSource, Interva druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format( "/druid/coordinator/v1/datasources/%s/intervals/%s/serverview?partial=%s", - dataSource, + StringUtils.urlEncode(dataSource), interval.toString().replace('/', '_'), incompleteOk )) diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java index 09a4753ea3c0..5b6bc9a455c5 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java +++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java @@ -154,7 +154,10 @@ public String killTask(String taskId) final FullResponseHolder response = druidLeaderClient.go( druidLeaderClient.makeRequest( HttpMethod.POST, - StringUtils.format("/druid/indexer/v1/task/%s/shutdown", taskId) + StringUtils.format( + "/druid/indexer/v1/task/%s/shutdown", + StringUtils.urlEncode(taskId) + ) ) ); @@ -255,7 +258,10 @@ public TaskStatusResponse getTaskStatus(String taskId) { try { final FullResponseHolder responseHolder = druidLeaderClient.go( - druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format("/druid/indexer/v1/task/%s/status", taskId)) + druidLeaderClient.makeRequest(HttpMethod.GET, StringUtils.format( + "/druid/indexer/v1/task/%s/status", + StringUtils.urlEncode(taskId) + )) ); return jsonMapper.readValue( @@ -283,7 +289,7 @@ public int killPendingSegments(String dataSource, DateTime end) { final String endPoint = StringUtils.format( "/druid/indexer/v1/pendingSegments/%s?interval=%s", - dataSource, + StringUtils.urlEncode(dataSource), new Interval(DateTimes.MIN, end) ); try { diff --git a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java index 693b302b233a..9e64731ec9c0 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/firehose/ChatHandlerResource.java @@ -21,6 +21,7 @@ import com.google.common.base.Optional; import com.google.inject.Inject; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.server.metrics.DataSourceTaskIdHolder; import javax.ws.rs.Path; @@ -49,7 +50,7 @@ public Object doTaskChat(@PathParam("id") String handlerId, @Context HttpHeaders { if (taskId != null) { List requestTaskId = headers.getRequestHeader(TASK_ID_HEADER); - if (requestTaskId != null && !requestTaskId.contains(taskId)) { + if (requestTaskId != null && !requestTaskId.contains(StringUtils.urlEncode(taskId))) { return null; } } diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java index 6302a39f07eb..7dcd3b2c237a 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/TaskIdResponseHeaderFilterHolder.java @@ -20,6 +20,7 @@ package org.apache.druid.server.initialization.jetty; import com.google.common.collect.ImmutableMap; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.realtime.firehose.ChatHandlerResource; public class TaskIdResponseHeaderFilterHolder extends ResponseHeaderFilterHolder @@ -29,7 +30,7 @@ public TaskIdResponseHeaderFilterHolder(String path, String taskId) super(path, taskId == null ? ImmutableMap.of() - : ImmutableMap.of(ChatHandlerResource.TASK_ID_HEADER, taskId) + : ImmutableMap.of(ChatHandlerResource.TASK_ID_HEADER, StringUtils.urlEncode(taskId)) ); } } From 6e45599b826af5a051b2e4f5752690dadd1cf894 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 19 Dec 2018 17:54:59 -0800 Subject: [PATCH 2/6] Some unit test fixes --- .../org/apache/druid/java/util/common/StringUtils.java | 10 ++++++++++ .../druid/indexing/overlord/TaskRunnerUtilsTest.java | 4 ++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index fe633860ee9c..b15943b9ffe9 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -154,8 +154,13 @@ public static String toUpperCase(String s) return s.toUpperCase(Locale.ENGLISH); } + @Nullable public static String urlEncode(String s) { + if (s == null) { + return null; + } + try { // application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form // data as well, so replace "+" with "%20". @@ -166,8 +171,13 @@ public static String urlEncode(String s) } } + @Nullable public static String urlDecode(String s) { + if (s == null) { + return null; + } + try { return URLDecoder.decode(s, "UTF-8"); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java index 7c02946145e0..b21a98fda4ce 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskRunnerUtilsTest.java @@ -35,8 +35,8 @@ public void testMakeWorkerURL() "/druid/worker/v1/task/%s/log", "foo bar&" ); - Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo+bar%26/log", url.toString()); + Assert.assertEquals("https://1.2.3.4:8290/druid/worker/v1/task/foo%20bar%26/log", url.toString()); Assert.assertEquals("1.2.3.4:8290", url.getAuthority()); - Assert.assertEquals("/druid/worker/v1/task/foo+bar%26/log", url.getPath()); + Assert.assertEquals("/druid/worker/v1/task/foo%20bar%26/log", url.getPath()); } } From 6f0f16a091fe59ccf3383625ba9770449be1a77e Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 19 Dec 2018 21:53:55 -0800 Subject: [PATCH 3/6] Fix ITRealtimeIndexTaskTest --- .../tests/indexer/AbstractITRealtimeIndexTaskTest.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java index b972006ff27f..19288edf3a50 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java @@ -82,15 +82,10 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes private String fullDatasourceName; - @BeforeSuite - public void setFullDatasourceName() + void doTest() { fullDatasourceName = INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix(); - } - - void doTest() - { LOG.info("Starting test: ITRealtimeIndexTaskTest"); try (final Closeable closeable = unloader(fullDatasourceName)) { // the task will run for 3 minutes and then shutdown itself From be392280fe9e0a2d54dd137abaae9935eca423f2 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 20 Dec 2018 12:40:40 -0800 Subject: [PATCH 4/6] Checkstyle --- .../druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java index 19288edf3a50..cdf61de8c3ba 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITRealtimeIndexTaskTest.java @@ -34,7 +34,6 @@ import org.joda.time.DateTime; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; -import org.testng.annotations.BeforeSuite; import java.io.Closeable; import java.io.InputStream; From 0877e4ac1b8aa19e48ea78506f10e30e5b8d1f63 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 20 Dec 2018 14:33:20 -0800 Subject: [PATCH 5/6] TeamCity --- .../apache/druid/tests/indexer/AbstractITBatchIndexTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java index 452bf9b2f346..b9bac8b245c5 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java @@ -135,7 +135,7 @@ void doReindexTest( } } - private void submitTaskAndWait(String taskSpec, String dataSourceName) throws IOException + private void submitTaskAndWait(String taskSpec, String dataSourceName) { final String taskID = indexer.submitTask(taskSpec); LOG.info("TaskID for loading index task %s", taskID); From 830b6c17d3cb6ac56ab8112febd5cfb3c716da48 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 2 Jan 2019 19:21:08 -0800 Subject: [PATCH 6/6] PR comments --- .../apache/druid/java/util/common/StringUtils.java | 11 +++++++++-- .../druid/java/util/common/StringUtilsTest.java | 2 ++ 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index b15943b9ffe9..2c72e0e17253 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -154,6 +154,15 @@ public static String toUpperCase(String s) return s.toUpperCase(Locale.ENGLISH); } + /** + * Encodes a String in application/x-www-form-urlencoded format, with one exception: + * "+" in the encoded form is replaced with "%20". + * + * application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form data as well. + * + * @param s String to be encoded + * @return application/x-www-form-urlencoded format encoded String, but with "+" replaced with "%20". + */ @Nullable public static String urlEncode(String s) { @@ -162,8 +171,6 @@ public static String urlEncode(String s) } try { - // application/x-www-form-urlencoded encodes spaces as "+", but we use this to encode non-form - // data as well, so replace "+" with "%20". return StringUtils.replace(URLEncoder.encode(s, "UTF-8"), "+", "%20"); } catch (UnsupportedEncodingException e) { diff --git a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java index bd9a95f5aced..69d209a7b69b 100644 --- a/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java +++ b/core/src/test/java/org/apache/druid/java/util/common/StringUtilsTest.java @@ -154,8 +154,10 @@ public void testURLEncodeSpace() { String s1 = StringUtils.urlEncode("aaa bbb"); Assert.assertEquals(s1, "aaa%20bbb"); + Assert.assertEquals("aaa bbb", StringUtils.urlDecode(s1)); String s2 = StringUtils.urlEncode("fff+ggg"); Assert.assertEquals(s2, "fff%2Bggg"); + Assert.assertEquals("fff+ggg", StringUtils.urlDecode(s2)); } }