From 6e3645863ed526337e6da96640ef1ff50647b2ad Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 12 Jul 2023 12:59:44 +0530 Subject: [PATCH 1/3] Fixing an issue in sequential merge where workers without any partial key statistics would get stuck because controller did not change the worker state. --- .../druid/msq/exec/WorkerSketchFetcher.java | 17 ++++++++++++++ .../msq/exec/WorkerSketchFetcherTest.java | 23 ++++++++----------- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java index c4ebfceac19a..86fc8cec1f58 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java @@ -39,6 +39,7 @@ import org.apache.druid.msq.statistics.CompleteKeyStatisticsInformation; import javax.annotation.Nullable; +import java.util.HashSet; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; @@ -262,11 +263,13 @@ public void sequentialTimeChunkMerging( return; } + final Set noBoundaries = new HashSet<>(tasks); completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().forEach((timeChunk, wks) -> { for (String taskId : tasks) { int workerNumber = MSQTasks.workerFromTaskId(taskId); if (wks.contains(workerNumber)) { + noBoundaries.remove(taskId); executorService.submit(() -> { fetchStatsFromWorker( kernelActions, @@ -285,10 +288,24 @@ public void sequentialTimeChunkMerging( ), retryOperation ); + }); } } }); + + // if the worker did not get any records, update the state of the worker + for (String taskId : noBoundaries) { + kernelActions.accept( + kernel -> { + final int workerNumber = MSQTasks.workerFromTaskId(taskId); + kernel.mergeClusterByStatisticsCollectorForAllTimeChunks( + stageId, + workerNumber, + ClusterByStatisticsSnapshot.empty() + ); + }); + } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java index bcfb692daf0b..592fd089ef4e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java @@ -113,8 +113,7 @@ public void test_submitFetcherTask_parallelFetch() throws InterruptedException latch.countDown(); }, stageDefinition.getId(), ImmutableSet.copyOf(TASK_IDS), ((queryKernel, integer, msqFault) -> {})); - latch.await(5, TimeUnit.SECONDS); - Assert.assertEquals(0, latch.getCount()); + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } @@ -122,7 +121,7 @@ public void test_submitFetcherTask_parallelFetch() throws InterruptedException public void test_submitFetcherTask_sequentialFetch() throws InterruptedException { doReturn(true).when(completeKeyStatisticsInformation).isComplete(); - final CountDownLatch latch = new CountDownLatch(TASK_IDS.size() - 1); + final CountDownLatch latch = new CountDownLatch(TASK_IDS.size()); target = spy(new WorkerSketchFetcher(workerClient, workerTaskLauncher, true)); @@ -143,8 +142,8 @@ public void test_submitFetcherTask_sequentialFetch() throws InterruptedException ((queryKernel, integer, msqFault) -> {}) ); - latch.await(5, TimeUnit.SECONDS); - Assert.assertEquals(0, latch.getCount()); + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } @Test @@ -186,17 +185,15 @@ public void test_inMemoryRetryEnabled_retryInvoked() throws InterruptedException }) ); - latch.await(5, TimeUnit.SECONDS); - retryLatch.await(5, TimeUnit.SECONDS); - Assert.assertEquals(0, latch.getCount()); - Assert.assertEquals(0, retryLatch.getCount()); + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(retryLatch.await(5, TimeUnit.SECONDS)); } @Test public void test_SequentialRetryEnabled_retryInvoked() throws InterruptedException { doReturn(true).when(completeKeyStatisticsInformation).isComplete(); - final CountDownLatch latch = new CountDownLatch(TASK_IDS.size() - 1); + final CountDownLatch latch = new CountDownLatch(TASK_IDS.size()); target = spy(new WorkerSketchFetcher(workerClient, workerTaskLauncher, true)); @@ -217,10 +214,8 @@ public void test_SequentialRetryEnabled_retryInvoked() throws InterruptedExcepti }) ); - latch.await(5, TimeUnit.SECONDS); - retryLatch.await(5, TimeUnit.SECONDS); - Assert.assertEquals(0, latch.getCount()); - Assert.assertEquals(0, retryLatch.getCount()); + Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(retryLatch.await(5, TimeUnit.SECONDS)); } @Test From 844ef9ccce31b9e07798005873811b7e718068a4 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 12 Jul 2023 18:18:01 +0530 Subject: [PATCH 2/3] Removing empty check --- .../druid/msq/exec/WorkerSketchFetcher.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java index 86fc8cec1f58..21b77ee1519d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java @@ -245,24 +245,6 @@ public void sequentialTimeChunkMerging( throw new ISE("All worker partial key information not received for stage[%d]", stageId.getStageNumber()); } - if (completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().isEmpty()) { - // No time chunks at all: skip fetching. - kernelActions.accept( - kernel -> { - for (final String taskId : tasks) { - final int workerNumber = MSQTasks.workerFromTaskId(taskId); - kernel.mergeClusterByStatisticsCollectorForAllTimeChunks( - stageId, - workerNumber, - ClusterByStatisticsSnapshot.empty() - ); - } - } - ); - - return; - } - final Set noBoundaries = new HashSet<>(tasks); completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().forEach((timeChunk, wks) -> { From 72c78814ac348480905d147d13a7e1b15dbcc761 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 12 Jul 2023 20:13:10 +0530 Subject: [PATCH 3/3] Adding IT for MSQ sequential bug fix. --- .../msq/ITKeyStatisticsSketchMergeMode.java | 75 +++++++++++++++++++ ...edia_msq_select_query_sequential_test.json | 15 ++++ 2 files changed, 90 insertions(+) create mode 100644 integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query_sequential_test.json diff --git a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java index bccde8409957..b0080158905e 100644 --- a/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java +++ b/integration-tests-ex/cases/src/test/java/org/apache/druid/testsEx/msq/ITKeyStatisticsSketchMergeMode.java @@ -190,4 +190,79 @@ public void testMsqIngestionSequentialMerging() throws Exception msqHelper.testQueriesFromFile(QUERY_FILE, datasource); } + + + @Test + public void testMsqIngestionSequentialMergingWithEmptyStatistics() throws Exception + { + String datasource = "dst"; + + // Clear up the datasource from the previous runs + coordinatorClient.unloadSegmentsForDataSource(datasource); + + String queryLocal = + StringUtils.format( + "Replace INTO %s overwrite ALL \n" + + "SELECT\n" + + " TIME_PARSE(\"timestamp\") AS __time,\n" + + " isRobot,\n" + + " diffUrl,\n" + + " added,\n" + + " countryIsoCode,\n" + + " regionName,\n" + + " channel,\n" + + " flags,\n" + + " delta,\n" + + " isUnpatrolled,\n" + + " isNew,\n" + + " deltaBucket,\n" + + " isMinor,\n" + + " isAnonymous,\n" + + " deleted,\n" + + " cityName,\n" + + " metroCode,\n" + + " namespace,\n" + + " comment,\n" + + " page,\n" + + " commentLength,\n" + + " countryName,\n" + + " user,\n" + + " regionIsoCode\n" + + "FROM TABLE(\n" + + " EXTERN(\n" + + " '{\"type\":\"local\",\"files\":[\"/resources/data/batch_index/json/wikipedia_index_data1.json\",\"/resources/data/batch_index/json/wikipedia_index_data2.json\"]}',\n" + + " '{\"type\":\"json\"}',\n" + + " '[{\"type\":\"string\",\"name\":\"timestamp\"},{\"type\":\"string\",\"name\":\"isRobot\"},{\"type\":\"string\",\"name\":\"diffUrl\"},{\"type\":\"long\",\"name\":\"added\"},{\"type\":\"string\",\"name\":\"countryIsoCode\"},{\"type\":\"string\",\"name\":\"regionName\"},{\"type\":\"string\",\"name\":\"channel\"},{\"type\":\"string\",\"name\":\"flags\"},{\"type\":\"long\",\"name\":\"delta\"},{\"type\":\"string\",\"name\":\"isUnpatrolled\"},{\"type\":\"string\",\"name\":\"isNew\"},{\"type\":\"double\",\"name\":\"deltaBucket\"},{\"type\":\"string\",\"name\":\"isMinor\"},{\"type\":\"string\",\"name\":\"isAnonymous\"},{\"type\":\"long\",\"name\":\"deleted\"},{\"type\":\"string\",\"name\":\"cityName\"},{\"type\":\"long\",\"name\":\"metroCode\"},{\"type\":\"string\",\"name\":\"namespace\"},{\"type\":\"string\",\"name\":\"comment\"},{\"type\":\"string\",\"name\":\"page\"},{\"type\":\"long\",\"name\":\"commentLength\"},{\"type\":\"string\",\"name\":\"countryName\"},{\"type\":\"string\",\"name\":\"user\"},{\"type\":\"string\",\"name\":\"regionIsoCode\"}]'\n" + + " )\n" + + ")\n" + + "where delta=111 " + // we add this filter since delta=111 is only present in wikipedia_index_data1.json. This means partitions from worker 2 will be empty. + + "PARTITIONED BY DAY\n" + + "CLUSTERED BY \"__time\"", + datasource + ); + + ImmutableMap context = ImmutableMap.of( + MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE, + ClusterStatisticsMergeMode.SEQUENTIAL, + MultiStageQueryContext.CTX_MAX_NUM_TASKS, + 3 + ); + + // Submit the task and wait for the datasource to get loaded + SqlQuery sqlQuery = new SqlQuery(queryLocal, null, false, false, false, context, null); + SqlTaskStatus sqlTaskStatus = msqHelper.submitMsqTask(sqlQuery); + + if (sqlTaskStatus.getState().isFailure()) { + Assert.fail(StringUtils.format( + "Unable to start the task successfully.\nPossible exception: %s", + sqlTaskStatus.getError() + )); + } + + msqHelper.pollTaskIdForSuccess(sqlTaskStatus.getTaskId()); + dataLoaderHelper.waitUntilDatasourceIsReady(datasource); + + msqHelper.testQueriesFromFile("/multi-stage-query/wikipedia_msq_select_query_sequential_test.json", datasource); + } } diff --git a/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query_sequential_test.json b/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query_sequential_test.json new file mode 100644 index 000000000000..c50ea09ad26a --- /dev/null +++ b/integration-tests-ex/cases/src/test/resources/multi-stage-query/wikipedia_msq_select_query_sequential_test.json @@ -0,0 +1,15 @@ +[ + { + "query": "SELECT __time, isRobot, added, delta, deleted, namespace FROM %%DATASOURCE%%", + "expectedResults": [ + { + "__time": 1377933081000, + "isRobot": "", + "added": 123, + "delta": 111, + "deleted": 12, + "namespace":"article" + } + ] + } +] \ No newline at end of file