From 9a0639a51d4030455a03c505595daba61ff01b98 Mon Sep 17 00:00:00 2001 From: Abhi Agrawal Date: Wed, 16 Nov 2022 17:57:44 +0530 Subject: [PATCH 01/10] Migrate current integration batch tests to equivalent MSQ tests using new IT framework --- .travis.yml | 12 +- distribution/pom.xml | 2 + integration-tests-ex/cases/pom.xml | 2 +- .../druid/testsEx/msq/ITMSQForBatchIndex.java | 144 +++++++++++++++++ .../wikipedia_index_msq_queries.json | 150 ++++++++++++++++++ .../batch-index/wikipedia_index_msq.json | 4 + .../utils/MsqSqlTasksWithTestQueries.java | 33 ++++ .../testing/utils/MsqTestQueryHelper.java | 2 +- 8 files changed, 342 insertions(+), 7 deletions(-) create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/test-queries/wikipedia_index_msq_queries.json create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.json create mode 100644 integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java diff --git a/.travis.yml b/.travis.yml index 2fd32ffd7b9c..4ca2c93755e5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -689,11 +689,13 @@ jobs: env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='middleManager' script: ./it.sh travis Catalog - # Disabling BatchIndex test as it is failing with due to timeout, fixing it will be taken in a separate PR. 
- #- <<: *integration_tests_ex - # name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)" - # env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' - # script: ./it.sh travis BatchIndex + - &integration_tests_ex + name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer (new)" + stage: Tests - phase 2 + jdk: openjdk8 + services: *integration_test_services + env: JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' + script: ./it.sh travis BatchIndex # END - Integration tests for Compile with Java 8 and Run with Java 8 diff --git a/distribution/pom.xml b/distribution/pom.xml index b73e4215a64a..e2b7773b09d4 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -225,6 +225,8 @@ -c org.apache.druid.extensions:druid-multi-stage-query -c + org.apache.druid.extensions:druid-catalog + -c org.apache.druid.extensions:druid-protobuf-extensions -c org.apache.druid.extensions:mysql-metadata-storage diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml index 5456d4b81b01..6cdd7a267302 100644 --- a/integration-tests-ex/cases/pom.xml +++ b/integration-tests-ex/cases/pom.xml @@ -346,7 +346,7 @@ - true + False org.apache.druid.testsEx.categories.${it.category} diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java new file mode 100644 index 000000000000..7df9a053aaca --- /dev/null +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.testsEx.msq; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.sql.SqlTaskStatus; +import org.apache.druid.testing.IntegrationTestingConfig; +import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.apache.druid.testing.utils.DataLoaderHelper; +import org.apache.druid.testing.utils.MsqSqlTasksWithTestQueries; +import org.apache.druid.testing.utils.MsqTestQueryHelper; +import org.apache.druid.testing.utils.TestQueryHelper; +import org.apache.druid.testsEx.categories.MultiStageQuery; +import org.apache.druid.testsEx.config.DruidTestRunner; +import org.apache.druid.testsEx.indexer.AbstractITBatchIndexTest; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +@RunWith(DruidTestRunner.class) + @Category(MultiStageQuery.class) + public class ITMSQForBatchIndex + { + public static final 
Logger LOG = new Logger(TestQueryHelper.class); + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + protected TestQueryHelper queryHelper; + + @Inject + private IntegrationTestingConfig config; + + @Inject + private ObjectMapper jsonMapper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + @Inject + private CoordinatorResourceTestClient coordinatorClient; + + private static final String BATCH_INDEX_TASKS_DIR = "/multi-stage-query/batch-index/"; + + protected void doTestQuery(String dataSource, String queryFilePath) + { + try { + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); + queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", queryFilePath); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + dataSource + ); + queryHelper.testQueriesFromString(queryResponseTemplate); + + } + catch (Exception e) { + LOG.error(e, "Error while testing"); + throw new RuntimeException(e); + } + } + + @Test + public void testMsqIngestionForBatchIndexTasks() throws Exception + { + File[] files = (new File(getClass().getResource(BATCH_INDEX_TASKS_DIR).toURI())).listFiles(); + LOG.info(Arrays.toString(files)); + String datasource = "dst"; + for (int i=1; i() + { + } + ); + // Clear up the datasource from the previous runs + coordinatorClient.unloadSegmentsForDataSource(datasource + files[i].getName()); + + String sqlTask = StringUtils.replace( + batchSqlTasksAndQueries.getSqlTask(), + "%%DATASOURCE%%", + datasource + files[i].getName() + ); + + LOG.info("SqlTask - \n %s", sqlTask); + + // Submit the tasks and wait for the datasource to get loaded + SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(sqlTask); + + LOG.info("Sql Task submitted with task Id - %s", sqlTaskStatus.getTaskId()); + + if (sqlTaskStatus.getState().isFailure()) { + 
Assert.fail(StringUtils.format( + "Unable to start the task successfully.\nPossible exception: %s", + sqlTaskStatus.getError() + )); + } + msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId()); + dataLoaderHelper.waitUntilDatasourceIsReady(datasource + files[i].getName()); + String testQueriesFilePath = batchSqlTasksAndQueries.getTestQueries(); + doTestQuery(datasource + files[i].getName(), testQueriesFilePath); + } + } + } diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/test-queries/wikipedia_index_msq_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/test-queries/wikipedia_index_msq_queries.json new file mode 100644 index 000000000000..928effe65e97 --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/test-queries/wikipedia_index_msq_queries.json @@ -0,0 +1,150 @@ +[ + { + "description": "timeseries, 1 agg, all", + "query":{ + "queryType" : "timeBoundary", + "dataSource": "%%DATASOURCE%%" + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T01:02:33.000Z", + "result" : { + "minTime" : "2013-08-31T01:02:33.000Z", + "maxTime" : "2013-09-01T12:41:27.000Z" + } + } + ] + }, + { + "description": "timeseries, datasketch aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "HLLSketchMerge", + "name": "approxCountHLL", + "fieldName": "HLLSketchBuild", + "lgK": 12, + "tgtHllType": "HLL_4", + "round": true + }, + { + "type":"thetaSketch", + "name":"approxCountTheta", + "fieldName":"thetaSketch", + "size":16384, + "shouldFinalize":true, + "isInputThetaSketch":false, + "errorBoundsStdDev":null + }, + { + "type":"quantilesDoublesSketch", + "name":"quantilesSketch", + "fieldName":"quantilesDoublesSketch", + "k":128 + } + ] + }, + "expectedResults":[ + { + "timestamp" : 
"2013-08-31T00:00:00.000Z", + "result" : { + "quantilesSketch":5, + "approxCountTheta":5.0, + "approxCountHLL":5 + } + } + ] + }, + { + "description":"having spec on post aggregation", + "query":{ + "queryType":"groupBy", + "dataSource":"%%DATASOURCE%%", + "granularity":"day", + "dimensions":[ + "page" + ], + "filter":{ + "type":"selector", + "dimension":"language", + "value":"zh" + }, + "aggregations":[ + { + "type":"count", + "name":"rows" + }, + { + "type":"longSum", + "fieldName":"added", + "name":"added_count" + } + ], + "postAggregations": [ + { + "type":"arithmetic", + "name":"added_count_times_ten", + "fn":"*", + "fields":[ + {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"}, + {"type":"constant", "name":"const", "value":10} + ] + } + ], + "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000}, + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ] + }, + "expectedResults":[ { + "version" : "v1", + "timestamp" : "2013-08-31T00:00:00.000Z", + "event" : { + "added_count_times_ten" : 9050.0, + "page" : "Crimson Typhoon", + "added_count" : 905, + "rows" : 1 + } + } ] + }, + { + "description": "timeseries, stringFirst/stringLast aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "stringFirst", + "name": "first_user", + "fieldName": "user" + }, + { + "type":"stringLast", + "name":"last_user", + "fieldName":"user" + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "first_user":"nuclear", + "last_user":"stringer" + } + } + ] + } +] \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.json new file mode 100644 
index 000000000000..6bc8c25900e5 --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.json @@ -0,0 +1,4 @@ +{ + "sqlTask" : "REPLACE INTO \"%%DATASOURCE%%\" OVERWRITE ALL\nWITH \"source\" as (SELECT * FROM TABLE(\n EXTERN(\n '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data2.json\",\"/resources/data/batch_index/json/wikipedia_index_data3.json\"]}',\n '{\"type\":\"json\"}',\n '[{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"page\",\"type\":\"string\"},{\"name\":\"language\",\"type\":\"string\"},{\"name\":\"user\",\"type\":\"string\"},{\"name\":\"unpatrolled\",\"type\":\"string\"},{\"name\":\"newPage\",\"type\":\"string\"},{\"name\":\"robot\",\"type\":\"string\"},{\"name\":\"anonymous\",\"type\":\"string\"},{\"name\":\"namespace\",\"type\":\"string\"},{\"name\":\"continent\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"region\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"added\",\"type\":\"double\"},{\"name\":\"deleted\",\"type\":\"double\"},{\"name\":\"delta\",\"type\":\"double\"}]'\n )\n))\nSELECT\n TIME_FLOOR(CASE WHEN CAST(\"timestamp\" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST(\"timestamp\" AS BIGINT)) ELSE TIME_PARSE(\"timestamp\") END, 'PT1S') AS __time,\n \"page\",\n \"language\",\n \"user\",\n \"unpatrolled\",\n \"newPage\",\n \"robot\",\n \"anonymous\",\n \"namespace\",\n \"continent\",\n \"country\",\n \"region\",\n \"city\",\n COUNT(*) AS \"count\",\n SUM(\"added\") AS \"added\",\n SUM(\"deleted\") AS \"deleted\",\n SUM(\"delta\") AS \"delta\",\n APPROX_COUNT_DISTINCT_DS_THETA(\"user\") AS \"thetaSketch\",\n DS_QUANTILES_SKETCH(\"delta\") AS \"quantilesDoublesSketch\",\n APPROX_COUNT_DISTINCT_DS_HLL(\"user\") AS \"HLLSketchBuild\"\nFROM \"source\"\nGROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13\nPARTITIONED BY DAY", + 
"testQueriesFilePath" : "/multi-stage-query/batch-index/test-queries/wikipedia_index_msq_queries.json" +} \ No newline at end of file diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java new file mode 100644 index 000000000000..5a74834ad84c --- /dev/null +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java @@ -0,0 +1,33 @@ +package org.apache.druid.testing.utils; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class MsqSqlTasksWithTestQueries +{ + private final String sqlTask; + private final String testQueriesFilePath; + + @JsonCreator + public MsqSqlTasksWithTestQueries( + @JsonProperty("sqlTask") String sqlTask, + @JsonProperty("testQueriesFilePath") String testQueriesFilePath + ) + { + this.sqlTask = sqlTask; + this.testQueriesFilePath = testQueriesFilePath; + } + + @JsonProperty + public String getSqlTask() + { + return this.sqlTask; + } + + @JsonProperty + public String getTestQueries() + { + return this.testQueriesFilePath; + } + +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java index 424d070529fe..e5a2c44346b2 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java @@ -95,7 +95,7 @@ public String getQueryURL(String schemeAndHost) */ public SqlTaskStatus submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException { - return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of(), null)); + return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, 
false, ImmutableMap.of("finalizeAggregations", false), null)); } // Run the task, wait for it to complete, fetch the reports, verify the results, From 1d85bc0f167e2786b388c276ee58cd296499148b Mon Sep 17 00:00:00 2001 From: Abhi Agrawal Date: Wed, 16 Nov 2022 20:30:35 +0530 Subject: [PATCH 02/10] Fix build issues --- .../druid/testsEx/msq/ITMSQForBatchIndex.java | 222 +++++++++--------- .../utils/MsqSqlTasksWithTestQueries.java | 19 ++ 2 files changed, 130 insertions(+), 111 deletions(-) diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java index 7df9a053aaca..a135d8322839 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java @@ -1,21 +1,21 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.druid.testsEx.msq; @@ -48,97 +48,97 @@ import java.util.Arrays; @RunWith(DruidTestRunner.class) - @Category(MultiStageQuery.class) - public class ITMSQForBatchIndex - { - public static final Logger LOG = new Logger(TestQueryHelper.class); - @Inject - private MsqTestQueryHelper msqHelper; - - @Inject - protected TestQueryHelper queryHelper; - - @Inject - private IntegrationTestingConfig config; - - @Inject - private ObjectMapper jsonMapper; - - @Inject - private DataLoaderHelper dataLoaderHelper; - - @Inject - private CoordinatorResourceTestClient coordinatorClient; - - private static final String BATCH_INDEX_TASKS_DIR = "/multi-stage-query/batch-index/"; - - protected void doTestQuery(String dataSource, String queryFilePath) - { - try { - String queryResponseTemplate; - try { - InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); - queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); - } - catch (IOException e) { - throw new ISE(e, "could not read query file: %s", queryFilePath); - } - - queryResponseTemplate = StringUtils.replace( - queryResponseTemplate, - "%%DATASOURCE%%", - dataSource - ); - queryHelper.testQueriesFromString(queryResponseTemplate); - - } - catch (Exception e) { - LOG.error(e, "Error while testing"); - throw new RuntimeException(e); - } - } - - @Test - public void 
testMsqIngestionForBatchIndexTasks() throws Exception - { - File[] files = (new File(getClass().getResource(BATCH_INDEX_TASKS_DIR).toURI())).listFiles(); - LOG.info(Arrays.toString(files)); - String datasource = "dst"; - for (int i=1; i() - { - } - ); - // Clear up the datasource from the previous runs - coordinatorClient.unloadSegmentsForDataSource(datasource + files[i].getName()); - - String sqlTask = StringUtils.replace( - batchSqlTasksAndQueries.getSqlTask(), - "%%DATASOURCE%%", - datasource + files[i].getName() - ); - - LOG.info("SqlTask - \n %s", sqlTask); - - // Submit the tasks and wait for the datasource to get loaded - SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(sqlTask); - - LOG.info("Sql Task submitted with task Id - %s", sqlTaskStatus.getTaskId()); - - if (sqlTaskStatus.getState().isFailure()) { - Assert.fail(StringUtils.format( - "Unable to start the task successfully.\nPossible exception: %s", - sqlTaskStatus.getError() - )); - } - msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId()); - dataLoaderHelper.waitUntilDatasourceIsReady(datasource + files[i].getName()); - String testQueriesFilePath = batchSqlTasksAndQueries.getTestQueries(); - doTestQuery(datasource + files[i].getName(), testQueriesFilePath); - } - } - } +@Category(MultiStageQuery.class) +public class ITMSQForBatchIndex +{ + public static final Logger LOG = new Logger(TestQueryHelper.class); + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + protected TestQueryHelper queryHelper; + + @Inject + private IntegrationTestingConfig config; + + @Inject + private ObjectMapper jsonMapper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + @Inject + private CoordinatorResourceTestClient coordinatorClient; + + private static final String BATCH_INDEX_TASKS_DIR = "/multi-stage-query/batch-index/"; + + protected void doTestQuery(String dataSource, String queryFilePath) + { + try { + String queryResponseTemplate; + try { + InputStream is = 
AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); + queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", queryFilePath); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + dataSource + ); + queryHelper.testQueriesFromString(queryResponseTemplate); + + } + catch (Exception e) { + LOG.error(e, "Error while testing"); + throw new RuntimeException(e); + } + } + + @Test + public void testMsqIngestionForBatchIndexTasks() throws Exception + { + File[] files = (new File(getClass().getResource(BATCH_INDEX_TASKS_DIR).toURI())).listFiles(); + LOG.info(Arrays.toString(files)); + String datasource = "dst"; + for (int i = 1; i < files.length; i++) { + LOG.info("Starting MSQ test for [%s]", files[i]); + MsqSqlTasksWithTestQueries batchSqlTasksAndQueries = + jsonMapper.readValue( + TestQueryHelper.class.getResourceAsStream(BATCH_INDEX_TASKS_DIR + files[i].getName()), + new TypeReference() + { + } + ); + // Clear up the datasource from the previous runs + coordinatorClient.unloadSegmentsForDataSource(datasource + files[i].getName()); + + String sqlTask = StringUtils.replace( + batchSqlTasksAndQueries.getSqlTask(), + "%%DATASOURCE%%", + datasource + files[i].getName() + ); + + LOG.info("SqlTask - \n %s", sqlTask); + + // Submit the tasks and wait for the datasource to get loaded + SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(sqlTask); + + LOG.info("Sql Task submitted with task Id - %s", sqlTaskStatus.getTaskId()); + + if (sqlTaskStatus.getState().isFailure()) { + Assert.fail(StringUtils.format( + "Unable to start the task successfully.\nPossible exception: %s", + sqlTaskStatus.getError() + )); + } + msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId()); + dataLoaderHelper.waitUntilDatasourceIsReady(datasource + files[i].getName()); + String testQueriesFilePath = batchSqlTasksAndQueries.getTestQueries(); + 
doTestQuery(datasource + files[i].getName(), testQueriesFilePath); + } + } +} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java index 5a74834ad84c..537876b594cf 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + package org.apache.druid.testing.utils; import com.fasterxml.jackson.annotation.JsonCreator; From 6a7bece824d341ce203bfa80b8b6dbaaaf12a883 Mon Sep 17 00:00:00 2001 From: Abhi Agrawal Date: Wed, 16 Nov 2022 22:59:21 +0530 Subject: [PATCH 03/10] Trigger Build From 1b0f3ff8c8f5030c338844badd8720cb5dcf40da Mon Sep 17 00:00:00 2001 From: Abhi Agrawal Date: Fri, 18 Nov 2022 03:03:26 +0530 Subject: [PATCH 04/10] Adding more tests and addressing comments --- .../msq/AbstractITSQLBasedBatchIngestion.java | 125 +++++++++++++++ .../druid/testsEx/msq/ITMSQForBatchIndex.java | 144 ----------------- .../testsEx/msq/ITSQLBasedBatchIngestion.java | 64 ++++++++ .../msq_inline/json_path_index_queries.json | 49 ++++++ .../batch-index/msq_inline/msq_inline.sql | 17 ++ .../sparse_column_msq/sparse_column_msq.json | 93 +++++++++++ .../sparse_column_msq/sparse_column_msq.sql | 21 +++ .../wikipedia_http_inputsource_msq.sql | 29 ++++ .../wikipedia_http_inputsource_queries.json | 47 ++++++ .../batch-index/wikipedia_index_msq.json | 4 - .../wikipedia_local/wikipedia_index_msq.sql | 32 ++++ .../wikipedia_index_queries.json} | 0 .../wikipedia_index_queries.json | 150 ++++++++++++++++++ .../wikipedia_merge_index_task.sql | 33 ++++ ...ikipedia_index_queries_with_transform.json | 62 ++++++++ .../wikipedia_index_task_with_transform.sql | 32 ++++ .../utils/MsqSqlTasksWithTestQueries.java | 52 ------ .../testing/utils/MsqTestQueryHelper.java | 27 +++- 18 files changed, 780 insertions(+), 201 deletions(-) create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java delete mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java create mode 100644 integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/json_path_index_queries.json 
create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/msq_inline.sql create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.json create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.sql create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_msq.sql create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_queries.json delete mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.json create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local/wikipedia_index_msq.sql rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{test-queries/wikipedia_index_msq_queries.json => wikipedia_local/wikipedia_index_queries.json} (100%) create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_index_queries.json create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_merge_index_task.sql create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_queries_with_transform.json create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_task_with_transform.sql delete mode 100644 integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java diff --git 
a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java new file mode 100644 index 000000000000..564e6f015977 --- /dev/null +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.testsEx.msq; + +import com.google.inject.Inject; +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.testing.utils.DataLoaderHelper; +import org.apache.druid.testing.utils.MsqTestQueryHelper; +import org.apache.druid.testing.utils.TestQueryHelper; +import org.apache.druid.testsEx.indexer.AbstractITBatchIndexTest; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class AbstractITSQLBasedBatchIngestion +{ + public static final Logger LOG = new Logger(TestQueryHelper.class); + @Inject + private MsqTestQueryHelper msqHelper; + + @Inject + protected TestQueryHelper queryHelper; + + @Inject + private DataLoaderHelper dataLoaderHelper; + + protected String getFileWithFormatFromDir(String dir, String format) throws URISyntaxException + { + File[] file = (new File(getClass().getResource(dir).toURI())).listFiles(pathname -> { + String name = pathname.getName().toLowerCase(); + return name.endsWith(format) && pathname.isFile(); + }); + return dir + '/' + file[0].getName(); + } + + protected String getSqlStringFromDir(String dir, String datasource) throws URISyntaxException + { + String filePath = getFileWithFormatFromDir(dir, ".sql"); + String sqlString; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath); + sqlString = IOUtils.toString(is, StandardCharsets.UTF_8); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", filePath); + } + + sqlString = StringUtils.replace( + sqlString, + "%%DATASOURCE%%", + datasource + ); + + return sqlString; + } + + protected void doTestQuery(String dir, String dataSource) throws URISyntaxException + { + String queryFilePath = 
getFileWithFormatFromDir(dir, ".json"); + try { + String queryResponseTemplate; + try { + InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); + queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); + } + catch (IOException e) { + throw new ISE(e, "could not read query file: %s", queryFilePath); + } + + queryResponseTemplate = StringUtils.replace( + queryResponseTemplate, + "%%DATASOURCE%%", + dataSource + ); + queryHelper.testQueriesFromString(queryResponseTemplate); + + } + catch (Exception e) { + LOG.error(e, "Error while running test query"); + throw new RuntimeException(e); + } + } + + protected void runMSQTaskandTestQueries(String relativePath, String datasource, Map msqContext) throws Exception + { + LOG.info("Starting MSQ test for [%s]", relativePath); + + String sqlTask = getSqlStringFromDir(relativePath, datasource); + + LOG.info("SqlTask - \n %s", sqlTask); + + // Submit the tasks and wait for the datasource to get loaded + msqHelper.submitMsqTaskAndWaitForCompletion( + sqlTask, + msqContext + ); + + dataLoaderHelper.waitUntilDatasourceIsReady(datasource); + doTestQuery(relativePath, datasource); + } +} diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java deleted file mode 100644 index a135d8322839..000000000000 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITMSQForBatchIndex.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.testsEx.msq; - -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.inject.Inject; -import org.apache.commons.io.IOUtils; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.msq.sql.SqlTaskStatus; -import org.apache.druid.testing.IntegrationTestingConfig; -import org.apache.druid.testing.clients.CoordinatorResourceTestClient; -import org.apache.druid.testing.utils.DataLoaderHelper; -import org.apache.druid.testing.utils.MsqSqlTasksWithTestQueries; -import org.apache.druid.testing.utils.MsqTestQueryHelper; -import org.apache.druid.testing.utils.TestQueryHelper; -import org.apache.druid.testsEx.categories.MultiStageQuery; -import org.apache.druid.testsEx.config.DruidTestRunner; -import org.apache.druid.testsEx.indexer.AbstractITBatchIndexTest; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; - -@RunWith(DruidTestRunner.class) -@Category(MultiStageQuery.class) -public class ITMSQForBatchIndex -{ - public static final Logger LOG = new Logger(TestQueryHelper.class); - @Inject - private MsqTestQueryHelper msqHelper; - - @Inject - protected TestQueryHelper queryHelper; - - @Inject - 
private IntegrationTestingConfig config; - - @Inject - private ObjectMapper jsonMapper; - - @Inject - private DataLoaderHelper dataLoaderHelper; - - @Inject - private CoordinatorResourceTestClient coordinatorClient; - - private static final String BATCH_INDEX_TASKS_DIR = "/multi-stage-query/batch-index/"; - - protected void doTestQuery(String dataSource, String queryFilePath) - { - try { - String queryResponseTemplate; - try { - InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); - queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); - } - catch (IOException e) { - throw new ISE(e, "could not read query file: %s", queryFilePath); - } - - queryResponseTemplate = StringUtils.replace( - queryResponseTemplate, - "%%DATASOURCE%%", - dataSource - ); - queryHelper.testQueriesFromString(queryResponseTemplate); - - } - catch (Exception e) { - LOG.error(e, "Error while testing"); - throw new RuntimeException(e); - } - } - - @Test - public void testMsqIngestionForBatchIndexTasks() throws Exception - { - File[] files = (new File(getClass().getResource(BATCH_INDEX_TASKS_DIR).toURI())).listFiles(); - LOG.info(Arrays.toString(files)); - String datasource = "dst"; - for (int i = 1; i < files.length; i++) { - LOG.info("Starting MSQ test for [%s]", files[i]); - MsqSqlTasksWithTestQueries batchSqlTasksAndQueries = - jsonMapper.readValue( - TestQueryHelper.class.getResourceAsStream(BATCH_INDEX_TASKS_DIR + files[i].getName()), - new TypeReference() - { - } - ); - // Clear up the datasource from the previous runs - coordinatorClient.unloadSegmentsForDataSource(datasource + files[i].getName()); - - String sqlTask = StringUtils.replace( - batchSqlTasksAndQueries.getSqlTask(), - "%%DATASOURCE%%", - datasource + files[i].getName() - ); - - LOG.info("SqlTask - \n %s", sqlTask); - - // Submit the tasks and wait for the datasource to get loaded - SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(sqlTask); - - LOG.info("Sql Task submitted 
with task Id - %s", sqlTaskStatus.getTaskId()); - - if (sqlTaskStatus.getState().isFailure()) { - Assert.fail(StringUtils.format( - "Unable to start the task successfully.\nPossible exception: %s", - sqlTaskStatus.getError() - )); - } - msqHelper.pollTaskIdForCompletion(sqlTaskStatus.getTaskId()); - dataLoaderHelper.waitUntilDatasourceIsReady(datasource + files[i].getName()); - String testQueriesFilePath = batchSqlTasksAndQueries.getTestQueries(); - doTestQuery(datasource + files[i].getName(), testQueriesFilePath); - } - } -} diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java new file mode 100644 index 000000000000..9f55624e0b4b --- /dev/null +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.druid.testsEx.msq;
+
+import org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.druid.testsEx.categories.MultiStageQuery;
+import org.apache.druid.testsEx.config.DruidTestRunner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.util.Arrays;
+
+@RunWith(DruidTestRunner.class)
+@Category(MultiStageQuery.class)
+public class ITSQLBasedBatchIngestion extends AbstractITSQLBasedBatchIngestion
+{
+  private static final String BATCH_INDEX_TASKS_DIRs = "/multi-stage-query/batch-index/";
+
+  @Test
+  public void testSQLBasedBatchIngestion() throws Exception
+  {
+    // Get list of all directories in batch-index folder. Each folder is considered a test case.
+    File[] Directories = (new File(getClass().getResource(BATCH_INDEX_TASKS_DIRs).toURI())).listFiles(File::isDirectory);
+    int fail_count = 0;
+    LOG.info("Test will be run for sql in the following directories - \n %s", Arrays.toString(Directories));
+    for (File dir : Directories) {
+      try {
+        runMSQTaskandTestQueries(BATCH_INDEX_TASKS_DIRs + dir.getName(), "msqBatchIndex_" + dir.getName(),
+            ImmutableMap.of("finalizeAggregations", false,
+                "maxNumTasks", 10,
+                "groupByEnableMultiValueUnnesting", false
+            ));
+      }
+      catch (Exception e) {
+        LOG.error(e, "Error while testing %s", dir.getName());
+        fail_count++;
+      }
+    }
+    if (fail_count > 0){
+      LOG.error("%s tests were run out of which %s FAILED", Directories.length, fail_count);
+      throw new RuntimeException();
+    }
+  }
+}
diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/json_path_index_queries.json
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/json_path_index_queries.json new file mode 100644 index 000000000000..845af00dd883 --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/json_path_index_queries.json @@ -0,0 +1,49 @@ +[ + { + "description": "timeseries", + "query": { + "queryType": "timeseries", + "dataSource": "%%DATASOURCE%%", + "intervals": [ + "1000/3000" + ], + "aggregations": [ + { + "type": "longSum", + "name": "len", + "fieldName": "len" + }, + { + "type": "longSum", + "name": "max", + "fieldName": "max" + }, + { + "type": "longSum", + "name": "min", + "fieldName": "min" + }, + { + "type": "longSum", + "name": "sum", + "fieldName": "sum" + } + ], + "granularity": { + "type": "all" + } + }, + "expectedResults": [ + { + "timestamp": "2013-08-31T01:02:33.000Z", + "result": { + "sum": 10, + "min": 0, + "len": 5, + "max": 4 + } + } + ] + } +] + diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/msq_inline.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/msq_inline.sql new file mode 100644 index 000000000000..a710691574ae --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/msq_inline.sql @@ -0,0 +1,17 @@ +REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL +WITH "source" AS (SELECT * FROM TABLE( + EXTERN( + '{"type":"inline","data":"{\"timestamp\": \"2013-08-31T01:02:33Z\", \"values\": [0,1,2,3,4] }"}', + '{"type":"json","flattenSpec":{"useFieldDiscovery":true,"fields":[{"type":"path","name":"len","expr":"$.values.length()"},{"type":"path","name":"min","expr":"$.values.min()"},{"type":"path","name":"max","expr":"$.values.max()"},{"type":"path","name":"sum","expr":"$.values.sum()"}]}}', + 
'[{"name":"timestamp","type":"string"},{"name":"len","type":"long"},{"name":"min","type":"long"},{"name":"max","type":"long"},{"name":"sum","type":"long"}]' + ) +)) +SELECT + TIME_PARSE("timestamp") AS __time, + "len", + "min", + "max", + "sum" +FROM "source" +GROUP BY 1, 2, 3, 4, 5 +PARTITIONED BY HOUR \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.json new file mode 100644 index 000000000000..4c2c5aa2950e --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.json @@ -0,0 +1,93 @@ +[ + { + "description": "timeseries, 1 agg, all", + "query":{ + "queryType" : "timeBoundary", + "dataSource": "%%DATASOURCE%%" + }, + "expectedResults":[ + { + "timestamp" : "2015-09-12T00:00:00.000Z", + "result" : { + "minTime" : "2015-09-12T00:00:00.000Z", + "maxTime" : "2015-09-12T00:00:00.000Z" + } + } + ] + }, + { + "description": "scan, all", + "query": { + "queryType": "scan", + "dataSource": "%%DATASOURCE%%", + "intervals": [ + "2013-01-01/2020-01-02" + ], + "resultFormat":"compactedList" + }, + "expectedResults": [{ + "segmentId":"dstsparse_column_msq.json_2015-09-12T00:00:00.000Z_2015-09-12T01:00:00.000Z_2022-11-17T12:32:11.247Z", + "columns":["__time","dimB","dimA","dimC","dimD","dimE","dimF","count","sum_metA"], + "events":[ + [1442016000000,"F","C",null,null,null,null,1,1], + [1442016000000,"J","C",null,null,null,null,1,1], + [1442016000000,"R","J",null,null,null,null,1,1], + [1442016000000,"S","Z",null,null,null,null,1,1], + [1442016000000,"T","H",null,null,null,null,1,1], + [1442016000000,"X",null,"A",null,null,null,1,1], + [1442016000000,"X","H",null,null,null,null,3,3], + [1442016000000,"Z","H",null,null,null,null,1,1] + ] + }], + "fieldsToTest": ["events"] + }, + { + 
"description": "roll up ratio", + "query": { + "queryType":"timeseries", + "dataSource":{ + "type":"table", + "name":"%%DATASOURCE%%" + }, + "intervals":{ + "type":"intervals", + "intervals":[ + "2013-01-01/2020-01-02" + ] + }, + "granularity":{ + "type":"all" + }, + "aggregations":[ + { + "type":"count", + "name":"a0" + }, + { + "type":"longSum", + "name":"a1", + "fieldName":"count", + "expression":null + } + ], + "postAggregations":[ + { + "type":"expression", + "name":"p0", + "expression":"((\"a0\" * 1.00) / \"a1\")", + "ordering":null + } + ] + }, + "expectedResults": [ + { + "timestamp" : "2015-09-12T00:00:00.000Z", + "result" : { + "a1" : 10, + "p0" : 0.8, + "a0" : 8 + } + } + ] + } +] \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.sql new file mode 100644 index 000000000000..f844f5996469 --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.sql @@ -0,0 +1,21 @@ +REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL +WITH "source" AS (SELECT * FROM TABLE( + EXTERN( + 
'{"type":"inline","data":"{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"F\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"C\",\"dimB\":\"J\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"Z\",\"dimB\":\"S\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"Z\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"J\",\"dimB\":\"R\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"T\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimA\":\"H\",\"dimB\":\"X\",\"metA\":1}\n{\"time\":\"2015-09-12T00:46:58.771Z\",\"dimC\":\"A\",\"dimB\":\"X\",\"metA\":1}\n"}', + '{"type":"json"}', + '[{"name":"time","type":"string"},{"name":"dimB","type":"string"},{"name":"dimA","type":"string"},{"name":"dimC","type":"string"},{"name":"dimD","type":"string"},{"name":"dimE","type":"string"},{"name":"dimF","type":"string"},{"name":"metA","type":"long"}]' + ) +)) +SELECT + TIME_FLOOR(TIME_PARSE("time"), 'PT1H') AS __time, + "dimB", + "dimA", + "dimC", + "dimD", + "dimE", + "dimF", + COUNT(*) AS "count", + SUM("metA") AS "sum_metA" +FROM "source" +GROUP BY 1, 2, 3, 4, 5, 6, 7 +PARTITIONED BY HOUR \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_msq.sql new file mode 100644 index 000000000000..f1af33bed43f --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_msq.sql @@ -0,0 +1,29 @@ +REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL +WITH "source" AS 
(SELECT * FROM TABLE( + EXTERN( + '{"type":"http","uris":["https://druid.apache.org/data/wikipedia.json.gz","https://druid.apache.org/data/wikipedia.json.gz"]}', + '{"type":"json"}', + '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]' + ) +)) +SELECT + TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time, + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city", + COUNT(*) AS "count", + SUM("added") AS "added", + SUM("deleted") AS "deleted", + SUM("delta") AS "delta" +FROM "source" +GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 +PARTITIONED BY DAY \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_queries.json new file mode 100644 index 000000000000..2d454d59d80a --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_queries.json @@ -0,0 +1,47 @@ +[ + { + "description": "timeseries, 1 agg, all", + "query":{ + "queryType" : "timeBoundary", + "dataSource": "%%DATASOURCE%%" + }, + 
"expectedResults":[ + { + "timestamp" : "2016-06-27T00:00:11.000Z", + "result" : { + "minTime" : "2016-06-27T00:00:11.000Z", + "maxTime" : "2016-06-27T21:31:02.000Z" + } + } + ] + }, + { + "description": "simple aggr", + "query":{ + "queryType" : "topN", + "dataSource" : "%%DATASOURCE%%", + "intervals" : ["2016-06-27/2016-06-28"], + "granularity" : "all", + "dimension" : "page", + "metric" : "count", + "threshold" : 3, + "aggregations" : [ + { + "type" : "count", + "name" : "count" + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2016-06-27T00:00:11.000Z", + "result" : + [ + {"count":29,"page":"Copa América Centenario"}, + {"count":16,"page":"User:Cyde/List of candidates for speedy deletion/Subpage"}, + {"count":16,"page":"Wikipedia:Administrators' noticeboard/Incidents"} + ] + } + ] + } +] \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.json deleted file mode 100644 index 6bc8c25900e5..000000000000 --- a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "sqlTask" : "REPLACE INTO \"%%DATASOURCE%%\" OVERWRITE ALL\nWITH \"source\" as (SELECT * FROM TABLE(\n EXTERN(\n '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data2.json\",\"/resources/data/batch_index/json/wikipedia_index_data3.json\"]}',\n '{\"type\":\"json\"}',\n 
'[{\"name\":\"timestamp\",\"type\":\"string\"},{\"name\":\"page\",\"type\":\"string\"},{\"name\":\"language\",\"type\":\"string\"},{\"name\":\"user\",\"type\":\"string\"},{\"name\":\"unpatrolled\",\"type\":\"string\"},{\"name\":\"newPage\",\"type\":\"string\"},{\"name\":\"robot\",\"type\":\"string\"},{\"name\":\"anonymous\",\"type\":\"string\"},{\"name\":\"namespace\",\"type\":\"string\"},{\"name\":\"continent\",\"type\":\"string\"},{\"name\":\"country\",\"type\":\"string\"},{\"name\":\"region\",\"type\":\"string\"},{\"name\":\"city\",\"type\":\"string\"},{\"name\":\"added\",\"type\":\"double\"},{\"name\":\"deleted\",\"type\":\"double\"},{\"name\":\"delta\",\"type\":\"double\"}]'\n )\n))\nSELECT\n TIME_FLOOR(CASE WHEN CAST(\"timestamp\" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST(\"timestamp\" AS BIGINT)) ELSE TIME_PARSE(\"timestamp\") END, 'PT1S') AS __time,\n \"page\",\n \"language\",\n \"user\",\n \"unpatrolled\",\n \"newPage\",\n \"robot\",\n \"anonymous\",\n \"namespace\",\n \"continent\",\n \"country\",\n \"region\",\n \"city\",\n COUNT(*) AS \"count\",\n SUM(\"added\") AS \"added\",\n SUM(\"deleted\") AS \"deleted\",\n SUM(\"delta\") AS \"delta\",\n APPROX_COUNT_DISTINCT_DS_THETA(\"user\") AS \"thetaSketch\",\n DS_QUANTILES_SKETCH(\"delta\") AS \"quantilesDoublesSketch\",\n APPROX_COUNT_DISTINCT_DS_HLL(\"user\") AS \"HLLSketchBuild\"\nFROM \"source\"\nGROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13\nPARTITIONED BY DAY", - "testQueriesFilePath" : "/multi-stage-query/batch-index/test-queries/wikipedia_index_msq_queries.json" -} \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local/wikipedia_index_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local/wikipedia_index_msq.sql new file mode 100644 index 000000000000..738e39fb8709 --- /dev/null +++ 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local/wikipedia_index_msq.sql @@ -0,0 +1,32 @@ +REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL +WITH "source" as (SELECT * FROM TABLE( + EXTERN( + '{"type":"local","files":["/resources/data/batch_index/json/wikipedia_index_data1.json","/resources/data/batch_index/json/wikipedia_index_data2.json","/resources/data/batch_index/json/wikipedia_index_data3.json"]}', + '{"type":"json"}', + '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]' + ) +)) +SELECT + TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time, + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city", + COUNT(*) AS "count", + SUM("added") AS "added", + SUM("deleted") AS "deleted", + SUM("delta") AS "delta", + APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch", + DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch", + APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild" +FROM "source" +GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 +PARTITIONED BY DAY \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/test-queries/wikipedia_index_msq_queries.json 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local/wikipedia_index_queries.json similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/test-queries/wikipedia_index_msq_queries.json rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local/wikipedia_index_queries.json diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_index_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_index_queries.json new file mode 100644 index 000000000000..928effe65e97 --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_index_queries.json @@ -0,0 +1,150 @@ +[ + { + "description": "timeseries, 1 agg, all", + "query":{ + "queryType" : "timeBoundary", + "dataSource": "%%DATASOURCE%%" + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T01:02:33.000Z", + "result" : { + "minTime" : "2013-08-31T01:02:33.000Z", + "maxTime" : "2013-09-01T12:41:27.000Z" + } + } + ] + }, + { + "description": "timeseries, datasketch aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "HLLSketchMerge", + "name": "approxCountHLL", + "fieldName": "HLLSketchBuild", + "lgK": 12, + "tgtHllType": "HLL_4", + "round": true + }, + { + "type":"thetaSketch", + "name":"approxCountTheta", + "fieldName":"thetaSketch", + "size":16384, + "shouldFinalize":true, + "isInputThetaSketch":false, + "errorBoundsStdDev":null + }, + { + "type":"quantilesDoublesSketch", + "name":"quantilesSketch", + "fieldName":"quantilesDoublesSketch", + "k":128 + } + ] + }, + "expectedResults":[ + { + "timestamp" : 
"2013-08-31T00:00:00.000Z", + "result" : { + "quantilesSketch":5, + "approxCountTheta":5.0, + "approxCountHLL":5 + } + } + ] + }, + { + "description":"having spec on post aggregation", + "query":{ + "queryType":"groupBy", + "dataSource":"%%DATASOURCE%%", + "granularity":"day", + "dimensions":[ + "page" + ], + "filter":{ + "type":"selector", + "dimension":"language", + "value":"zh" + }, + "aggregations":[ + { + "type":"count", + "name":"rows" + }, + { + "type":"longSum", + "fieldName":"added", + "name":"added_count" + } + ], + "postAggregations": [ + { + "type":"arithmetic", + "name":"added_count_times_ten", + "fn":"*", + "fields":[ + {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"}, + {"type":"constant", "name":"const", "value":10} + ] + } + ], + "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000}, + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ] + }, + "expectedResults":[ { + "version" : "v1", + "timestamp" : "2013-08-31T00:00:00.000Z", + "event" : { + "added_count_times_ten" : 9050.0, + "page" : "Crimson Typhoon", + "added_count" : 905, + "rows" : 1 + } + } ] + }, + { + "description": "timeseries, stringFirst/stringLast aggs, all", + "query":{ + "queryType" : "timeseries", + "dataSource": "%%DATASOURCE%%", + "granularity":"day", + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ], + "filter":null, + "aggregations":[ + { + "type": "stringFirst", + "name": "first_user", + "fieldName": "user" + }, + { + "type":"stringLast", + "name":"last_user", + "fieldName":"user" + } + ] + }, + "expectedResults":[ + { + "timestamp" : "2013-08-31T00:00:00.000Z", + "result" : { + "first_user":"nuclear", + "last_user":"stringer" + } + } + ] + } +] \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_merge_index_task.sql 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_merge_index_task.sql new file mode 100644 index 000000000000..a8160aa9055b --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_merge_index_task.sql @@ -0,0 +1,33 @@ +REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL +WITH "source" AS (SELECT * FROM TABLE( + EXTERN( + '{"type":"local","baseDir":"/resources/data/batch_index/json","filter":"wikipedia_index_data*"}', + '{"type":"json"}', + '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"newPage","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]' + ) +)) +SELECT + TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time, + "page", + "language", + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city", + "timestamp", + COUNT(*) AS "count", + SUM("added") AS "added", + SUM("deleted") AS "deleted", + SUM("delta") AS "delta", + APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch", + DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch", + APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild" +FROM "source" +GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14 +PARTITIONED BY DAY \ No newline at end of file diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_queries_with_transform.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_queries_with_transform.json new file mode 100644 index 000000000000..f0cfba677354 --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_queries_with_transform.json @@ -0,0 +1,62 @@ +[ + { + "description":"having spec on post aggregation", + "query":{ + "queryType":"groupBy", + "dataSource":"%%DATASOURCE%%", + "granularity":"day", + "dimensions":[ + "page", + "city" + ], + "filter":{ + "type":"selector", + "dimension":"language", + "value":"language-zh" + }, + "aggregations":[ + { + "type":"count", + "name":"rows" + }, + { + "type":"longSum", + "fieldName":"triple-added", + "name":"added_count" + }, + { + "type":"longSum", + "fieldName":"delta", + "name":"delta_sum" + } + ], + "postAggregations": [ + { + "type":"arithmetic", + "name":"added_count_times_ten", + "fn":"*", + "fields":[ + {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"}, + {"type":"constant", "name":"const", "value":10} + ] + } + ], + "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000}, + "intervals":[ + "2013-08-31T00:00/2013-09-01T00:00" + ] + }, + "expectedResults":[ { + "version" : "v1", + "timestamp" : "2013-08-31T00:00:00.000Z", + "event" : { + "added_count_times_ten" : 27150.0, + "page" : "Crimson Typhoon", + "city" : "Taiyuan", + "added_count" : 2715, + "delta_sum" : 900, + "rows" : 1 + } + } ] + } +] \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_task_with_transform.sql 
b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_task_with_transform.sql new file mode 100644 index 000000000000..ebdeeda68937 --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_task_with_transform.sql @@ -0,0 +1,32 @@ +REPLACE INTO "%%DATASOURCE%%" OVERWRITE ALL +WITH "source" AS (SELECT * FROM TABLE( + EXTERN( + '{"type":"local","baseDir":"/resources/data/batch_index/json","filter":"wikipedia_index_data*"}', + '{"type":"json"}', + '[{"name":"timestamp","type":"string"},{"name":"page","type":"string"},{"name":"language","type":"string"},{"name":"user","type":"string"},{"name":"unpatrolled","type":"string"},{"name":"robot","type":"string"},{"name":"anonymous","type":"string"},{"name":"namespace","type":"string"},{"name":"continent","type":"string"},{"name":"country","type":"string"},{"name":"region","type":"string"},{"name":"city","type":"string"},{"name":"added","type":"double"},{"name":"triple-added","type":"double"},{"name":"deleted","type":"double"},{"name":"delta","type":"double"}]' + ) +)) +SELECT + TIME_FLOOR(CASE WHEN CAST("timestamp" AS BIGINT) > 0 THEN MILLIS_TO_TIMESTAMP(CAST("timestamp" AS BIGINT)) ELSE TIME_PARSE("timestamp") END, 'PT1S') AS __time, + "page", + concat('language-', "language") AS "language", + "user", + "unpatrolled", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city", + COUNT(*) AS "count", + SUM("added") AS "added", + SUM("added")*3 AS "triple-added", + SUM("deleted") AS "deleted", + SUM("delta") AS "delta", + APPROX_COUNT_DISTINCT_DS_THETA("user") AS "thetaSketch", + DS_QUANTILES_SKETCH("delta") AS "quantilesDoublesSketch", + APPROX_COUNT_DISTINCT_DS_HLL("user") AS "HLLSketchBuild" +FROM "source" +GROUP BY 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12 +PARTITIONED BY DAY \ No newline at end of file diff --git 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java deleted file mode 100644 index 537876b594cf..000000000000 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqSqlTasksWithTestQueries.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.druid.testing.utils; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -public class MsqSqlTasksWithTestQueries -{ - private final String sqlTask; - private final String testQueriesFilePath; - - @JsonCreator - public MsqSqlTasksWithTestQueries( - @JsonProperty("sqlTask") String sqlTask, - @JsonProperty("testQueriesFilePath") String testQueriesFilePath - ) - { - this.sqlTask = sqlTask; - this.testQueriesFilePath = testQueriesFilePath; - } - - @JsonProperty - public String getSqlTask() - { - return this.sqlTask; - } - - @JsonProperty - public String getTestQueries() - { - return this.testQueriesFilePath; - } - -} diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java index e5a2c44346b2..c98f1376339c 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java @@ -43,6 +43,7 @@ import org.apache.druid.testing.clients.SqlResourceTestClient; import org.apache.druid.testing.clients.msq.MsqOverlordResourceTestClient; import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.testng.Assert; import java.util.ArrayList; import java.util.Collections; @@ -95,7 +96,15 @@ public String getQueryURL(String schemeAndHost) */ public SqlTaskStatus submitMsqTask(String sqlQueryString) throws ExecutionException, InterruptedException { - return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, ImmutableMap.of("finalizeAggregations", false), null)); + return submitMsqTask(sqlQueryString, ImmutableMap.of()); + } + + /** + * Submits a task to the MSQ API with the given query string, and default headers and custom context parameters + */ + public SqlTaskStatus submitMsqTask(String sqlQueryString, Map 
context) throws ExecutionException, InterruptedException + { + return submitMsqTask(new SqlQuery(sqlQueryString, null, false, false, false, context, null)); } // Run the task, wait for it to complete, fetch the reports, verify the results, @@ -250,6 +259,22 @@ public void testQueriesFromFile(String filePath, String fullDatasourcePath) thro } } + public void submitMsqTaskAndWaitForCompletion(String sqlQueryString, Map context) + throws Exception + { + SqlTaskStatus sqlTaskStatus = submitMsqTask(sqlQueryString, context); + + LOG.info("Sql Task submitted with task Id - %s", sqlTaskStatus.getTaskId()); + + if (sqlTaskStatus.getState().isFailure()) { + Assert.fail(StringUtils.format( + "Unable to start the task successfully.\nPossible exception: %s", + sqlTaskStatus.getError() + )); + } + pollTaskIdForCompletion(sqlTaskStatus.getTaskId()); + } + private static class TaskStillRunningException extends Exception { From 23a6dfe9fc98a87b2be17ff2b631ad69765fa34f Mon Sep 17 00:00:00 2001 From: Abhi Agrawal Date: Fri, 18 Nov 2022 10:23:28 +0530 Subject: [PATCH 05/10] fixBuildIssues --- .../msq/AbstractITSQLBasedBatchIngestion.java | 2 +- .../testsEx/msq/ITSQLBasedBatchIngestion.java | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java index 564e6f015977..25451ea5e4ed 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java @@ -51,7 +51,7 @@ public class AbstractITSQLBasedBatchIngestion protected String getFileWithFormatFromDir(String dir, String format) throws URISyntaxException { File[] file = (new 
File(getClass().getResource(dir).toURI())).listFiles(pathname -> { - String name = pathname.getName().toLowerCase(); + String name = pathname.getName(); return name.endsWith(format) && pathname.isFile(); }); return dir + '/' + file[0].getName(); diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java index 9f55624e0b4b..11413f793021 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java @@ -33,31 +33,30 @@ @Category(MultiStageQuery.class) public class ITSQLBasedBatchIngestion extends AbstractITSQLBasedBatchIngestion { - private static final String BATCH_INDEX_TASKS_DIRs = "/multi-stage-query/batch-index/"; + private static final String BATCH_INDEX_TASKS_DIR = "/multi-stage-query/batch-index/"; @Test public void testSQLBasedBatchIngestion() throws Exception { // Get list of all directories in batch-index folder. Each folder is considered a test case. 
- File[] Directories = (new File(getClass().getResource(BATCH_INDEX_TASKS_DIRs).toURI())).listFiles(File::isDirectory); + File[] directories = (new File(getClass().getResource(BATCH_INDEX_TASKS_DIR).toURI())).listFiles(File::isDirectory); int fail_count = 0; - LOG.info("Test will be run for sql in the following directories - \n %s", Arrays.toString(Directories)); - Directories= new File[]{new File("/Users/abhishekagrawal/abhagraw/druid/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform")}; - for (File dir : Directories) { + LOG.info("Test will be run for sql in the following directories - \n %s", Arrays.toString(directories)); + for (File dir : directories) { try { - runMSQTaskandTestQueries(BATCH_INDEX_TASKS_DIRs + dir.getName(), "msqBatchIndex_" + dir.getName(), + runMSQTaskandTestQueries(BATCH_INDEX_TASKS_DIR + dir.getName(), "msqBatchIndex_" + dir.getName(), ImmutableMap.of("finalizeAggregations", false, "maxNumTasks", 10, "groupByEnableMultiValueUnnesting", false )); } catch (Exception e) { - LOG.error(e, "Error while testing %s", dir.getName()); - fail_count++; + LOG.error(e, "Error while testing %s", dir.getName()); + fail_count++; } } - if (fail_count > 0){ - LOG.error("%s tests were run out of which %s FAILED", Directories.length, fail_count); + if (fail_count > 0) { + LOG.error("%s tests were run out of which %s FAILED", directories.length, fail_count); throw new RuntimeException(); } } From d1aee52f0d57a184960010bb06c24485135df3e3 Mon Sep 17 00:00:00 2001 From: Abhi Agrawal Date: Fri, 18 Nov 2022 11:06:01 +0530 Subject: [PATCH 06/10] fix dependency issues --- .travis.yml | 4 ---- integration-tests-ex/cases/pom.xml | 15 ++++++++++----- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/.travis.yml b/.travis.yml index 4ca2c93755e5..2d785486ac97 100644 --- a/.travis.yml +++ b/.travis.yml @@ -442,10 +442,6 @@ jobs: docker exec -it druid-$v sh -c 'dmesg | tail -3' ; done - - <<: 
*integration_batch_index - name: "(Compile=openjdk8, Run=openjdk8) batch index integration test with Indexer" - env: TESTNG_GROUPS='-Dgroups=batch-index' JVM_RUNTIME='-Djvm.runtime=8' USE_INDEXER='indexer' - - &integration_input_format name: "(Compile=openjdk8, Run=openjdk8) input format integration test" stage: Tests - phase 2 diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml index 6cdd7a267302..56dcff203416 100644 --- a/integration-tests-ex/cases/pom.xml +++ b/integration-tests-ex/cases/pom.xml @@ -155,7 +155,7 @@ org.jdbi jdbi - + org.apache.druid.extensions mysql-metadata-storage ${project.parent.version} @@ -218,10 +218,15 @@ JUnitParams test - - javax.ws.rs - jsr311-api - + + javax.ws.rs + jsr311-api + + + org.apache.curator + curator-client + 5.4.0 + From eb2931065a0b3ac481e14641420175157692b646 Mon Sep 17 00:00:00 2001 From: Abhi Agrawal Date: Fri, 18 Nov 2022 19:13:06 +0530 Subject: [PATCH 07/10] Parameterized the test and addressed comments --- .../msq/AbstractITSQLBasedBatchIngestion.java | 69 ++++---- .../testsEx/msq/ITSQLBasedBatchIngestion.java | 49 +++--- .../json_path_index_queries.json | 0 .../{msq_inline => }/msq_inline.sql | 0 .../sparse_column_msq.json | 0 .../sparse_column_msq.sql | 0 .../wikipedia_http_inputsource_msq.sql | 0 .../wikipedia_http_inputsource_queries.json | 0 .../wikipedia_index_msq.sql | 0 .../wikipedia_index_queries.json | 0 ...ikipedia_index_queries_with_transform.json | 0 .../wikipedia_index_task_with_transform.sql | 0 .../wikipedia_index_queries.json | 150 ------------------ .../wikipedia_merge_index_task.sql | 0 .../testing/utils/MsqTestQueryHelper.java | 3 + 15 files changed, 60 insertions(+), 211 deletions(-) rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{msq_inline => }/json_path_index_queries.json (100%) rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{msq_inline => }/msq_inline.sql (100%) rename 
integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{sparse_column_msq => }/sparse_column_msq.json (100%) rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{sparse_column_msq => }/sparse_column_msq.sql (100%) rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{wikipedia_http_inputsource_msq => }/wikipedia_http_inputsource_msq.sql (100%) rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{wikipedia_http_inputsource_msq => }/wikipedia_http_inputsource_queries.json (100%) rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{wikipedia_local => }/wikipedia_index_msq.sql (100%) rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{wikipedia_local => }/wikipedia_index_queries.json (100%) rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{wikipedia_with_transform => }/wikipedia_index_queries_with_transform.json (100%) rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{wikipedia_with_transform => }/wikipedia_index_task_with_transform.sql (100%) delete mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_index_queries.json rename integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/{wikipedia_local_with_filter => }/wikipedia_merge_index_task.sql (100%) diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java index 25451ea5e4ed..c2ce7ea204ec 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java +++ 
b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java @@ -29,10 +29,8 @@ import org.apache.druid.testing.utils.TestQueryHelper; import org.apache.druid.testsEx.indexer.AbstractITBatchIndexTest; -import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -48,56 +46,37 @@ public class AbstractITSQLBasedBatchIngestion @Inject private DataLoaderHelper dataLoaderHelper; - protected String getFileWithFormatFromDir(String dir, String format) throws URISyntaxException + /** + * Reads file as utf-8 string and replaces %%DATASOURCE%% with the provided datasource value. + */ + protected String getStringFromFileAndReplaceDatasource(String filePath, String datasource) { - File[] file = (new File(getClass().getResource(dir).toURI())).listFiles(pathname -> { - String name = pathname.getName(); - return name.endsWith(format) && pathname.isFile(); - }); - return dir + '/' + file[0].getName(); - } - - protected String getSqlStringFromDir(String dir, String datasource) throws URISyntaxException - { - String filePath = getFileWithFormatFromDir(dir, ".sql"); - String sqlString; + String fileString; try { InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(filePath); - sqlString = IOUtils.toString(is, StandardCharsets.UTF_8); + fileString = IOUtils.toString(is, StandardCharsets.UTF_8); } catch (IOException e) { throw new ISE(e, "could not read query file: %s", filePath); } - sqlString = StringUtils.replace( - sqlString, + fileString = StringUtils.replace( + fileString, "%%DATASOURCE%%", datasource ); - return sqlString; + return fileString; } - protected void doTestQuery(String dir, String dataSource) throws URISyntaxException + /** + * Reads native queries from a file and runs against the provided datasource.
+ */ + protected void doTestQuery(String queryFilePath, String dataSource) { - String queryFilePath = getFileWithFormatFromDir(dir, ".json"); try { - String queryResponseTemplate; - try { - InputStream is = AbstractITBatchIndexTest.class.getResourceAsStream(queryFilePath); - queryResponseTemplate = IOUtils.toString(is, StandardCharsets.UTF_8); - } - catch (IOException e) { - throw new ISE(e, "could not read query file: %s", queryFilePath); - } - - queryResponseTemplate = StringUtils.replace( - queryResponseTemplate, - "%%DATASOURCE%%", - dataSource - ); - queryHelper.testQueriesFromString(queryResponseTemplate); - + String query = getStringFromFileAndReplaceDatasource(queryFilePath, dataSource); + queryHelper.testQueriesFromString(query); } catch (Exception e) { LOG.error(e, "Error while running test query"); @@ -105,11 +84,21 @@ protected void doTestQuery(String dir, String dataSource) throws URISyntaxExcept } } - protected void runMSQTaskandTestQueries(String relativePath, String datasource, Map msqContext) throws Exception + /** + * Runs a MSQ ingest sql test. + * + * @param sqlFilePath path of file containing the sql query. + * @param queryFilePath path of file containing the native test queries to be run on the ingested datasource. + * @param datasource name of the datasource. %%DATASOURCE%% in the sql and queries will be replaced with this value. + * @param msqContext context parameters to be passed with MSQ API call. 
+ + */ + protected void runMSQTaskandTestQueries(String sqlFilePath, String queryFilePath, String datasource, + Map msqContext) throws Exception { - LOG.info("Starting MSQ test for [%s]", relativePath); + LOG.info("Starting MSQ test for [%s]", sqlFilePath); - String sqlTask = getSqlStringFromDir(relativePath, datasource); + String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource); LOG.info("SqlTask - \n %s", sqlTask); @@ -120,6 +109,6 @@ protected void runMSQTaskandTestQueries(String relativePath, String datasource, ); dataLoaderHelper.waitUntilDatasourceIsReady(datasource); - doTestQuery(relativePath, datasource); + doTestQuery(queryFilePath, datasource); } } diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java index 11413f793021..1a2236146de1 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java @@ -19,6 +19,8 @@ package org.apache.druid.testsEx.msq; +import junitparams.Parameters; +import org.apache.commons.io.FilenameUtils; import org.apache.curator.shaded.com.google.common.collect.ImmutableMap; import org.apache.druid.testsEx.categories.MultiStageQuery; import org.apache.druid.testsEx.config.DruidTestRunner; @@ -26,7 +28,7 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.io.File; +import java.util.List; import java.util.Arrays; @RunWith(DruidTestRunner.class) @@ -35,29 +37,34 @@ public class ITSQLBasedBatchIngestion extends AbstractITSQLBasedBatchIngestion { private static final String BATCH_INDEX_TASKS_DIR = "/multi-stage-query/batch-index/"; + public static List> test_cases() + { + return Arrays.asList( + Arrays.asList("msq_inline.sql", 
"json_path_index_queries.json"), + Arrays.asList("sparse_column_msq.sql", "sparse_column_msq.json"), + Arrays.asList("wikipedia_http_inputsource_msq.sql", "wikipedia_http_inputsource_queries.json"), + Arrays.asList("wikipedia_index_msq.sql", "wikipedia_index_queries.json"), + Arrays.asList("wikipedia_merge_index_task.sql", "wikipedia_index_queries.json"), + Arrays.asList("wikipedia_index_task_with_transform.sql", "wikipedia_index_queries_with_transform.json") + ); + + } + @Test - public void testSQLBasedBatchIngestion() throws Exception + @Parameters(method = "test_cases") + public void testSQLBasedBatchIngestion(String sqlFileName, String queryFileName) { - // Get list of all directories in batch-index folder. Each folder is considered a test case. - File[] directories = (new File(getClass().getResource(BATCH_INDEX_TASKS_DIR).toURI())).listFiles(File::isDirectory); - int fail_count = 0; - LOG.info("Test will be run for sql in the following directories - \n %s", Arrays.toString(directories)); - for (File dir : directories) { - try { - runMSQTaskandTestQueries(BATCH_INDEX_TASKS_DIR + dir.getName(), "msqBatchIndex_" + dir.getName(), - ImmutableMap.of("finalizeAggregations", false, - "maxNumTasks", 10, - "groupByEnableMultiValueUnnesting", false - )); - } - catch (Exception e) { - LOG.error(e, "Error while testing %s", dir.getName()); - fail_count++; - } + try { + runMSQTaskandTestQueries(BATCH_INDEX_TASKS_DIR + sqlFileName, + BATCH_INDEX_TASKS_DIR + queryFileName, + FilenameUtils.removeExtension(sqlFileName), + ImmutableMap.of("finalizeAggregations", false, + "maxNumTasks", 10, + "groupByEnableMultiValueUnnesting", false + )); } - if (fail_count > 0) { - LOG.error("%s tests were run out of which %s FAILED", directories.length, fail_count); - throw new RuntimeException(); + catch (Exception e) { + LOG.error(e, "Error while testing %s", sqlFileName); } } } diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/json_path_index_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/json_path_index_queries.json rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/json_path_index_queries.json diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/msq_inline.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline/msq_inline.sql rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/msq_inline.sql diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.json rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.json diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq/sparse_column_msq.sql rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/sparse_column_msq.sql diff --git 
a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_msq.sql rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq.sql diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_msq/wikipedia_http_inputsource_queries.json rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_http_inputsource_queries.json diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local/wikipedia_index_msq.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local/wikipedia_index_msq.sql rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_msq.sql diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local/wikipedia_index_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json similarity index 100% rename from 
integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local/wikipedia_index_queries.json rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries.json diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_queries_with_transform.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_queries_with_transform.json rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_queries_with_transform.json diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_task_with_transform.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_with_transform/wikipedia_index_task_with_transform.sql rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_index_task_with_transform.sql diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_index_queries.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_index_queries.json deleted file mode 100644 index 928effe65e97..000000000000 --- a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_index_queries.json +++ /dev/null @@ -1,150 +0,0 @@ -[ - { - "description": "timeseries, 1 agg, all", - "query":{ - "queryType" : 
"timeBoundary", - "dataSource": "%%DATASOURCE%%" - }, - "expectedResults":[ - { - "timestamp" : "2013-08-31T01:02:33.000Z", - "result" : { - "minTime" : "2013-08-31T01:02:33.000Z", - "maxTime" : "2013-09-01T12:41:27.000Z" - } - } - ] - }, - { - "description": "timeseries, datasketch aggs, all", - "query":{ - "queryType" : "timeseries", - "dataSource": "%%DATASOURCE%%", - "granularity":"day", - "intervals":[ - "2013-08-31T00:00/2013-09-01T00:00" - ], - "filter":null, - "aggregations":[ - { - "type": "HLLSketchMerge", - "name": "approxCountHLL", - "fieldName": "HLLSketchBuild", - "lgK": 12, - "tgtHllType": "HLL_4", - "round": true - }, - { - "type":"thetaSketch", - "name":"approxCountTheta", - "fieldName":"thetaSketch", - "size":16384, - "shouldFinalize":true, - "isInputThetaSketch":false, - "errorBoundsStdDev":null - }, - { - "type":"quantilesDoublesSketch", - "name":"quantilesSketch", - "fieldName":"quantilesDoublesSketch", - "k":128 - } - ] - }, - "expectedResults":[ - { - "timestamp" : "2013-08-31T00:00:00.000Z", - "result" : { - "quantilesSketch":5, - "approxCountTheta":5.0, - "approxCountHLL":5 - } - } - ] - }, - { - "description":"having spec on post aggregation", - "query":{ - "queryType":"groupBy", - "dataSource":"%%DATASOURCE%%", - "granularity":"day", - "dimensions":[ - "page" - ], - "filter":{ - "type":"selector", - "dimension":"language", - "value":"zh" - }, - "aggregations":[ - { - "type":"count", - "name":"rows" - }, - { - "type":"longSum", - "fieldName":"added", - "name":"added_count" - } - ], - "postAggregations": [ - { - "type":"arithmetic", - "name":"added_count_times_ten", - "fn":"*", - "fields":[ - {"type":"fieldAccess", "name":"added_count", "fieldName":"added_count"}, - {"type":"constant", "name":"const", "value":10} - ] - } - ], - "having":{"type":"greaterThan", "aggregation":"added_count_times_ten", "value":9000}, - "intervals":[ - "2013-08-31T00:00/2013-09-01T00:00" - ] - }, - "expectedResults":[ { - "version" : "v1", - "timestamp" : 
"2013-08-31T00:00:00.000Z", - "event" : { - "added_count_times_ten" : 9050.0, - "page" : "Crimson Typhoon", - "added_count" : 905, - "rows" : 1 - } - } ] - }, - { - "description": "timeseries, stringFirst/stringLast aggs, all", - "query":{ - "queryType" : "timeseries", - "dataSource": "%%DATASOURCE%%", - "granularity":"day", - "intervals":[ - "2013-08-31T00:00/2013-09-01T00:00" - ], - "filter":null, - "aggregations":[ - { - "type": "stringFirst", - "name": "first_user", - "fieldName": "user" - }, - { - "type":"stringLast", - "name":"last_user", - "fieldName":"user" - } - ] - }, - "expectedResults":[ - { - "timestamp" : "2013-08-31T00:00:00.000Z", - "result" : { - "first_user":"nuclear", - "last_user":"stringer" - } - } - ] - } -] \ No newline at end of file diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_merge_index_task.sql b/integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql similarity index 100% rename from integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_local_with_filter/wikipedia_merge_index_task.sql rename to integration-tests-ex/cases/src/test/resources/multi-stage-query/batch-index/wikipedia_merge_index_task.sql diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java index c98f1376339c..580f00a58f2b 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java @@ -259,6 +259,9 @@ public void testQueriesFromFile(String filePath, String fullDatasourcePath) thro } } + /** + * Submits a {@link SqlQuery} to the MSQ API for execution. This method waits for the created task to be completed. 
+ */ public void submitMsqTaskAndWaitForCompletion(String sqlQueryString, Map context) throws Exception { From 0699b1940764955dbad82d29492d5309ca63c2db Mon Sep 17 00:00:00 2001 From: Abhi Agrawal Date: Fri, 18 Nov 2022 19:23:19 +0530 Subject: [PATCH 08/10] Addressing comments --- integration-tests-ex/cases/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests-ex/cases/pom.xml b/integration-tests-ex/cases/pom.xml index 56dcff203416..cf781f6f8885 100644 --- a/integration-tests-ex/cases/pom.xml +++ b/integration-tests-ex/cases/pom.xml @@ -350,7 +350,7 @@ integration-test - + False org.apache.druid.testsEx.categories.${it.category} From 686f00f74d4a4737a129f7b2d169989be5efdc72 Mon Sep 17 00:00:00 2001 From: Abhi Agrawal Date: Fri, 18 Nov 2022 23:19:02 +0530 Subject: [PATCH 09/10] fixing checkstyle errors --- .../org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java index 1a2236146de1..16d370cc87a3 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java @@ -28,8 +28,8 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import java.util.List; import java.util.Arrays; +import java.util.List; @RunWith(DruidTestRunner.class) @Category(MultiStageQuery.class) From 57ec9b687a011521d13a5770e14f95e25493ab23 Mon Sep 17 00:00:00 2001 From: Abhi Agrawal Date: Sat, 19 Nov 2022 11:12:34 +0530 Subject: [PATCH 10/10] Addressing comments --- ....java => AbstractITSQLBasedIngestion.java} | 33 +++++++++++-------- .../testsEx/msq/ITSQLBasedBatchIngestion.java | 7 ++--
.../testing/utils/MsqTestQueryHelper.java | 1 + 3 files changed, 25 insertions(+), 16 deletions(-) rename integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/{AbstractITSQLBasedBatchIngestion.java => AbstractITSQLBasedIngestion.java} (88%) diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java similarity index 88% rename from integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java rename to integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java index c2ce7ea204ec..4bb1cdc4783b 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedBatchIngestion.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/AbstractITSQLBasedIngestion.java @@ -34,7 +34,7 @@ import java.nio.charset.StandardCharsets; import java.util.Map; -public class AbstractITSQLBasedBatchIngestion +public class AbstractITSQLBasedIngestion { public static final Logger LOG = new Logger(TestQueryHelper.class); @Inject @@ -85,21 +85,11 @@ protected void doTestQuery(String queryFilePath, String dataSource) } /** - * Runs a MSQ ingest sql test. - * - * @param sqlFilePath path of file containing the sql query. - * @param queryFilePath path of file containing the native test queries to be run on the ingested datasource. - * @param datasource name of the datasource. %%DATASOURCE%% in the sql and queries will be replaced with this value. - * @param msqContext context parameters to be passed with MSQ API call. - + * Submits a sqlTask, waits for task completion and then runs test queries on ingested datasource.
*/ - protected void runMSQTaskandTestQueries(String sqlFilePath, String queryFilePath, String datasource, + protected void submitTaskAnddoTestQuery(String sqlTask, String queryFilePath, String datasource, Map msqContext) throws Exception { - LOG.info("Starting MSQ test for [%s]", sqlFilePath); - - String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource); - LOG.info("SqlTask - \n %s", sqlTask); // Submit the tasks and wait for the datasource to get loaded @@ -111,4 +101,21 @@ protected void runMSQTaskandTestQueries(String sqlFilePath, String queryFilePath dataLoaderHelper.waitUntilDatasourceIsReady(datasource); doTestQuery(queryFilePath, datasource); } + + /** + * Runs a MSQ ingest sql test. + * + * @param sqlFilePath path of file containing the sql query. + * @param queryFilePath path of file containing the native test queries to be run on the ingested datasource. + * @param datasource name of the datasource. %%DATASOURCE%% in the sql and queries will be replaced with this value. + * @param msqContext context parameters to be passed with MSQ API call. 
+ */ + protected void runMSQTaskandTestQueries(String sqlFilePath, String queryFilePath, String datasource, + Map msqContext) throws Exception + { + LOG.info("Starting MSQ test for [%s, %s]", sqlFilePath, queryFilePath); + + String sqlTask = getStringFromFileAndReplaceDatasource(sqlFilePath, datasource); + submitTaskAnddoTestQuery(sqlTask, queryFilePath, datasource, msqContext); + } } diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java index 16d370cc87a3..dbc26d7d4085 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITSQLBasedBatchIngestion.java @@ -33,7 +33,7 @@ @RunWith(DruidTestRunner.class) @Category(MultiStageQuery.class) -public class ITSQLBasedBatchIngestion extends AbstractITSQLBasedBatchIngestion +public class ITSQLBasedBatchIngestion extends AbstractITSQLBasedIngestion { private static final String BATCH_INDEX_TASKS_DIR = "/multi-stage-query/batch-index/"; @@ -59,12 +59,13 @@ public void testSQLBasedBatchIngestion(String sqlFileName, String queryFileName) BATCH_INDEX_TASKS_DIR + queryFileName, FilenameUtils.removeExtension(sqlFileName), ImmutableMap.of("finalizeAggregations", false, - "maxNumTasks", 10, + "maxNumTasks", 5, "groupByEnableMultiValueUnnesting", false )); } catch (Exception e) { - LOG.error(e, "Error while testing %s", sqlFileName); + LOG.error(e, "Error while testing [%s, %s]", sqlFileName, queryFileName); + throw new RuntimeException(e); } } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java index 580f00a58f2b..37647fde87d0 100644 --- 
a/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/MsqTestQueryHelper.java @@ -163,6 +163,7 @@ public TaskState pollTaskIdForCompletion(String taskId) throws Exception throw new TaskStillRunningException(); }, (Throwable t) -> t instanceof TaskStillRunningException, + 99, 100 ); }