From 216c664a954b439a921f48749b16dd6d934121ae Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 9 Apr 2021 16:22:10 -0700 Subject: [PATCH 1/4] unit tests for timeout exception in init --- .../druid/client/JsonParserIteratorTest.java | 111 +++++++++++++++++- 1 file changed, 110 insertions(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java b/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java index e47693ec3c48..f85353c4de89 100644 --- a/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java +++ b/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java @@ -23,8 +23,12 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.Futures; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.query.Query; import org.apache.druid.query.QueryCapacityExceededException; import org.apache.druid.query.QueryException; import org.apache.druid.query.QueryInterruptedException; @@ -44,6 +48,9 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; @RunWith(Enclosed.class) public class JsonParserIteratorTest @@ -188,7 +195,7 @@ public static class QueryInterruptedExceptionConversionTest public ExpectedException expectedException = ExpectedException.none(); @Test - public void testConvertQueryExceptionToQueryInterruptedException() throws JsonProcessingException + public void testConvertQueryExceptionWithNullErrorCodeToQueryInterruptedException() throws JsonProcessingException { JsonParserIterator iterator = new JsonParserIterator<>( JAVA_TYPE, @@ -202,6 +209,108 @@ public void testConvertQueryExceptionToQueryInterruptedException() throws JsonPr expectedException.expectMessage("query exception test"); iterator.hasNext(); } + + @Test + public void testConvertQueryExceptionWithNonNullErrorCodeToQueryInterruptedException() + throws JsonProcessingException + { + JsonParserIterator iterator = new JsonParserIterator<>( + JAVA_TYPE, + Futures.immediateFuture( + mockErrorResponse(new QueryException("test error", "query exception test", null, null)) + ), + URL, + null, + HOST, + OBJECT_MAPPER + ); + expectedException.expect(QueryInterruptedException.class); + expectedException.expectMessage("query exception test"); + iterator.hasNext(); + } + } + + public static class TimeoutExceptionConversionTest + { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testTimeoutBeforeCallingFuture() throws JsonProcessingException + { + JsonParserIterator iterator = new JsonParserIterator<>( + JAVA_TYPE, + Mockito.mock(Future.class), + URL, + mockQuery("qid", 0L), // should always timeout + HOST, + OBJECT_MAPPER + ); + expectedException.expect(QueryTimeoutException.class); + expectedException.expectMessage(StringUtils.format("url[%s] timed out", URL)); + iterator.hasNext(); + } + + @Test + public void testTimeoutWhileCallingFuture() + { + Future future = new AbstractFuture() + { + @Override + public InputStream get(long timeout, TimeUnit unit) + throws InterruptedException + { + 
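+ // The 2-second sleep only needs to outlast the 500 ms deadline that mockQuery sets below; + // returning null afterwards makes JsonParserIterator re-check the deadline and throw QueryTimeoutException.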
Thread.sleep(2000); // Sleep longer than timeout + return null; // should return null so that JsonParserIterator checks timeout + } + }; + JsonParserIterator iterator = new JsonParserIterator<>( + JAVA_TYPE, + future, + URL, + mockQuery("qid", System.currentTimeMillis() + 500L), // timeout in 500 ms + HOST, + OBJECT_MAPPER + ); + expectedException.expect(QueryTimeoutException.class); + expectedException.expectMessage(StringUtils.format("url[%s] timed out", URL)); + iterator.hasNext(); + } + + @Test + public void testTimeoutAfterCallingFuture() + { + ExecutorService service = Execs.singleThreaded("timeout-test"); + try { + JsonParserIterator iterator = new JsonParserIterator<>( + JAVA_TYPE, + service.submit(() -> { + Thread.sleep(2000); // Sleep longer than timeout + return null; + }), + URL, + mockQuery("qid", System.currentTimeMillis() + 500L), // timeout in 500 ms + HOST, + OBJECT_MAPPER + ); + expectedException.expect(QueryTimeoutException.class); + expectedException.expectMessage("Query [qid] timed out"); + iterator.hasNext(); + + } + finally { + service.shutdownNow(); + } + } + + private Query mockQuery(String queryId, long timeoutAt) + { + Query query = Mockito.mock(Query.class); + Mockito.when(query.getId()).thenReturn(queryId); + Mockito.when(query.getContextValue(ArgumentMatchers.eq(DirectDruidClient.QUERY_FAIL_TIME), ArgumentMatchers.eq(-1L))) + .thenReturn(timeoutAt); + return query; + } } private static InputStream mockErrorResponse(Exception e) throws JsonProcessingException From 568d36d246455ecb5e8472bdaf3f71043d40398e Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 9 Apr 2021 23:04:16 -0700 Subject: [PATCH 2/4] integration tests --- .../docker-compose.query-error-test.yml | 101 ++++++++ .../docker-compose.query-retry-test.yml | 2 +- integration-tests/docker/druid.sh | 6 +- ...y-test => historical-for-query-error-test} | 8 +- .../test-data/query-error-sample-data.sql | 20 ++ .../script/docker_compose_args.sh | 7 +- ...va => CliHistoricalForQueryErrorTest.java} | 16 +- .../cli/QueryRetryTestCommandCreator.java | 2 +- .../ServerManagerForQueryErrorTest.java | 238 ++++++++++++++++++ .../ServerManagerForQueryRetryTest.java | 144 ----------- .../org/apache/druid/tests/TestNGGroup.java | 2 + .../druid/tests/query/ITQueryErrorTest.java | 220 ++++++++++++++++ .../ITQueryRetryTestOnMissingSegments.java | 6 +- ...ive_query_error_from_historicals_test.json | 19 ++ .../sql_error_from_historicals_test.json | 9 + .../queries/sql_plan_failure_query.json | 8 + 16 files changed, 643 insertions(+), 165 deletions(-) create mode 100644 integration-tests/docker/docker-compose.query-error-test.yml rename integration-tests/docker/environment-configs/{historical-for-query-retry-test => historical-for-query-error-test} (88%) create mode 100644 integration-tests/docker/test-data/query-error-sample-data.sql rename integration-tests/src/main/java/org/apache/druid/cli/{CliHistoricalForQueryRetryTest.java => CliHistoricalForQueryErrorTest.java} (80%) create mode 100644 integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java delete mode 100644 integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryRetryTest.java create mode 100644 integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryErrorTest.java create mode 100644 integration-tests/src/test/resources/queries/native_query_error_from_historicals_test.json create mode 100644 integration-tests/src/test/resources/queries/sql_error_from_historicals_test.json 
create mode 100644 integration-tests/src/test/resources/queries/sql_plan_failure_query.json diff --git a/integration-tests/docker/docker-compose.query-error-test.yml b/integration-tests/docker/docker-compose.query-error-test.yml new file mode 100644 index 000000000000..6d1d0521fdd4 --- /dev/null +++ b/integration-tests/docker/docker-compose.query-error-test.yml @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +version: "2.2" +services: + druid-zookeeper-kafka: + extends: + file: docker-compose.base.yml + service: druid-zookeeper-kafka + + druid-metadata-storage: + extends: + file: docker-compose.base.yml + service: druid-metadata-storage + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-zookeeper-kafka + + druid-overlord: + extends: + file: docker-compose.base.yml + service: druid-overlord + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-metadata-storage + - druid-zookeeper-kafka + + druid-coordinator: + extends: + file: docker-compose.base.yml + service: druid-coordinator + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-overlord + - druid-metadata-storage + - druid-zookeeper-kafka + + druid-broker: + extends: + file: docker-compose.base.yml + service: druid-broker + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-zookeeper-kafka + - druid-historical-for-query-error-test + + druid-router: + extends: + file: docker-compose.base.yml + service: druid-router + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-zookeeper-kafka + - druid-coordinator + - druid-broker + + druid-historical-for-query-error-test: + image: druid/cluster + container_name: druid-historical-for-query-error-test + networks: + druid-it-net: + ipv4_address: 172.172.172.14 + ports: + - 8084:8083 + - 8284:8283 + - 5010:5007 + privileged: true + volumes: + - ${HOME}/shared:/shared + - ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf + env_file: + - ./environment-configs/common + - ./environment-configs/historical-for-query-error-test + environment: + - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} + depends_on: + - druid-zookeeper-kafka + +networks: + druid-it-net: + name: druid-it-net + ipam: + config: + - subnet: 172.172.172.0/24 \ No newline at end of file diff --git a/integration-tests/docker/docker-compose.query-retry-test.yml b/integration-tests/docker/docker-compose.query-retry-test.yml index 139989a6b613..bcea3383acdc 100644 --- a/integration-tests/docker/docker-compose.query-retry-test.yml +++ b/integration-tests/docker/docker-compose.query-retry-test.yml @@ -96,7 +96,7 @@ 
services: - ./service-supervisords/druid.conf:/usr/lib/druid/conf/druid.conf env_file: - ./environment-configs/common - - ./environment-configs/historical-for-query-retry-test + - ./environment-configs/historical-for-query-error-test environment: - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} depends_on: diff --git a/integration-tests/docker/druid.sh b/integration-tests/docker/druid.sh index 358f88213dde..0a4f00b96525 100755 --- a/integration-tests/docker/druid.sh +++ b/integration-tests/docker/druid.sh @@ -23,7 +23,7 @@ getConfPath() case "$1" in _common) echo $cluster_conf_base/_common ;; historical) echo $cluster_conf_base/data/historical ;; - historical-for-query-retry-test) echo $cluster_conf_base/data/historical ;; + historical-for-query-error-test) echo $cluster_conf_base/data/historical ;; middleManager) echo $cluster_conf_base/data/middleManager ;; indexer) echo $cluster_conf_base/data/indexer ;; coordinator) echo $cluster_conf_base/master/coordinator ;; @@ -85,14 +85,14 @@ setupData() # The "query" and "security" test groups require data to be setup before running the tests. # In particular, they requires segments to be download from a pre-existing s3 bucket. # This is done by using the loadSpec put into metadatastore and s3 credientials set below. - if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ]; then + if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ]; then # touch is needed because OverlayFS's copy-up operation breaks POSIX standards. See https://github.com/docker/for-linux/issues/72. find /var/lib/mysql -type f -exec touch {} \; && service mysql start \ && cat /test-data/${DRUID_INTEGRATION_TEST_GROUP}-sample-data.sql | mysql -u root druid && /etc/init.d/mysql stop # below s3 credentials needed to access the pre-existing s3 bucket setKey $DRUID_SERVICE druid.s3.accessKey AKIAT2GGLKKJQCMG64V4 setKey $DRUID_SERVICE druid.s3.secretKey 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 - if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]; then + if [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ]; then setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\",\"druid-integration-tests\"] else setKey $DRUID_SERVICE druid.extensions.loadList [\"druid-s3-extensions\"] diff --git a/integration-tests/docker/environment-configs/historical-for-query-retry-test b/integration-tests/docker/environment-configs/historical-for-query-error-test similarity index 88% rename from integration-tests/docker/environment-configs/historical-for-query-retry-test rename to integration-tests/docker/environment-configs/historical-for-query-error-test index ec3db524e391..1aebbf24a03f 100644 --- a/integration-tests/docker/environment-configs/historical-for-query-retry-test +++ b/integration-tests/docker/environment-configs/historical-for-query-error-test @@ -17,8 +17,8 @@ # under the License. 
# -DRUID_SERVICE=historical-for-query-retry-test -DRUID_LOG_PATH=/shared/logs/historical-for-query-retry-test.log +DRUID_SERVICE=historical-for-query-error-test +DRUID_LOG_PATH=/shared/logs/historical-for-query-error-test.log # JAVA OPTS SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5010 @@ -27,6 +27,6 @@ SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=tr druid_processing_buffer_sizeBytes=25000000 druid_processing_numThreads=2 druid_query_groupBy_maxOnDiskStorage=300000000 -druid_segmentCache_locations=[{"path":"/shared/druid/indexCache-query-retry-test","maxSize":5000000000}] -druid_auth_basic_common_cacheDirectory=/tmp/authCache/historical-query-retry-test +druid_segmentCache_locations=[{"path":"/shared/druid/indexCache-query-error-test","maxSize":5000000000}] +druid_auth_basic_common_cacheDirectory=/tmp/authCache/historical-query-error-test druid_server_https_crlPath=/tls/revocations.crl diff --git a/integration-tests/docker/test-data/query-error-sample-data.sql b/integration-tests/docker/test-data/query-error-sample-data.sql new file mode 100644 index 000000000000..18ab48ad556b --- /dev/null +++ b/integration-tests/docker/test-data/query-error-sample-data.sql @@ -0,0 +1,20 @@ +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
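+-- The INSERTs below register pre-built segments (twitterstream, wikipedia_editstream, wikipedia) in the +-- metadata store; their loadSpecs point at a pre-existing S3 bucket, so the query-error cluster loads +-- test data without running any ingestion (see setupData in docker/druid.sh).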
+ +INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9','twitterstream','2013-05-13T01:08:18.192Z','2013-01-01T00:00:00.000Z','2013-01-02T00:00:00.000Z',0,'2013-01-02T04:13:41.980Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-01T00:00:00.000Z/2013-01-02T00:00:00.000Z\",\"version\":\"2013-01-02T04:13:41.980Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/2013-01-02T04:13:41.980Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":445235220,\"identifier\":\"twitterstream_2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z_2013-01-02T04:13:41.980Z_v9\"}'); +INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9','twitterstream','2013-05-13T00:03:28.640Z','2013-01-02T00:00:00.000Z','2013-01-03T00:00:00.000Z',0,'2013-01-03T03:44:58.791Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-02T00:00:00.000Z/2013-01-03T00:00:00.000Z\",\"version\":\"2013-01-03T03:44:58.791Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z/2013-01-03T03:44:58.791Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":435325540,\"identifier\":\"twitterstream_2013-01-02T00:00:00.000Z_2013-01-03T00:00:00.000Z_2013-01-03T03:44:58.791Z_v9\"}'); +INSERT INTO druid_segments (id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9','twitterstream','2013-05-13T00:03:48.807Z','2013-01-03T00:00:00.000Z','2013-01-04T00:00:00.000Z',0,'2013-01-04T04:09:13.590Z_v9',1,'{\"dataSource\":\"twitterstream\",\"interval\":\"2013-01-03T00:00:00.000Z/2013-01-04T00:00:00.000Z\",\"version\":\"2013-01-04T04:09:13.590Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/twitterstream/2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z/2013-01-04T04:09:13.590Z_v9/0/index.zip\"},\"dimensions\":\"has_links,first_hashtag,user_time_zone,user_location,has_mention,user_lang,rt_name,user_name,is_retweet,is_viral,has_geo,url_domain,user_mention_name,reply_to_name\",\"metrics\":\"count,tweet_length,num_followers,num_links,num_mentions,num_hashtags,num_favorites,user_total_tweets\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":411651320,\"identifier\":\"twitterstream_2013-01-03T00:00:00.000Z_2013-01-04T00:00:00.000Z_2013-01-04T04:09:13.590Z_v9\"}'); +INSERT INTO druid_segments 
(id,dataSource,created_date,start,end,partitioned,version,used,payload) VALUES ('wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9','wikipedia_editstream','2013-03-15T20:49:52.348Z','2012-12-29T00:00:00.000Z','2013-01-10T08:00:00.000Z',0,'2013-01-10T08:13:47.830Z_v9',1,'{\"dataSource\":\"wikipedia_editstream\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/wikipedia_editstream/2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z/2013-01-10T08:13:47.830Z_v9/0/index.zip\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist,unique_users,variation\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":446027801,\"identifier\":\"wikipedia_editstream_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9\"}'); +INSERT INTO druid_segments (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES ('wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z', 'wikipedia', '2013-08-08T21:26:23.799Z', '2013-08-01T00:00:00.000Z', '2013-08-02T00:00:00.000Z', '0', '2013-08-08T21:22:48.989Z', '1', '{\"dataSource\":\"wikipedia\",\"interval\":\"2013-08-01T00:00:00.000Z/2013-08-02T00:00:00.000Z\",\"version\":\"2013-08-08T21:22:48.989Z\",\"loadSpec\":{\"type\":\"s3_zip\",\"bucket\":\"static.druid.io\",\"key\":\"data/segments/wikipedia/20130801T000000.000Z_20130802T000000.000Z/2013-08-08T21_22_48.989Z/0/index.zip\"},\"dimensions\":\"dma_code,continent_code,geo,area_code,robot,country_name,network,city,namespace,anonymous,unpatrolled,page,postal_code,language,newpage,user,region_lookup\",\"metrics\":\"count,delta,variation,added,deleted\",\"shardSpec\":{\"type\":\"none\"},\"binaryVersion\":9,\"size\":24664730,\"identifier\":\"wikipedia_2013-08-01T00:00:00.000Z_2013-08-02T00:00:00.000Z_2013-08-08T21:22:48.989Z\"}'); diff --git a/integration-tests/script/docker_compose_args.sh b/integration-tests/script/docker_compose_args.sh index 7fcc30f9f7fe..db256963d774 100644 --- a/integration-tests/script/docker_compose_args.sh +++ b/integration-tests/script/docker_compose_args.sh @@ -31,7 +31,7 @@ getComposeArgs() if [ "$DRUID_INTEGRATION_TEST_INDEXER" = "indexer" ] then # Sanity check: cannot combine CliIndexer tests with security, query-retry tests - if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] + if [ "$DRUID_INTEGRATION_TEST_GROUP" = "security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "ldap-security" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-retry" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] || [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] then echo "Cannot run test group '$DRUID_INTEGRATION_TEST_GROUP' with CliIndexer" exit 1 @@ -56,6 +56,11 @@ getComposeArgs() # default + additional historical modified for query retry test # See CliHistoricalForQueryRetryTest. 
echo "-f ${DOCKERDIR}/docker-compose.query-retry-test.yml" + elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "query-error" ] + then + # default + additional historical modified for query error test + # See CliHistoricalForQueryErrorTest. + echo "-f ${DOCKERDIR}/docker-compose.query-error-test.yml" elif [ "$DRUID_INTEGRATION_TEST_GROUP" = "high-availability" ] then # the 'high availability' test cluster with multiple coordinators and overlords diff --git a/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java b/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryErrorTest.java similarity index 80% rename from integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java rename to integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryErrorTest.java index 6ef34b11de96..c8124ff131e0 100644 --- a/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryRetryTest.java +++ b/integration-tests/src/main/java/org/apache/druid/cli/CliHistoricalForQueryErrorTest.java @@ -25,19 +25,19 @@ import org.apache.druid.guice.LazySingleton; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.query.QuerySegmentWalker; -import org.apache.druid.server.coordination.ServerManagerForQueryRetryTest; +import org.apache.druid.server.coordination.ServerManagerForQueryErrorTest; import java.util.Properties; @Command( - name = "historical-for-query-retry-test", - description = "Runs a Historical node modified for query retry test" + name = "historical-for-query-error-test", - description = "Runs a Historical node modified for query error test" wait ) -public class CliHistoricalForQueryRetryTest extends CliHistorical +public class CliHistoricalForQueryErrorTest extends CliHistorical { - private static final Logger log = new Logger(CliHistoricalForQueryRetryTest.class); + private static final Logger log = new Logger(CliHistoricalForQueryErrorTest.class); - public CliHistoricalForQueryRetryTest() + public CliHistoricalForQueryErrorTest() { super(); } @@ -46,12 +46,12 @@ public CliHistoricalForQueryRetryTest() @Override public void configure(Properties properties) { - log.info("Historical is configured for testing query retry on missing segments"); + log.info("Historical is configured for testing query errors"); } @Override public void bindQuerySegmentWalker(Binder binder) { - binder.bind(QuerySegmentWalker.class).to(ServerManagerForQueryRetryTest.class).in(LazySingleton.class); + binder.bind(QuerySegmentWalker.class).to(ServerManagerForQueryErrorTest.class).in(LazySingleton.class); } } diff --git a/integration-tests/src/main/java/org/apache/druid/cli/QueryRetryTestCommandCreator.java b/integration-tests/src/main/java/org/apache/druid/cli/QueryRetryTestCommandCreator.java index 9635c5a26d81..2b4fc019f4b3 100644 --- a/integration-tests/src/main/java/org/apache/druid/cli/QueryRetryTestCommandCreator.java +++ b/integration-tests/src/main/java/org/apache/druid/cli/QueryRetryTestCommandCreator.java @@ -26,6 +26,6 @@ public class QueryRetryTestCommandCreator implements CliCommandCreator @Override public void addCommands(CliBuilder builder) { - builder.withGroup("server").withCommands(CliHistoricalForQueryRetryTest.class); + builder.withGroup("server").withCommands(CliHistoricalForQueryErrorTest.class); } } diff --git a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java
b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java new file mode 100644 index 000000000000..eaf244f2b2cc --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryErrorTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordination; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.druid.client.cache.Cache; +import org.apache.druid.client.cache.CacheConfig; +import org.apache.druid.client.cache.CachePopulator; +import org.apache.druid.guice.annotations.Processing; +import org.apache.druid.guice.annotations.Smile; +import org.apache.druid.java.util.common.guava.Accumulator; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Yielder; +import org.apache.druid.java.util.common.guava.YieldingAccumulator; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryCapacityExceededException; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.QueryTimeoutException; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.QueryUnsupportedException; +import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; +import org.apache.druid.query.ResourceLimitExceededException; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.segment.ReferenceCountingSegment; +import org.apache.druid.segment.SegmentReference; +import org.apache.druid.segment.join.JoinableFactory; +import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.timeline.VersionedIntervalTimeline; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +/** + * This server manager is designed to test various query failures. + * + * - Missing segments. A segment can be missing during a query if a historical drops the segment + * after the broker issues the query to the historical. 
To mimic this situation, the historical + * with this server manager announces all segments assigned, but reports missing segments for the + * first 3 segments specified in the query. See ITQueryRetryTestOnMissingSegments. + * - Other query errors. This server manager returns a sequence that always throws an exception + * based on a given query context value. See ITQueryErrorTest. + * + * @see org.apache.druid.query.RetryQueryRunner for query retrying. + * @see org.apache.druid.client.JsonParserIterator for handling query errors from historicals. + */ +public class ServerManagerForQueryErrorTest extends ServerManager +{ + // Query context key that indicates this query is for query retry testing. + public static final String QUERY_RETRY_TEST_CONTEXT_KEY = "query-retry-test"; + public static final String QUERY_TIMEOUT_TEST_CONTEXT_KEY = "query-timeout-test"; + public static final String QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY = "query-capacity-exceeded-test"; + public static final String QUERY_UNSUPPORTED_TEST_CONTEXT_KEY = "query-unsupported-test"; + public static final String RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY = "resource-limit-exceeded-test"; + public static final String QUERY_FAILURE_TEST_CONTEXT_KEY = "query-failure-test"; + + private static final Logger LOG = new Logger(ServerManagerForQueryErrorTest.class); + private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 3; + + private final ConcurrentHashMap> queryToIgnoredSegments = new ConcurrentHashMap<>(); + + @Inject + public ServerManagerForQueryErrorTest( + QueryRunnerFactoryConglomerate conglomerate, + ServiceEmitter emitter, + @Processing ExecutorService exec, + CachePopulator cachePopulator, + @Smile ObjectMapper objectMapper, + Cache cache, + CacheConfig cacheConfig, + SegmentManager segmentManager, + JoinableFactory joinableFactory, + ServerConfig serverConfig + ) + { + super( + conglomerate, + emitter, + exec, + cachePopulator, + objectMapper, + cache, + cacheConfig, + segmentManager, + joinableFactory, + serverConfig + ); + } + + @Override + QueryRunner buildQueryRunnerForSegment( + Query query, + SegmentDescriptor descriptor, + QueryRunnerFactory> factory, + QueryToolChest> toolChest, + VersionedIntervalTimeline timeline, + Function segmentMapFn, + AtomicLong cpuTimeAccumulator, + Optional cacheKeyPrefix + ) + { + if (query.getContextBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) { + final MutableBoolean isIgnoreSegment = new MutableBoolean(false); + queryToIgnoredSegments.compute( + query.getMostSpecificId(), + (queryId, ignoredSegments) -> { + if (ignoredSegments == null) { + ignoredSegments = new HashSet<>(); + } + if (ignoredSegments.size() < MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) { + ignoredSegments.add(descriptor); + isIgnoreSegment.setTrue(); + } + return ignoredSegments; + } + ); + + if (isIgnoreSegment.isTrue()) { + LOG.info("Pretending I don't have segment[%s]", descriptor); + return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); + } + } else if (query.getContextBoolean(QUERY_TIMEOUT_TEST_CONTEXT_KEY, false)) { + return (queryPlus, responseContext) -> new Sequence() + { + @Override + public OutType accumulate(OutType initValue, Accumulator accumulator) + { + throw new QueryTimeoutException("query timeout test"); + } + + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + throw new QueryTimeoutException("query timeout test"); + } + }; + } else if (query.getContextBoolean(QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY, false)) { + return (queryPlus, 
responseContext) -> new Sequence() + { + @Override + public OutType accumulate(OutType initValue, Accumulator accumulator) + { + throw QueryCapacityExceededException.withErrorMessageAndResolvedHost("query capacity exceeded test"); + } + + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + throw QueryCapacityExceededException.withErrorMessageAndResolvedHost("query capacity exceeded test"); + } + }; + } else if (query.getContextBoolean(QUERY_UNSUPPORTED_TEST_CONTEXT_KEY, false)) { + return (queryPlus, responseContext) -> new Sequence() + { + @Override + public OutType accumulate(OutType initValue, Accumulator accumulator) + { + throw new QueryUnsupportedException("query unsupported test"); + } + + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + throw new QueryUnsupportedException("query unsupported test"); + } + }; + } else if (query.getContextBoolean(RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY, false)) { + return (queryPlus, responseContext) -> new Sequence() + { + @Override + public OutType accumulate(OutType initValue, Accumulator accumulator) + { + throw new ResourceLimitExceededException("resource limit exceeded test"); + } + + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + throw new ResourceLimitExceededException("resource limit exceeded test"); + } + }; + } else if (query.getContextBoolean(QUERY_FAILURE_TEST_CONTEXT_KEY, false)) { + return (queryPlus, responseContext) -> new Sequence() + { + @Override + public OutType accumulate(OutType initValue, Accumulator accumulator) + { + throw new RuntimeException("query failure test"); + } + + @Override + public Yielder toYielder(OutType initValue, YieldingAccumulator accumulator) + { + throw new RuntimeException("query failure test"); + } + }; + } + + return super.buildQueryRunnerForSegment( + query, + descriptor, + factory, + toolChest, + timeline, + segmentMapFn, + cpuTimeAccumulator, + cacheKeyPrefix + ); + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryRetryTest.java b/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryRetryTest.java deleted file mode 100644 index 16a456244d0f..000000000000 --- a/integration-tests/src/main/java/org/apache/druid/server/coordination/ServerManagerForQueryRetryTest.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.server.coordination; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Inject; -import org.apache.commons.lang3.mutable.MutableBoolean; -import org.apache.druid.client.cache.Cache; -import org.apache.druid.client.cache.CacheConfig; -import org.apache.druid.client.cache.CachePopulator; -import org.apache.druid.guice.annotations.Processing; -import org.apache.druid.guice.annotations.Smile; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.query.Query; -import org.apache.druid.query.QueryRunner; -import org.apache.druid.query.QueryRunnerFactory; -import org.apache.druid.query.QueryRunnerFactoryConglomerate; -import org.apache.druid.query.QueryToolChest; -import org.apache.druid.query.ReportTimelineMissingSegmentQueryRunner; -import org.apache.druid.query.SegmentDescriptor; -import org.apache.druid.segment.ReferenceCountingSegment; -import org.apache.druid.segment.SegmentReference; -import org.apache.druid.segment.join.JoinableFactory; -import org.apache.druid.server.SegmentManager; -import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.timeline.VersionedIntervalTimeline; - -import java.util.HashSet; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Function; - -/** - * This server manager is designed to test query retry on missing segments. A segment can be missing during a query - * if a historical drops the segment after the broker issues the query to the historical. To mimic this situation, - * the historical with this server manager announces all segments assigned, but reports missing segments for the - * first 3 segments specified in the query. - * - * @see org.apache.druid.query.RetryQueryRunner - */ -public class ServerManagerForQueryRetryTest extends ServerManager -{ - // Query context key that indicates this query is for query retry testing. 
- public static final String QUERY_RETRY_TEST_CONTEXT_KEY = "query-retry-test"; - - private static final Logger LOG = new Logger(ServerManagerForQueryRetryTest.class); - private static final int MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS = 3; - - private final ConcurrentHashMap> queryToIgnoredSegments = new ConcurrentHashMap<>(); - - @Inject - public ServerManagerForQueryRetryTest( - QueryRunnerFactoryConglomerate conglomerate, - ServiceEmitter emitter, - @Processing ExecutorService exec, - CachePopulator cachePopulator, - @Smile ObjectMapper objectMapper, - Cache cache, - CacheConfig cacheConfig, - SegmentManager segmentManager, - JoinableFactory joinableFactory, - ServerConfig serverConfig - ) - { - super( - conglomerate, - emitter, - exec, - cachePopulator, - objectMapper, - cache, - cacheConfig, - segmentManager, - joinableFactory, - serverConfig - ); - } - - @Override - QueryRunner buildQueryRunnerForSegment( - Query query, - SegmentDescriptor descriptor, - QueryRunnerFactory> factory, - QueryToolChest> toolChest, - VersionedIntervalTimeline timeline, - Function segmentMapFn, - AtomicLong cpuTimeAccumulator, - Optional cacheKeyPrefix - ) - { - if (query.getContextBoolean(QUERY_RETRY_TEST_CONTEXT_KEY, false)) { - final MutableBoolean isIgnoreSegment = new MutableBoolean(false); - queryToIgnoredSegments.compute( - query.getMostSpecificId(), - (queryId, ignoredSegments) -> { - if (ignoredSegments == null) { - ignoredSegments = new HashSet<>(); - } - if (ignoredSegments.size() < MAX_NUM_FALSE_MISSING_SEGMENTS_REPORTS) { - ignoredSegments.add(descriptor); - isIgnoreSegment.setTrue(); - } - return ignoredSegments; - } - ); - - if (isIgnoreSegment.isTrue()) { - LOG.info("Pretending I don't have segment[%s]", descriptor); - return new ReportTimelineMissingSegmentQueryRunner<>(descriptor); - } - } - return super.buildQueryRunnerForSegment( - query, - descriptor, - factory, - toolChest, - timeline, - segmentMapFn, - cpuTimeAccumulator, - cacheKeyPrefix - ); - } -} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java index 7f8f231a7803..8b962d2d1b5b 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/TestNGGroup.java @@ -54,6 +54,8 @@ public class TestNGGroup public static final String QUERY_RETRY = "query-retry"; + public static final String QUERY_ERROR = "query-error"; + public static final String CLI_INDEXER = "cli-indexer"; public static final String REALTIME_INDEX = "realtime-index"; diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryErrorTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryErrorTest.java new file mode 100644 index 000000000000..9abc941c2e01 --- /dev/null +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryErrorTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.tests.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.server.coordination.ServerManagerForQueryErrorTest; +import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.guice.DruidTestModuleFactory; +import org.apache.druid.testing.utils.ITRetryUtil; +import org.apache.druid.testing.utils.SqlTestQueryHelper; +import org.apache.druid.testing.utils.TestQueryHelper; +import org.apache.druid.tests.TestNGGroup; +import org.apache.druid.tests.indexer.AbstractIndexerTest; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Guice; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * This class tests various query failures. + * + * - SQL planning failures. Both {@link org.apache.calcite.sql.parser.SqlParseException} + * and {@link org.apache.calcite.tools.ValidationException} are tested using SQL statements that must fail. + * - Various query errors from historicals. These tests use {@link ServerManagerForQueryErrorTest} to make + * the query always throw an exception. They verify the error code returned by + * {@link org.apache.druid.sql.http.SqlResource} and {@link org.apache.druid.server.QueryResource}. + */ +@Test(groups = TestNGGroup.QUERY_ERROR) +@Guice(moduleFactory = DruidTestModuleFactory.class) +public class ITQueryErrorTest +{ + private static final String WIKIPEDIA_DATA_SOURCE = "wikipedia_editstream"; + /** + * A simple query used for error tests from historicals. What the query is does not matter because the query is always + * expected to fail. + * + * @see ServerManagerForQueryErrorTest#buildQueryRunnerForSegment + */ + private static final String NATIVE_QUERY_RESOURCE = + "/queries/native_query_error_from_historicals_test.json"; + private static final String SQL_QUERY_RESOURCE = + "/queries/sql_error_from_historicals_test.json"; + /** + * A simple SQL query template used for plan failure tests.
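+ * For example, {@code buildSqlPlanFailureQuery} replaces the template's %%QUERY%% placeholder with a + * statement that cannot be planned, such as {@code SELECT unknown_col FROM wikipedia_editstream LIMIT 1}.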
+ */ + private static final String SQL_PLAN_FAILURE_RESOURCE = "/queries/sql_plan_failure_query.json"; + + @Inject + private CoordinatorResourceTestClient coordinatorClient; + @Inject + private TestQueryHelper queryHelper; + @Inject + private SqlTestQueryHelper sqlHelper; + @Inject + private ObjectMapper jsonMapper; + + @BeforeMethod + public void before() + { + // ensure that wikipedia segments are loaded completely + ITRetryUtil.retryUntilTrue( + () -> coordinatorClient.areSegmentsLoaded(WIKIPEDIA_DATA_SOURCE), "wikipedia segment load" + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*400.*") + public void testSqlParseException() throws Exception + { + // test a sql without SELECT + sqlHelper.testQueriesFromString(buildSqlPlanFailureQuery("FROM t WHERE col = 'a'")); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*400.*") + public void testSqlValidationException() throws Exception + { + // test a sql that selects unknown column + sqlHelper.testQueriesFromString( + buildSqlPlanFailureQuery(StringUtils.format("SELECT unknown_col FROM %s LIMIT 1", WIKIPEDIA_DATA_SOURCE)) + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*504.*") + public void testSqlTimeout() throws Exception + { + sqlHelper.testQueriesFromString( + buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_TIMEOUT_TEST_CONTEXT_KEY) + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*429.*") + public void testSqlCapacityExceeded() throws Exception + { + sqlHelper.testQueriesFromString( + buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY) + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*501.*") + public void testSqlUnsupported() throws Exception + { + sqlHelper.testQueriesFromString( + buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_UNSUPPORTED_TEST_CONTEXT_KEY) + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*400.*") + public void testSqlResourceLimitExceeded() throws Exception + { + sqlHelper.testQueriesFromString( + buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY) + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*500.*") + public void testSqlFailure() throws Exception + { + sqlHelper.testQueriesFromString( + buildHistoricalErrorSqlQuery(ServerManagerForQueryErrorTest.QUERY_FAILURE_TEST_CONTEXT_KEY) + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*504.*") + public void testQueryTimeout() throws Exception + { + queryHelper.testQueriesFromString( + buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_TIMEOUT_TEST_CONTEXT_KEY) + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*429.*") + public void testQueryCapacityExceeded() throws Exception + { + queryHelper.testQueriesFromString( + buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_CAPACITY_EXCEEDED_TEST_CONTEXT_KEY) + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*501.*") + public void testQueryUnsupported() throws Exception + { + 
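+ // Native (non-SQL) variant: the modified historical throws QueryUnsupportedException, which + // should reach the client as an HTTP 501 response (asserted by the regexp above).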
queryHelper.testQueriesFromString( + buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_UNSUPPORTED_TEST_CONTEXT_KEY) + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*400.*") + public void testResourceLimitExceeded() throws Exception + { + queryHelper.testQueriesFromString( + buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.RESOURCE_LIMIT_EXCEEDED_TEST_CONTEXT_KEY) + ); + } + + @Test(expectedExceptions = {RuntimeException.class}, expectedExceptionsMessageRegExp = "(?s).*500.*") + public void testQueryFailure() throws Exception + { + queryHelper.testQueriesFromString( + buildHistoricalErrorTestQuery(ServerManagerForQueryErrorTest.QUERY_FAILURE_TEST_CONTEXT_KEY) + ); + } + + private String buildSqlPlanFailureQuery(String sql) throws IOException + { + return StringUtils.replace( + AbstractIndexerTest.getResourceAsString(SQL_PLAN_FAILURE_RESOURCE), + "%%QUERY%%", + sql + ); + } + + private String buildHistoricalErrorSqlQuery(String contextKey) throws IOException + { + return StringUtils.replace( + AbstractIndexerTest.getResourceAsString(SQL_QUERY_RESOURCE), + "%%CONTEXT%%", + jsonMapper.writeValueAsString(buildTestContext(contextKey)) + ); + } + + private String buildHistoricalErrorTestQuery(String contextKey) throws IOException + { + return StringUtils.replace( + AbstractIndexerTest.getResourceAsString(NATIVE_QUERY_RESOURCE), + "%%CONTEXT%%", + jsonMapper.writeValueAsString(buildTestContext(contextKey)) + ); + } + + private static Map buildTestContext(String key) + { + final Map context = new HashMap<>(); + // Disable cache so that each run hits historical. + context.put(QueryContexts.USE_CACHE_KEY, false); + context.put(key, true); + return context; + } +} diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java index e4842a498135..73394c3f0eef 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITQueryRetryTestOnMissingSegments.java @@ -26,7 +26,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.response.StatusResponseHolder; import org.apache.druid.query.QueryContexts; -import org.apache.druid.server.coordination.ServerManagerForQueryRetryTest; +import org.apache.druid.server.coordination.ServerManagerForQueryErrorTest; import org.apache.druid.testing.IntegrationTestingConfig; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; import org.apache.druid.testing.clients.QueryResourceTestClient; @@ -53,7 +53,7 @@ * the historical drops the segment after the broker issues the query to the historical. To mimic this case, this * test spawns two historicals, a normal historical and a historical modified for testing. The later historical * announces all segments assigned, but doesn't serve all of them. Instead, it can report missing segments for some - * segments. See {@link ServerManagerForQueryRetryTest} for more details. + * segments. See {@link ServerManagerForQueryErrorTest} for more details. *
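 * Since the modified historical reports at most three segments per query as missing and serves them on * subsequent attempts, a query allowed at least one retry should still return complete results.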

* To run this test properly, the test group must be specified as {@link TestNGGroup#QUERY_RETRY}. */ @@ -237,7 +237,7 @@ private static Map buildContext(int numRetriesOnMissingSegments, context.put(QueryContexts.USE_CACHE_KEY, false); context.put(QueryContexts.NUM_RETRIES_ON_MISSING_SEGMENTS_KEY, numRetriesOnMissingSegments); context.put(QueryContexts.RETURN_PARTIAL_RESULTS_KEY, allowPartialResults); - context.put(ServerManagerForQueryRetryTest.QUERY_RETRY_TEST_CONTEXT_KEY, true); + context.put(ServerManagerForQueryErrorTest.QUERY_RETRY_TEST_CONTEXT_KEY, true); return context; } } diff --git a/integration-tests/src/test/resources/queries/native_query_error_from_historicals_test.json b/integration-tests/src/test/resources/queries/native_query_error_from_historicals_test.json new file mode 100644 index 000000000000..92b02a83be80 --- /dev/null +++ b/integration-tests/src/test/resources/queries/native_query_error_from_historicals_test.json @@ -0,0 +1,19 @@ +[ + { + "description": "timeseries, 1 agg, all", + "query": { + "queryType": "timeseries", + "dataSource": "wikipedia_editstream", + "intervals": ["2013-01-01T00:00:00.000/2013-01-08T00:00:00.000"], + "granularity": "all", + "aggregations": [ + { + "type": "count", + "name": "rows" + } + ], + "context": %%CONTEXT%% + }, + "expectedResults": [] + } +] diff --git a/integration-tests/src/test/resources/queries/sql_error_from_historicals_test.json b/integration-tests/src/test/resources/queries/sql_error_from_historicals_test.json new file mode 100644 index 000000000000..d90441f855df --- /dev/null +++ b/integration-tests/src/test/resources/queries/sql_error_from_historicals_test.json @@ -0,0 +1,9 @@ +[ + { + "query": { + "query": "SELECT count(*) from wikipedia_editstream", + "context": %%CONTEXT%% + }, + "expectedResults": [] + } +] \ No newline at end of file diff --git a/integration-tests/src/test/resources/queries/sql_plan_failure_query.json b/integration-tests/src/test/resources/queries/sql_plan_failure_query.json new file mode 100644 index 000000000000..dd0fbc6f1f4f --- /dev/null +++ b/integration-tests/src/test/resources/queries/sql_plan_failure_query.json @@ -0,0 +1,8 @@ +[ + { + "query": { + "query": "%%QUERY%%" + }, + "expectedResults": [] + } +] \ No newline at end of file From d84cccd93665fdf7864b7b376a91b69601213f74 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 9 Apr 2021 23:10:57 -0700 Subject: [PATCH 3/4] run integraion test on travis --- .travis.yml | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 29bf971df075..b0e5309d9d42 100644 --- a/.travis.yml +++ b/.travis.yml @@ -472,6 +472,15 @@ jobs: script: *run_integration_test after_failure: *integration_test_diags + - &integration_query_error + name: "(Compile=openjdk8, Run=openjdk8) query error integration test" + stage: Tests - phase 2 + jdk: openjdk8 + services: *integration_test_services + env: TESTNG_GROUPS='-Dgroups=query-error' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' + script: *run_integration_test + after_failure: *integration_test_diags + - &integration_security name: "(Compile=openjdk8, Run=openjdk8) security integration test" stage: Tests - phase 2 @@ -530,13 +539,13 @@ jobs: stage: Tests - phase 2 jdk: openjdk8 services: *integration_test_services - env: 
TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: *run_integration_test after_failure: *integration_test_diags - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk8) other integration tests with Indexer" - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk8) leadership and high availability integration tests" @@ -586,6 +595,11 @@ jobs: jdk: openjdk8 env: TESTNG_GROUPS='-Dgroups=query-retry' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' + - <<: *integration_query_error + name: "(Compile=openjdk8, Run=openjdk11) query error integration test for missing segments" + jdk: openjdk8 + env: TESTNG_GROUPS='-Dgroups=query-error' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' + - <<: 
*integration_security name: "(Compile=openjdk8, Run=openjdk11) security integration test" jdk: openjdk8 @@ -614,7 +628,7 @@ jobs: - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk11) other integration test" jdk: openjdk8 - env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' + env: TESTNG_GROUPS='-DexcludedGroups=batch-index,input-format,input-source,perfect-rollup-parallel-batch-index,kafka-index,query,query-retry,query-error,realtime-index,security,ldap-security,s3-deep-storage,gcs-deep-storage,azure-deep-storage,hdfs-deep-storage,s3-ingestion,kinesis-index,kinesis-data-format,kafka-transactional-index,kafka-index-slow,kafka-transactional-index-slow,kafka-data-format,hadoop-s3-to-s3-deep-storage,hadoop-s3-to-hdfs-deep-storage,hadoop-azure-to-azure-deep-storage,hadoop-azure-to-hdfs-deep-storage,hadoop-gcs-to-gcs-deep-storage,hadoop-gcs-to-hdfs-deep-storage,aliyun-oss-deep-storage,append-ingestion,compaction,high-availability' JVM_RUNTIME='-Djvm.runtime=11' USE_INDEXER='middleManager' - <<: *integration_tests name: "(Compile=openjdk8, Run=openjdk11) leadership and high availability integration tests" From 7c647a908df1e700998e1bc8977ab8e76dba1687 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 10 Apr 2021 11:42:43 -0700 Subject: [PATCH 4/4] fix inspection --- .../java/org/apache/druid/client/JsonParserIteratorTest.java | 2 +- services/src/main/java/org/apache/druid/cli/CliHistorical.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java b/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java index f85353c4de89..129a263cea12 100644 --- a/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java +++ b/server/src/test/java/org/apache/druid/client/JsonParserIteratorTest.java @@ -236,7 +236,7 @@ public static class TimeoutExceptionConversionTest public ExpectedException expectedException = ExpectedException.none(); @Test - public void testTimeoutBeforeCallingFuture() throws JsonProcessingException + public void testTimeoutBeforeCallingFuture() { JsonParserIterator iterator = new JsonParserIterator<>( JAVA_TYPE, diff --git a/services/src/main/java/org/apache/druid/cli/CliHistorical.java b/services/src/main/java/org/apache/druid/cli/CliHistorical.java index 815864b5be14..5c949e885384 100644 --- a/services/src/main/java/org/apache/druid/cli/CliHistorical.java +++ b/services/src/main/java/org/apache/druid/cli/CliHistorical.java @@ -137,7 +137,7 @@ protected List getModules() } /** - * This method is visible for testing query retry on missing segments. See {@link CliHistoricalForQueryRetryTest}. + * This method is visible for testing query retry on missing segments. See {@link CliHistoricalForQueryErrorTest}. */ @VisibleForTesting public void bindQuerySegmentWalker(Binder binder)