From 5218caa902cc14038456709d3c541b5c6b2eab7e Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 30 Nov 2022 17:56:14 +0530 Subject: [PATCH 01/10] Add validation checks to worker chat handler apis --- .../org/apache/druid/msq/exec/WorkerImpl.java | 12 +++++++ .../druid/msq/indexing/WorkerChatHandler.java | 31 ++++++++++++++----- .../ClusterByStatisticsSnapshot.java | 4 +++ 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 49d6f9080d7c..2cf26ac3e3bf 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -571,12 +571,24 @@ public void postFinish() @Override public ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId) { + if (stageKernelMap.get(stageId) == null) { + throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId); + } + if (!WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(stageKernelMap.get(stageId).getPhase())) { + throw new ISE("Requested statistics snapshot in unexpected worker phase %s", stageKernelMap.get(stageId).getPhase()); + } return stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot(); } @Override public ClusterByStatisticsSnapshot fetchStatisticsSnapshotForTimeChunk(StageId stageId, long timeChunk) { + if (stageKernelMap.get(stageId) == null) { + throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId); + } + if (!WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(stageKernelMap.get(stageId).getPhase())) { + throw new ISE("Requested statistics snapshot in unexpected worker phase %s", stageKernelMap.get(stageId).getPhase()); + } ClusterByStatisticsSnapshot snapshot = stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot(); return snapshot.getSnapshotForTimeChunk(timeChunk); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java index dd6ea7cb7124..e951eaccecb5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java @@ -24,6 +24,7 @@ import org.apache.druid.frame.file.FrameFileHttpResponseHandler; import org.apache.druid.frame.key.ClusterByPartitions; import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.kernel.StageId; @@ -193,10 +194,17 @@ public Response httpFetchKeyStatistics( ChatHandlers.authorizationCheck(req, Action.READ, task.getDataSource(), toolbox.getAuthorizerMapper()); ClusterByStatisticsSnapshot clusterByStatisticsSnapshot; StageId stageId = new StageId(queryId, stageNumber); - clusterByStatisticsSnapshot = worker.fetchStatisticsSnapshot(stageId); - return Response.status(Response.Status.ACCEPTED) - .entity(clusterByStatisticsSnapshot) - .build(); + try { + clusterByStatisticsSnapshot = worker.fetchStatisticsSnapshot(stageId); + return Response.status(Response.Status.ACCEPTED) + .entity(clusterByStatisticsSnapshot) + .build(); + } + catch (ISE e) { + log.error(e, "Invalid request for key statistics"); + return Response.status(Response.Status.BAD_REQUEST) + .build(); + } } @POST @@ -213,10 +221,17 @@ public Response httpSketch( ChatHandlers.authorizationCheck(req, Action.READ, task.getDataSource(), toolbox.getAuthorizerMapper()); ClusterByStatisticsSnapshot snapshotForTimeChunk; StageId stageId = new StageId(queryId, stageNumber); - snapshotForTimeChunk = worker.fetchStatisticsSnapshotForTimeChunk(stageId, timeChunk); - return Response.status(Response.Status.ACCEPTED) - .entity(snapshotForTimeChunk) - .build(); + try { + snapshotForTimeChunk = worker.fetchStatisticsSnapshotForTimeChunk(stageId, timeChunk); + return Response.status(Response.Status.ACCEPTED) + .entity(snapshotForTimeChunk) + .build(); + } + catch (ISE e) { + log.error(e, "Invalid request for key statistics for time chunk"); + return Response.status(Response.Status.BAD_REQUEST) + .build(); + } } /** diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsSnapshot.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsSnapshot.java index e54253ad2186..16a4c1656b06 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsSnapshot.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/statistics/ClusterByStatisticsSnapshot.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import org.apache.druid.frame.key.RowKey; +import org.apache.druid.java.util.common.ISE; import javax.annotation.Nullable; import java.util.Collections; @@ -61,6 +62,9 @@ Map getBuckets() public ClusterByStatisticsSnapshot getSnapshotForTimeChunk(long timeChunk) { Bucket bucket = buckets.get(timeChunk); + if (bucket == null) { + throw new ISE("ClusterByStatistics not present for requested timechunk %s", timeChunk); + } return new ClusterByStatisticsSnapshot(ImmutableMap.of(timeChunk, bucket), null); } From 61608265f9b95b995fce9c9662b2bba8c535d1cb Mon Sep 17 00:00:00 2001 From: cryptoe Date: Fri, 9 Dec 2022 20:31:50 +0530 Subject: [PATCH 02/10] Merge things and polishing the error messages. --- .../druid/msq/indexing/WorkerChatHandler.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java index e951eaccecb5..000ae6b38bfc 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java @@ -19,12 +19,13 @@ package org.apache.druid.msq.indexing; +import com.google.common.collect.ImmutableMap; import it.unimi.dsi.fastutil.bytes.ByteArrays; import org.apache.commons.lang.mutable.MutableLong; import org.apache.druid.frame.file.FrameFileHttpResponseHandler; import org.apache.druid.frame.key.ClusterByPartitions; import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.exec.Worker; import org.apache.druid.msq.kernel.StageId; @@ -72,7 +73,7 @@ public WorkerChatHandler(TaskToolbox toolbox, Worker worker) /** * Returns up to {@link #CHANNEL_DATA_CHUNK_SIZE} bytes of stage output data. - * + *

* See {@link org.apache.druid.msq.exec.WorkerClient#fetchChannelData} for the client-side code that calls this API. */ @GET @@ -200,9 +201,15 @@ public Response httpFetchKeyStatistics( .entity(clusterByStatisticsSnapshot) .build(); } - catch (ISE e) { - log.error(e, "Invalid request for key statistics"); + catch (Exception e) { + String errorMessage = StringUtils.format( + "Invalid request for key statistics for query[%s] and stage[%d]", + queryId, + stageNumber + ); + log.error(e, errorMessage); return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", errorMessage)) .build(); } } @@ -227,9 +234,16 @@ public Response httpSketch( .entity(snapshotForTimeChunk) .build(); } - catch (ISE e) { - log.error(e, "Invalid request for key statistics for time chunk"); + catch (Exception e) { + String errorMessage = StringUtils.format( + "Invalid request for key statistics for query[%s] and stage[%d] and timeChunk[%d]", + queryId, + stageNumber, + timeChunk + ); + log.error(e, errorMessage); return Response.status(Response.Status.BAD_REQUEST) + .entity(ImmutableMap.of("error", errorMessage)) .build(); } } From c3b535849b27d0e38df23f7886a01fbba153412d Mon Sep 17 00:00:00 2001 From: cryptoe Date: Fri, 9 Dec 2022 20:33:06 +0530 Subject: [PATCH 03/10] Minor error message change --- .../java/org/apache/druid/msq/indexing/WorkerChatHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java index 000ae6b38bfc..e2dda9308611 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java @@ -236,7 +236,7 @@ public Response httpSketch( } catch (Exception e) { String errorMessage = StringUtils.format( - "Invalid request for key statistics for query[%s] and stage[%d] and timeChunk[%d]", + "Invalid request for key statistics for query[%s], stage[%d] and timeChunk[%d]", queryId, stageNumber, timeChunk From 1e15f19ff8017bdc6a4cc9782017706608a324e5 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Mon, 12 Dec 2022 18:31:33 +0530 Subject: [PATCH 04/10] Fixing race and adding some tests --- .../org/apache/druid/msq/exec/WorkerImpl.java | 62 ++++- .../druid/msq/indexing/WorkerChatHandler.java | 2 +- .../msq/indexing/WorkerChatHandlerTest.java | 254 ++++++++++++++++++ 3 files changed, 306 insertions(+), 12 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 2cf26ac3e3bf..555a2ff0ea95 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.SettableFuture; import it.unimi.dsi.fastutil.bytes.ByteArrays; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.frame.allocation.ArenaMemoryAllocator; import org.apache.druid.frame.channel.BlockingQueueFrameChannel; import org.apache.druid.frame.channel.ReadableFileFrameChannel; @@ -571,28 +572,67 @@ public void postFinish() @Override public ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId) { - if (stageKernelMap.get(stageId) == null) { - throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId); + SettableFuture statFuture = SettableFuture.create(); + kernelManipulationQueue.add(holder -> { + try { + if (stageKernelMap.get(stageId) == null) { + throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId); + } + if (!WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(stageKernelMap.get(stageId) + .getPhase())) { + throw new ISE( + "Requested statistics snapshot in unexpected worker phase %s", + stageKernelMap.get(stageId).getPhase() + ); + } + statFuture.set(stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot()); + } + catch (Exception e) { + statFuture.setException(e); + } + }); + + try { + return FutureUtils.get(statFuture, true); } - if (!WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(stageKernelMap.get(stageId).getPhase())) { - throw new ISE("Requested statistics snapshot in unexpected worker phase %s", stageKernelMap.get(stageId).getPhase()); + catch (Exception e) { + throw new RuntimeException(e); } - return stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot(); } @Override public ClusterByStatisticsSnapshot fetchStatisticsSnapshotForTimeChunk(StageId stageId, long timeChunk) { - if (stageKernelMap.get(stageId) == null) { - throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId); + SettableFuture statFuture = SettableFuture.create(); + kernelManipulationQueue.add(holder -> { + try { + if (stageKernelMap.get(stageId) == null) { + throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId); + } + if (!WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(stageKernelMap.get(stageId) + .getPhase())) { + throw new ISE( + "Requested statistics snapshot in unexpected worker phase %s", + stageKernelMap.get(stageId).getPhase() + ); + } + statFuture.set(stageKernelMap.get(stageId) + .getResultKeyStatisticsSnapshot() + .getSnapshotForTimeChunk(timeChunk)); + } + catch (Exception e) { + statFuture.setException(e); + } + }); + try { + return FutureUtils.get(statFuture, true); } - if (!WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(stageKernelMap.get(stageId).getPhase())) { - throw new ISE("Requested statistics snapshot in unexpected worker phase %s", stageKernelMap.get(stageId).getPhase()); + catch (Exception e) { + throw new RuntimeException(e); } - ClusterByStatisticsSnapshot snapshot = stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot(); - return snapshot.getSnapshotForTimeChunk(timeChunk); } + @Override public CounterSnapshotsTree getCounters() { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java index e2dda9308611..3eae3b05ccf4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerChatHandler.java @@ -218,7 +218,7 @@ public Response httpFetchKeyStatistics( @Path("/keyStatisticsForTimeChunk/{queryId}/{stageNumber}/{timeChunk}") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) - public Response httpSketch( + public Response httpFetchKeyStatisticsWithSnapshot( @PathParam("queryId") final String queryId, @PathParam("stageNumber") final int stageNumber, @PathParam("timeChunk") final long timeChunk, diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java new file mode 100644 index 000000000000..e83fb34a9ca3 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.indexing; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.frame.key.ClusterByPartitions; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.TaskReport; +import org.apache.druid.indexing.common.TaskReportFileWriter; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.msq.counters.CounterSnapshotsTree; +import org.apache.druid.msq.exec.Worker; +import org.apache.druid.msq.kernel.StageId; +import org.apache.druid.msq.kernel.WorkOrder; +import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.sql.calcite.util.CalciteTests; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +import javax.annotation.Nullable; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.Response; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +public class WorkerChatHandlerTest +{ + private static final StageId TEST_STAGE = new StageId("123", 0); + @Mock + private HttpServletRequest req; + + private TaskToolbox toolbox; + private AutoCloseable mocks; + + private final TestWorker worker = new TestWorker(); + + @Before + public void setUp() + { + ObjectMapper mapper = new DefaultObjectMapper(); + IndexIO indexIO = new IndexIO(mapper, () -> 0); + IndexMergerV9 indexMerger = new IndexMergerV9( + mapper, + indexIO, + OffHeapMemorySegmentWriteOutMediumFactory.instance() + ); + + mocks = MockitoAnnotations.openMocks(this); + Mockito.when(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)) + .thenReturn(new AuthenticationResult("druid", "druid", null, null)); + TaskToolbox.Builder builder = new TaskToolbox.Builder(); + toolbox = builder.authorizerMapper(CalciteTests.TEST_AUTHORIZER_MAPPER) + .indexIO(indexIO) + .indexMergerV9(indexMerger) + .taskReportFileWriter( + new TaskReportFileWriter() + { + @Override + public void write(String taskId, Map reports) + { + + } + + @Override + public void setObjectMapper(ObjectMapper objectMapper) + { + + } + } + ) + .build(); + } + + @Test + public void testFetchSnapshot() + { + WorkerChatHandler chatHandler = new WorkerChatHandler(toolbox, worker); + Assert.assertEquals( + ClusterByStatisticsSnapshot.empty(), + chatHandler.httpFetchKeyStatistics(TEST_STAGE.getQueryId(), TEST_STAGE.getStageNumber(), req) + .getEntity() + ); + } + + @Test + public void testFetchSnapshot404() + { + WorkerChatHandler chatHandler = new WorkerChatHandler(toolbox, worker); + Assert.assertEquals( + Response.Status.BAD_REQUEST.getStatusCode(), + chatHandler.httpFetchKeyStatistics("123", 2, req) + .getStatus() + ); + } + + @Test + public void testFetchSnapshotWithTimeChunk() + { + WorkerChatHandler chatHandler = new WorkerChatHandler(toolbox, worker); + Assert.assertEquals( + ClusterByStatisticsSnapshot.empty(), + chatHandler.httpFetchKeyStatisticsWithSnapshot(TEST_STAGE.getQueryId(), TEST_STAGE.getStageNumber(), 1, req) + .getEntity() + ); + } + + @Test + public void testFetchSnapshotWithTimeChunk404() + { + WorkerChatHandler chatHandler = new WorkerChatHandler(toolbox, worker); + Assert.assertEquals( + Response.Status.BAD_REQUEST.getStatusCode(), + chatHandler.httpFetchKeyStatisticsWithSnapshot("123", 2, 1, req) + .getStatus() + ); + } + + + private static class TestWorker implements Worker + { + + @Override + public String id() + { + return TEST_STAGE.getQueryId() + "task"; + } + + @Override + public MSQWorkerTask task() + { + return new MSQWorkerTask("controller", "ds", 1, new HashMap<>()); + } + + @Override + public TaskStatus run() throws Exception + { + return null; + } + + @Override + public void stopGracefully() + { + + } + + @Override + public void controllerFailed() + { + + } + + @Override + public void postWorkOrder(WorkOrder workOrder) + { + + } + + @Override + public ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId) + { + if (TEST_STAGE.equals(stageId)) { + return ClusterByStatisticsSnapshot.empty(); + } else { + throw new ISE("stage not found %s", stageId); + } + } + + @Override + public ClusterByStatisticsSnapshot fetchStatisticsSnapshotForTimeChunk(StageId stageId, long timeChunk) + { + if (TEST_STAGE.equals(stageId)) { + return ClusterByStatisticsSnapshot.empty(); + } else { + throw new ISE("stage not found %s", stageId); + } + } + + @Override + public boolean postResultPartitionBoundaries( + ClusterByPartitions stagePartitionBoundaries, + String queryId, + int stageNumber + ) + { + return false; + } + + @Nullable + @Override + public InputStream readChannel(String queryId, int stageNumber, int partitionNumber, long offset) + { + return null; + } + + @Override + public CounterSnapshotsTree getCounters() + { + return null; + } + + @Override + public void postCleanupStage(StageId stageId) + { + + } + + @Override + public void postFinish() + { + + } + } + + @After + public void tearDown() + { + try { + mocks.close(); + } + catch (Exception ignored) { + // ignore tear down exceptions + } + } +} From ba09c21a4e85914d80bba20858b8fbe0cac444fa Mon Sep 17 00:00:00 2001 From: cryptoe Date: Tue, 13 Dec 2022 16:58:53 +0530 Subject: [PATCH 05/10] Fixing controller fetching stats from wrong workers. Fixing race Changing default mode to Parallel Adding logging. Fixing exceptions not propagated properly. --- docs/multi-stage-query/reference.md | 2 +- .../apache/druid/msq/exec/ControllerImpl.java | 7 +- .../exec/ExceptionWrappingWorkerClient.java | 14 +++- .../org/apache/druid/msq/exec/WorkerImpl.java | 71 ++++++------------- .../druid/msq/exec/WorkerSketchFetcher.java | 37 ++++++++-- .../msq/kernel/worker/WorkerStageKernel.java | 3 +- .../msq/util/MultiStageQueryContext.java | 2 +- .../msq/exec/WorkerSketchFetcherTest.java | 1 + .../msq/indexing/WorkerChatHandlerTest.java | 2 +- 9 files changed, 75 insertions(+), 64 deletions(-) diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md index 8ea9adf61ae6..5016b6ab4849 100644 --- a/docs/multi-stage-query/reference.md +++ b/docs/multi-stage-query/reference.md @@ -325,7 +325,7 @@ The following table lists the context parameters for the MSQ task engine: | `maxParseExceptions`| SELECT, INSERT, REPLACE

Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1.| 0 | | `rowsPerSegment` | INSERT or REPLACE

The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 | | `indexSpec` | INSERT or REPLACE

An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). | -| `clusterStatisticsMergeMode` | Whether to use parallel or sequential mode for merging of the worker sketches. Can be `PARALLEL`, `SEQUENTIAL` or `AUTO`. See [Sketch Merging Mode](#sketch-merging-mode) for more information. | `AUTO` | +| `clusterStatisticsMergeMode` | Whether to use parallel or sequential mode for merging of the worker sketches. Can be `PARALLEL`, `SEQUENTIAL` or `AUTO`. See [Sketch Merging Mode](#sketch-merging-mode) for more information. | `PARALLEL` | ## Sketch Merging Mode This section details the advantages and performance of various Cluster By Statistics Merge Modes. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 318c33a759c7..3ebab8b6fe0d 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -631,7 +631,12 @@ public void updatePartialKeyStatisticsInformation(int stageNumber, int workerNum clusterByPartitionsCompletableFuture.whenComplete((clusterByPartitionsEither, throwable) -> { addToKernelManipulationQueue(holder -> { if (throwable != null) { - holder.failStageForReason(stageId, UnknownFault.forException(throwable)); + log.error("Error while fetching stats for stageId[%s]", stageId); + if (throwable instanceof MSQException) { + holder.failStageForReason(stageId, ((MSQException) throwable).getFault()); + } else { + holder.failStageForReason(stageId, UnknownFault.forException(throwable)); + } } else if (clusterByPartitionsEither.isError()) { holder.failStageForReason(stageId, new TooManyPartitionsFault(stageDef.getMaxPartitionCount())); } else { diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExceptionWrappingWorkerClient.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExceptionWrappingWorkerClient.java index 3d78b7c9ced4..eb6b1af529e5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExceptionWrappingWorkerClient.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ExceptionWrappingWorkerClient.java @@ -57,9 +57,13 @@ public ListenableFuture postWorkOrder(String workerTaskId, WorkOrder workO } @Override - public ListenableFuture fetchClusterByStatisticsSnapshot(String workerTaskId, String queryId, int stageNumber) + public ListenableFuture fetchClusterByStatisticsSnapshot( + String workerTaskId, + String queryId, + int stageNumber + ) { - return client.fetchClusterByStatisticsSnapshot(workerTaskId, queryId, stageNumber); + return wrap(workerTaskId, client, c -> c.fetchClusterByStatisticsSnapshot(workerTaskId, queryId, stageNumber)); } @Override @@ -70,7 +74,11 @@ public ListenableFuture fetchClusterByStatisticsSna long timeChunk ) { - return client.fetchClusterByStatisticsSnapshotForTimeChunk(workerTaskId, queryId, stageNumber, timeChunk); + return wrap( + workerTaskId, + client, + c -> c.fetchClusterByStatisticsSnapshotForTimeChunk(workerTaskId, queryId, stageNumber, timeChunk) + ); } @Override diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 555a2ff0ea95..c560d690e901 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -30,7 +30,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.SettableFuture; import it.unimi.dsi.fastutil.bytes.ByteArrays; -import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.frame.allocation.ArenaMemoryAllocator; import org.apache.druid.frame.channel.BlockingQueueFrameChannel; import org.apache.druid.frame.channel.ReadableFileFrameChannel; @@ -572,64 +571,34 @@ public void postFinish() @Override public ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId) { - SettableFuture statFuture = SettableFuture.create(); - kernelManipulationQueue.add(holder -> { - try { - if (stageKernelMap.get(stageId) == null) { - throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId); - } - if (!WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(stageKernelMap.get(stageId) - .getPhase())) { - throw new ISE( - "Requested statistics snapshot in unexpected worker phase %s", - stageKernelMap.get(stageId).getPhase() - ); - } - statFuture.set(stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot()); - } - catch (Exception e) { - statFuture.setException(e); - } - }); - - try { - return FutureUtils.get(statFuture, true); + if (stageKernelMap.get(stageId) == null) { + throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId); } - catch (Exception e) { - throw new RuntimeException(e); + if (stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot() == null) { + throw new ISE( + "Requested statistics snapshot is not generated yet for stageId[%s]", + stageId + ); } + return stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot(); } @Override public ClusterByStatisticsSnapshot fetchStatisticsSnapshotForTimeChunk(StageId stageId, long timeChunk) { - SettableFuture statFuture = SettableFuture.create(); - kernelManipulationQueue.add(holder -> { - try { - if (stageKernelMap.get(stageId) == null) { - throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId); - } - if (!WorkerStagePhase.PRESHUFFLE_WAITING_FOR_RESULT_PARTITION_BOUNDARIES.equals(stageKernelMap.get(stageId) - .getPhase())) { - throw new ISE( - "Requested statistics snapshot in unexpected worker phase %s", - stageKernelMap.get(stageId).getPhase() - ); - } - statFuture.set(stageKernelMap.get(stageId) - .getResultKeyStatisticsSnapshot() - .getSnapshotForTimeChunk(timeChunk)); - } - catch (Exception e) { - statFuture.setException(e); - } - }); - try { - return FutureUtils.get(statFuture, true); + if (stageKernelMap.get(stageId) == null) { + throw new ISE("Requested statistics snapshot for non-existent stageId[%s].", stageId); } - catch (Exception e) { - throw new RuntimeException(e); + if (stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot() == null) { + throw new ISE( + "Requested statistics snapshot is not generated yet for stageId[%s]", + stageId + ); } + return stageKernelMap.get(stageId) + .getResultKeyStatisticsSnapshot() + .getSnapshotForTimeChunk(timeChunk); + } @@ -695,7 +664,7 @@ private OutputChannelFactory makeStageOutputChannelFactory(final FrameContext fr /** * Decorates the server-wide {@link QueryProcessingPool} such that any Callables and Runnables, not just * {@link PrioritizedCallable} and {@link PrioritizedRunnable}, may be added to it. - * + *

* In production, the underlying {@link QueryProcessingPool} pool is set up by * {@link org.apache.druid.guice.DruidProcessingModule}. */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java index dc6f21990587..fdbdcd71cfd3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java @@ -90,14 +90,27 @@ public CompletableFuture> submitFetcherTask( return inMemoryFullSketchMerging(stageDefinition, workerTaskIds); case AUTO: if (clusterBy.getBucketByCount() == 0) { - log.info("Query [%s] AUTO mode: chose PARALLEL mode to merge key statistics", stageDefinition.getId().getQueryId()); + log.info( + "Query[%s] stage[%d] for AUTO mode: chose PARALLEL mode to merge key statistics", + stageDefinition.getId().getQueryId(), + stageDefinition.getStageNumber() + ); // If there is no time clustering, there is no scope for sequential merge return inMemoryFullSketchMerging(stageDefinition, workerTaskIds); - } else if (stageDefinition.getMaxWorkerCount() > WORKER_THRESHOLD || completeKeyStatisticsInformation.getBytesRetained() > BYTES_THRESHOLD) { - log.info("Query [%s] AUTO mode: chose SEQUENTIAL mode to merge key statistics", stageDefinition.getId().getQueryId()); + } else if (stageDefinition.getMaxWorkerCount() > WORKER_THRESHOLD + || completeKeyStatisticsInformation.getBytesRetained() > BYTES_THRESHOLD) { + log.info( + "Query[%s] stage[%d] for AUTO mode: chose SEQUENTIAL mode to merge key statistics", + stageDefinition.getId().getQueryId(), + stageDefinition.getStageNumber() + ); return sequentialTimeChunkMerging(completeKeyStatisticsInformation, stageDefinition, workerTaskIds); } - log.info("Query [%s] AUTO mode: chose PARALLEL mode to merge key statistics", stageDefinition.getId().getQueryId()); + log.info( + "Query[%s] stage[%d] for AUTO mode: chose PARALLEL mode to merge key statistics", + stageDefinition.getId().getQueryId(), + stageDefinition.getStageNumber() + ); return inMemoryFullSketchMerging(stageDefinition, workerTaskIds); default: throw new IllegalStateException("No fetching strategy found for mode: " + clusterStatisticsMergeMode); @@ -123,8 +136,15 @@ CompletableFuture> inMemoryFullSketchMerging( // Guarded by synchronized mergedStatisticsCollector final Set finishedWorkers = new HashSet<>(); + log.info( + "Fetching stats using %s for stage[%d] for tasks[%s]", + ClusterStatisticsMergeMode.PARALLEL, + stageDefinition.getStageNumber(), + String.join("", workerTaskIds) + ); + // Submit a task for each worker to fetch statistics - IntStream.range(0, workerCount).forEach(workerNo -> { + IntStream.range(0, stageDefinition.getMaxWorkerCount()).forEach(workerNo -> { executorService.submit(() -> { ListenableFuture snapshotFuture = workerClient.fetchClusterByStatisticsSnapshot( @@ -177,6 +197,13 @@ CompletableFuture> sequentialTimeChunkMerging( workerTaskIds, completeKeyStatisticsInformation.getTimeSegmentVsWorkerMap().entrySet().iterator() ); + + log.info( + "Fetching stats using %s for stage[%d] for tasks[%s]", + ClusterStatisticsMergeMode.SEQUENTIAL, + stageDefinition.getStageNumber(), + String.join("", workerTaskIds) + ); sequentialFetchStage.submitFetchingTasksForNextTimeChunk(); return sequentialFetchStage.getPartitionFuture(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java index b0ed8e5c19da..00a49656be48 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java @@ -48,8 +48,9 @@ public class WorkerStageKernel private WorkerStagePhase phase = WorkerStagePhase.NEW; + // We read this variable in the main thread and the netty threads @Nullable - private ClusterByStatisticsSnapshot resultKeyStatisticsSnapshot; + private volatile ClusterByStatisticsSnapshot resultKeyStatisticsSnapshot; @Nullable private ClusterByPartitions resultPartitionBoundaries; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java index 7c589f2326f1..3dc622870f43 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java @@ -60,7 +60,7 @@ public class MultiStageQueryContext public static final String CTX_ENABLE_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage"; public static final String CTX_CLUSTER_STATISTICS_MERGE_MODE = "clusterStatisticsMergeMode"; - public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.AUTO.toString(); + public static final String DEFAULT_CLUSTER_STATISTICS_MERGE_MODE = ClusterStatisticsMergeMode.PARALLEL.toString(); private static final boolean DEFAULT_ENABLE_DURABLE_SHUFFLE_STORAGE = false; public static final String CTX_DESTINATION = "destination"; diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java index 83fb73043bd9..489d72106fc5 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java @@ -117,6 +117,7 @@ public void test_submitFetcherTask_parallelFetch_mergePerformedCorrectly() final Queue snapshotQueue = new ConcurrentLinkedQueue<>(); final List workerIds = ImmutableList.of("0", "1", "2", "3", "4"); final CountDownLatch latch = new CountDownLatch(workerIds.size()); + doReturn(workerIds.size()).when(stageDefinition).getMaxWorkerCount(); target = spy(new WorkerSketchFetcher(workerClient, ClusterStatisticsMergeMode.PARALLEL, 300_000_000)); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java index e83fb34a9ca3..5b9d6e497aa0 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/WorkerChatHandlerTest.java @@ -162,7 +162,7 @@ public MSQWorkerTask task() } @Override - public TaskStatus run() throws Exception + public TaskStatus run() { return null; } From 47d0d37643c50ff58d27c156d0114f314f2cc168 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Tue, 13 Dec 2022 18:22:45 +0530 Subject: [PATCH 06/10] Changing to kernel worker count --- .../java/org/apache/druid/msq/exec/ControllerImpl.java | 4 +++- .../org/apache/druid/msq/exec/WorkerSketchFetcher.java | 2 +- .../msq/kernel/controller/ControllerQueryKernel.java | 8 ++++++++ .../msq/kernel/controller/ControllerStageTracker.java | 8 ++++++++ 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 3ebab8b6fe0d..81dff3dacc5e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -615,7 +615,9 @@ public void updatePartialKeyStatisticsInformation(int stageNumber, int workerNum queryKernel.addPartialKeyStatisticsForStageAndWorker(stageId, workerNumber, partialKeyStatisticsInformation); if (queryKernel.getStagePhase(stageId).equals(ControllerStagePhase.MERGING_STATISTICS)) { - List workerTaskIds = workerTaskLauncher.getTaskList(); + // we only need tasks which are active for this stage. + List workerTaskIds = workerTaskLauncher.getTaskList() + .subList(0, queryKernel.getWorkerCountForStage(stageId)); CompleteKeyStatisticsInformation completeKeyStatisticsInformation = queryKernel.getCompleteKeyStatisticsInformation(stageId); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java index fdbdcd71cfd3..534ed8a7d567 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java @@ -144,7 +144,7 @@ CompletableFuture> inMemoryFullSketchMerging( ); // Submit a task for each worker to fetch statistics - IntStream.range(0, stageDefinition.getMaxWorkerCount()).forEach(workerNo -> { + IntStream.range(0, workerCount).forEach(workerNo -> { executorService.submit(() -> { ListenableFuture snapshotFuture = workerClient.fetchClusterByStatisticsSnapshot( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java index d2e9f198f04b..97bfca8b721a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java @@ -444,6 +444,14 @@ public void setResultsCompleteForStageAndWorker( } } + /** + * Delegates call to {@link ControllerStageTracker#getWorkerCount()} + */ + public int getWorkerCountForStage(final StageId stageId) + { + return getStageKernelOrThrow(stageId).getWorkerCount(); + } + /** * Delegates call to {@link ControllerStageTracker#getFailureReason()} */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java index bf53239335c5..9f13196d651a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java @@ -421,6 +421,14 @@ private void generateResultPartitionsAndBoundariesWithoutKeyStatistics() } } + /** + * Returns the worker count for this stage. + */ + public int getWorkerCount() + { + return workerCount; + } + /** * Marks the stage as failed and sets the reason for the same. * From a2939599d9b15391ec294a095e14b9ba257c0ad3 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Tue, 13 Dec 2022 22:03:23 +0530 Subject: [PATCH 07/10] Added a better logic to figure out assigned worker for a stage. --- .../apache/druid/msq/exec/ControllerImpl.java | 11 ++++-- .../druid/msq/exec/WorkerSketchFetcher.java | 15 ++++---- .../controller/ControllerQueryKernel.java | 8 ---- .../controller/ControllerStageTracker.java | 8 ---- .../exec/WorkerSketchFetcherAutoModeTest.java | 38 ++++++++++++++----- .../msq/exec/WorkerSketchFetcherTest.java | 8 ++-- 6 files changed, 50 insertions(+), 38 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 81dff3dacc5e..cecd9faf562c 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -263,6 +263,10 @@ public class ControllerImpl implements Controller // For live reports. Written by the main controller thread, read by HTTP threads. private final ConcurrentHashMap stagePartitionCountsForLiveReports = new ConcurrentHashMap<>(); + // Stage number -> set of immutable workers assigned for that stage + // Always accessed by the main controller thread. + private final Map> stageToWorkers = new HashMap<>(); + private WorkerSketchFetcher workerSketchFetcher; // Time at which the query started. // For live reports. Written by the main controller thread, read by HTTP threads. @@ -616,8 +620,7 @@ public void updatePartialKeyStatisticsInformation(int stageNumber, int workerNum if (queryKernel.getStagePhase(stageId).equals(ControllerStagePhase.MERGING_STATISTICS)) { // we only need tasks which are active for this stage. - List workerTaskIds = workerTaskLauncher.getTaskList() - .subList(0, queryKernel.getWorkerCountForStage(stageId)); + List workerTaskIds = workerTaskLauncher.getTaskList(); CompleteKeyStatisticsInformation completeKeyStatisticsInformation = queryKernel.getCompleteKeyStatisticsInformation(stageId); @@ -626,7 +629,8 @@ public void updatePartialKeyStatisticsInformation(int stageNumber, int workerNum workerSketchFetcher.submitFetcherTask( completeKeyStatisticsInformation, workerTaskIds, - stageDef + stageDef, + stageToWorkers.get(stageNumber) ); // Add the listener to handle completion. @@ -1067,6 +1071,7 @@ private void startWorkForStage( ); final Int2ObjectMap workOrders = queryKernel.createWorkOrders(stageNumber, extraInfos); + stageToWorkers.put(stageNumber, ImmutableSet.copyOf(workOrders.keySet())); contactWorkersForStage( (netClient, taskId, workerNumber) -> netClient.postWorkOrder(taskId, workOrders.get(workerNumber)), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java index 534ed8a7d567..9f79c8f93562 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java @@ -40,7 +40,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.stream.IntStream; /** * Queues up fetching sketches from workers and progressively generates partitions boundaries. @@ -78,7 +77,8 @@ public WorkerSketchFetcher( public CompletableFuture> submitFetcherTask( CompleteKeyStatisticsInformation completeKeyStatisticsInformation, List workerTaskIds, - StageDefinition stageDefinition + StageDefinition stageDefinition, + Set workersForStage ) { ClusterBy clusterBy = stageDefinition.getClusterBy(); @@ -87,7 +87,7 @@ public CompletableFuture> submitFetcherTask( case SEQUENTIAL: return sequentialTimeChunkMerging(completeKeyStatisticsInformation, stageDefinition, workerTaskIds); case PARALLEL: - return inMemoryFullSketchMerging(stageDefinition, workerTaskIds); + return inMemoryFullSketchMerging(stageDefinition, workerTaskIds, workersForStage); case AUTO: if (clusterBy.getBucketByCount() == 0) { log.info( @@ -96,7 +96,7 @@ public CompletableFuture> submitFetcherTask( stageDefinition.getStageNumber() ); // If there is no time clustering, there is no scope for sequential merge - return inMemoryFullSketchMerging(stageDefinition, workerTaskIds); + return inMemoryFullSketchMerging(stageDefinition, workerTaskIds, workersForStage); } else if (stageDefinition.getMaxWorkerCount() > WORKER_THRESHOLD || completeKeyStatisticsInformation.getBytesRetained() > BYTES_THRESHOLD) { log.info( @@ -111,7 +111,7 @@ public CompletableFuture> submitFetcherTask( stageDefinition.getId().getQueryId(), stageDefinition.getStageNumber() ); - return inMemoryFullSketchMerging(stageDefinition, workerTaskIds); + return inMemoryFullSketchMerging(stageDefinition, workerTaskIds, workersForStage); default: throw new IllegalStateException("No fetching strategy found for mode: " + clusterStatisticsMergeMode); } @@ -124,7 +124,8 @@ public CompletableFuture> submitFetcherTask( */ CompletableFuture> inMemoryFullSketchMerging( StageDefinition stageDefinition, - List workerTaskIds + List workerTaskIds, + Set workersForStage ) { CompletableFuture> partitionFuture = new CompletableFuture<>(); @@ -144,7 +145,7 @@ CompletableFuture> inMemoryFullSketchMerging( ); // Submit a task for each worker to fetch statistics - IntStream.range(0, workerCount).forEach(workerNo -> { + workersForStage.forEach(workerNo -> { executorService.submit(() -> { ListenableFuture snapshotFuture = workerClient.fetchClusterByStatisticsSnapshot( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java index 97bfca8b721a..d2e9f198f04b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerQueryKernel.java @@ -444,14 +444,6 @@ public void setResultsCompleteForStageAndWorker( } } - /** - * Delegates call to {@link ControllerStageTracker#getWorkerCount()} - */ - public int getWorkerCountForStage(final StageId stageId) - { - return getStageKernelOrThrow(stageId).getWorkerCount(); - } - /** * Delegates call to {@link ControllerStageTracker#getFailureReason()} */ diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java index 9f13196d651a..bf53239335c5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/controller/ControllerStageTracker.java @@ -421,14 +421,6 @@ private void generateResultPartitionsAndBoundariesWithoutKeyStatistics() } } - /** - * Returns the worker count for this stage. - */ - public int getWorkerCount() - { - return workerCount; - } - /** * Marks the stage as failed and sets the reason for the same. * diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherAutoModeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherAutoModeTest.java index 42f6f0437f59..1cdf6b49f4d6 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherAutoModeTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherAutoModeTest.java @@ -56,7 +56,7 @@ public void setUp() target = spy(new WorkerSketchFetcher(mock(WorkerClient.class), ClusterStatisticsMergeMode.AUTO, 300_000_000)); // Don't actually try to fetch sketches - doReturn(null).when(target).inMemoryFullSketchMerging(any(), any()); + doReturn(null).when(target).inMemoryFullSketchMerging(any(), any(), any()); doReturn(null).when(target).sequentialTimeChunkMerging(any(), any(), any()); doReturn(StageId.fromString("1_1")).when(stageDefinition).getId(); @@ -81,8 +81,13 @@ public void test_submitFetcherTask_belowThresholds_ShouldBeParallel() // Worker count below threshold doReturn(1).when(stageDefinition).getMaxWorkerCount(); - target.submitFetcherTask(completeKeyStatisticsInformation, Collections.emptyList(), stageDefinition); - verify(target, times(1)).inMemoryFullSketchMerging(any(), any()); + target.submitFetcherTask( + completeKeyStatisticsInformation, + Collections.emptyList(), + stageDefinition, + Collections.emptySet() + ); + verify(target, times(1)).inMemoryFullSketchMerging(any(), any(), any()); verify(target, times(0)).sequentialTimeChunkMerging(any(), any(), any()); } @@ -98,8 +103,13 @@ public void test_submitFetcherTask_workerCountAboveThreshold_shouldBeSequential( // Worker count below threshold doReturn((int) WorkerSketchFetcher.WORKER_THRESHOLD + 1).when(stageDefinition).getMaxWorkerCount(); - target.submitFetcherTask(completeKeyStatisticsInformation, Collections.emptyList(), stageDefinition); - verify(target, times(0)).inMemoryFullSketchMerging(any(), any()); + target.submitFetcherTask( + completeKeyStatisticsInformation, + Collections.emptyList(), + stageDefinition, + Collections.emptySet() + ); + verify(target, times(0)).inMemoryFullSketchMerging(any(), any(), any()); verify(target, times(1)).sequentialTimeChunkMerging(any(), any(), any()); } @@ -115,8 +125,13 @@ public void test_submitFetcherTask_noClusterByColumns_shouldBeParallel() // Worker count above threshold doReturn((int) WorkerSketchFetcher.WORKER_THRESHOLD + 1).when(stageDefinition).getMaxWorkerCount(); - target.submitFetcherTask(completeKeyStatisticsInformation, Collections.emptyList(), stageDefinition); - verify(target, times(1)).inMemoryFullSketchMerging(any(), any()); + target.submitFetcherTask( + completeKeyStatisticsInformation, + Collections.emptyList(), + stageDefinition, + Collections.emptySet() + ); + verify(target, times(1)).inMemoryFullSketchMerging(any(), any(), any()); verify(target, times(0)).sequentialTimeChunkMerging(any(), any(), any()); } @@ -132,8 +147,13 @@ public void test_submitFetcherTask_bytesRetainedAboveThreshold_shouldBeSequentia // Worker count below threshold doReturn(1).when(stageDefinition).getMaxWorkerCount(); - target.submitFetcherTask(completeKeyStatisticsInformation, Collections.emptyList(), stageDefinition); - verify(target, times(0)).inMemoryFullSketchMerging(any(), any()); + target.submitFetcherTask( + completeKeyStatisticsInformation, + Collections.emptyList(), + stageDefinition, + Collections.emptySet() + ); + verify(target, times(0)).inMemoryFullSketchMerging(any(), any(), any()); verify(target, times(1)).sequentialTimeChunkMerging(any(), any(), any()); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java index 489d72106fc5..39767ba6fbf8 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java @@ -49,6 +49,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import static org.easymock.EasyMock.mock; import static org.mockito.ArgumentMatchers.any; @@ -117,7 +118,6 @@ public void test_submitFetcherTask_parallelFetch_mergePerformedCorrectly() final Queue snapshotQueue = new ConcurrentLinkedQueue<>(); final List workerIds = ImmutableList.of("0", "1", "2", "3", "4"); final CountDownLatch latch = new CountDownLatch(workerIds.size()); - doReturn(workerIds.size()).when(stageDefinition).getMaxWorkerCount(); target = spy(new WorkerSketchFetcher(workerClient, ClusterStatisticsMergeMode.PARALLEL, 300_000_000)); @@ -132,7 +132,8 @@ public void test_submitFetcherTask_parallelFetch_mergePerformedCorrectly() CompletableFuture> eitherCompletableFuture = target.submitFetcherTask( completeKeyStatisticsInformation, workerIds, - stageDefinition + stageDefinition, + workerIds.stream().map(Integer::parseInt).collect(Collectors.toSet()) ); // Assert that the final result is complete and all other sketches returned have been merged. @@ -172,7 +173,8 @@ public void test_submitFetcherTask_sequentialFetch_mergePerformedCorrectly() CompletableFuture> eitherCompletableFuture = target.submitFetcherTask( completeKeyStatisticsInformation, ImmutableList.of("0", "1", "2", "3", "4"), - stageDefinition + stageDefinition, + ImmutableSet.of() ); // Assert that the final result is complete and all other sketches returned have been merged. From 0bdb40fde95e35a4c59a66eb0865d0b035b84efd Mon Sep 17 00:00:00 2001 From: cryptoe Date: Tue, 13 Dec 2022 22:14:04 +0530 Subject: [PATCH 08/10] Nits --- .../apache/druid/msq/exec/ControllerImpl.java | 3 +-- .../org/apache/druid/msq/exec/WorkerImpl.java | 16 ++++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index cecd9faf562c..0421b203b2f0 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -619,7 +619,6 @@ public void updatePartialKeyStatisticsInformation(int stageNumber, int workerNum queryKernel.addPartialKeyStatisticsForStageAndWorker(stageId, workerNumber, partialKeyStatisticsInformation); if (queryKernel.getStagePhase(stageId).equals(ControllerStagePhase.MERGING_STATISTICS)) { - // we only need tasks which are active for this stage. List workerTaskIds = workerTaskLauncher.getTaskList(); CompleteKeyStatisticsInformation completeKeyStatisticsInformation = queryKernel.getCompleteKeyStatisticsInformation(stageId); @@ -630,7 +629,7 @@ public void updatePartialKeyStatisticsInformation(int stageNumber, int workerNum completeKeyStatisticsInformation, workerTaskIds, stageDef, - stageToWorkers.get(stageNumber) + stageToWorkers.get(stageNumber) // we only need tasks which are active for this stage. ); // Add the listener to handle completion. diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index c560d690e901..8c5a782f5368 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -573,14 +573,14 @@ public ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId) { if (stageKernelMap.get(stageId) == null) { throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId); - } - if (stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot() == null) { + } else if (stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot() == null) { throw new ISE( "Requested statistics snapshot is not generated yet for stageId[%s]", stageId ); + } else { + return stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot(); } - return stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot(); } @Override @@ -588,16 +588,16 @@ public ClusterByStatisticsSnapshot fetchStatisticsSnapshotForTimeChunk(StageId s { if (stageKernelMap.get(stageId) == null) { throw new ISE("Requested statistics snapshot for non-existent stageId[%s].", stageId); - } - if (stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot() == null) { + } else if (stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot() == null) { throw new ISE( "Requested statistics snapshot is not generated yet for stageId[%s]", stageId ); + } else { + return stageKernelMap.get(stageId) + .getResultKeyStatisticsSnapshot() + .getSnapshotForTimeChunk(timeChunk); } - return stageKernelMap.get(stageId) - .getResultKeyStatisticsSnapshot() - .getSnapshotForTimeChunk(timeChunk); } From 989eccc3ca01d4b6d20b3de6589055d2a9fd1309 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 14 Dec 2022 12:02:31 +0530 Subject: [PATCH 09/10] Moving to existing kernel methods --- .../apache/druid/msq/exec/ControllerImpl.java | 7 ++-- .../druid/msq/exec/WorkerSketchFetcher.java | 12 ++++--- .../exec/WorkerSketchFetcherAutoModeTest.java | 9 ++--- .../msq/exec/WorkerSketchFetcherTest.java | 36 ++++++++++++++----- 4 files changed, 42 insertions(+), 22 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java index 0421b203b2f0..96df5ea5b05a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java @@ -263,9 +263,6 @@ public class ControllerImpl implements Controller // For live reports. Written by the main controller thread, read by HTTP threads. private final ConcurrentHashMap stagePartitionCountsForLiveReports = new ConcurrentHashMap<>(); - // Stage number -> set of immutable workers assigned for that stage - // Always accessed by the main controller thread. - private final Map> stageToWorkers = new HashMap<>(); private WorkerSketchFetcher workerSketchFetcher; // Time at which the query started. @@ -629,7 +626,8 @@ public void updatePartialKeyStatisticsInformation(int stageNumber, int workerNum completeKeyStatisticsInformation, workerTaskIds, stageDef, - stageToWorkers.get(stageNumber) // we only need tasks which are active for this stage. + queryKernel.getWorkerInputsForStage(stageId).workers() + // we only need tasks which are active for this stage. ); // Add the listener to handle completion. @@ -1070,7 +1068,6 @@ private void startWorkForStage( ); final Int2ObjectMap workOrders = queryKernel.createWorkOrders(stageNumber, extraInfos); - stageToWorkers.put(stageNumber, ImmutableSet.copyOf(workOrders.keySet())); contactWorkersForStage( (netClient, taskId, workerNumber) -> netClient.postWorkOrder(taskId, workOrders.get(workerNumber)), diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java index 9f79c8f93562..2eba0c409d28 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerSketchFetcher.java @@ -20,6 +20,7 @@ package org.apache.druid.msq.exec; import com.google.common.util.concurrent.ListenableFuture; +import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.ClusterByPartition; import org.apache.druid.frame.key.ClusterByPartitions; @@ -40,6 +41,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; /** * Queues up fetching sketches from workers and progressively generates partitions boundaries. @@ -78,7 +80,7 @@ public CompletableFuture> submitFetcherTask( CompleteKeyStatisticsInformation completeKeyStatisticsInformation, List workerTaskIds, StageDefinition stageDefinition, - Set workersForStage + IntSet workersForStage ) { ClusterBy clusterBy = stageDefinition.getClusterBy(); @@ -125,7 +127,7 @@ public CompletableFuture> submitFetcherTask( CompletableFuture> inMemoryFullSketchMerging( StageDefinition stageDefinition, List workerTaskIds, - Set workersForStage + IntSet workersForStage ) { CompletableFuture> partitionFuture = new CompletableFuture<>(); @@ -133,15 +135,15 @@ CompletableFuture> inMemoryFullSketchMerging( // Create a new key statistics collector to merge worker sketches into final ClusterByStatisticsCollector mergedStatisticsCollector = stageDefinition.createResultKeyStatisticsCollector(statisticsMaxRetainedBytes); - final int workerCount = workerTaskIds.size(); + final int workerCount = workersForStage.size(); // Guarded by synchronized mergedStatisticsCollector final Set finishedWorkers = new HashSet<>(); log.info( - "Fetching stats using %s for stage[%d] for tasks[%s]", + "Fetching stats using %s for stage[%d] for workers[%s] ", ClusterStatisticsMergeMode.PARALLEL, stageDefinition.getStageNumber(), - String.join("", workerTaskIds) + workersForStage.stream().map(Object::toString).collect(Collectors.joining(",")) ); // Submit a task for each worker to fetch statistics diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherAutoModeTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherAutoModeTest.java index 1cdf6b49f4d6..02be2876f98b 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherAutoModeTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherAutoModeTest.java @@ -19,6 +19,7 @@ package org.apache.druid.msq.exec; +import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.kernel.StageId; @@ -85,7 +86,7 @@ public void test_submitFetcherTask_belowThresholds_ShouldBeParallel() completeKeyStatisticsInformation, Collections.emptyList(), stageDefinition, - Collections.emptySet() + IntSet.of() ); verify(target, times(1)).inMemoryFullSketchMerging(any(), any(), any()); verify(target, times(0)).sequentialTimeChunkMerging(any(), any(), any()); @@ -107,7 +108,7 @@ public void test_submitFetcherTask_workerCountAboveThreshold_shouldBeSequential( completeKeyStatisticsInformation, Collections.emptyList(), stageDefinition, - Collections.emptySet() + IntSet.of() ); verify(target, times(0)).inMemoryFullSketchMerging(any(), any(), any()); verify(target, times(1)).sequentialTimeChunkMerging(any(), any(), any()); @@ -129,7 +130,7 @@ public void test_submitFetcherTask_noClusterByColumns_shouldBeParallel() completeKeyStatisticsInformation, Collections.emptyList(), stageDefinition, - Collections.emptySet() + IntSet.of() ); verify(target, times(1)).inMemoryFullSketchMerging(any(), any(), any()); verify(target, times(0)).sequentialTimeChunkMerging(any(), any(), any()); @@ -151,7 +152,7 @@ public void test_submitFetcherTask_bytesRetainedAboveThreshold_shouldBeSequentia completeKeyStatisticsInformation, Collections.emptyList(), stageDefinition, - Collections.emptySet() + IntSet.of() ); verify(target, times(0)).inMemoryFullSketchMerging(any(), any(), any()); verify(target, times(1)).sequentialTimeChunkMerging(any(), any(), any()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java index 39767ba6fbf8..fc2449003614 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerSketchFetcherTest.java @@ -23,6 +23,8 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedMap; import com.google.common.util.concurrent.Futures; +import it.unimi.dsi.fastutil.ints.IntAVLTreeSet; +import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.ClusterByPartition; import org.apache.druid.frame.key.ClusterByPartitions; @@ -49,7 +51,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; import static org.easymock.EasyMock.mock; import static org.mockito.ArgumentMatchers.any; @@ -89,11 +90,19 @@ public void setUp() doReturn(clusterBy).when(stageDefinition).getClusterBy(); doReturn(25_000).when(stageDefinition).getMaxPartitionCount(); - expectedPartitions1 = new ClusterByPartitions(ImmutableList.of(new ClusterByPartition(mock(RowKey.class), mock(RowKey.class)))); - expectedPartitions2 = new ClusterByPartitions(ImmutableList.of(new ClusterByPartition(mock(RowKey.class), mock(RowKey.class)))); + expectedPartitions1 = new ClusterByPartitions(ImmutableList.of(new ClusterByPartition( + mock(RowKey.class), + mock(RowKey.class) + ))); + expectedPartitions2 = new ClusterByPartitions(ImmutableList.of(new ClusterByPartition( + mock(RowKey.class), + mock(RowKey.class) + ))); - doReturn(Either.value(expectedPartitions1)).when(stageDefinition).generatePartitionsForShuffle(eq(mergedClusterByStatisticsCollector1)); - doReturn(Either.value(expectedPartitions2)).when(stageDefinition).generatePartitionsForShuffle(eq(mergedClusterByStatisticsCollector2)); + doReturn(Either.value(expectedPartitions1)).when(stageDefinition) + .generatePartitionsForShuffle(eq(mergedClusterByStatisticsCollector1)); + doReturn(Either.value(expectedPartitions2)).when(stageDefinition) + .generatePartitionsForShuffle(eq(mergedClusterByStatisticsCollector2)); doReturn( mergedClusterByStatisticsCollector1, @@ -129,11 +138,14 @@ public void test_submitFetcherTask_parallelFetch_mergePerformedCorrectly() return Futures.immediateFuture(snapshot); }).when(workerClient).fetchClusterByStatisticsSnapshot(any(), any(), anyInt()); + IntSet workersForStage = new IntAVLTreeSet(); + workersForStage.addAll(ImmutableSet.of(0, 1, 2, 3, 4)); + CompletableFuture> eitherCompletableFuture = target.submitFetcherTask( completeKeyStatisticsInformation, workerIds, stageDefinition, - workerIds.stream().map(Integer::parseInt).collect(Collectors.toSet()) + workersForStage ); // Assert that the final result is complete and all other sketches returned have been merged. @@ -156,7 +168,12 @@ public void test_submitFetcherTask_sequentialFetch_mergePerformedCorrectly() // Store snapshots in a queue final Queue snapshotQueue = new ConcurrentLinkedQueue<>(); - SortedMap> timeSegmentVsWorkerMap = ImmutableSortedMap.of(1L, ImmutableSet.of(0, 1, 2), 2L, ImmutableSet.of(0, 1, 4)); + SortedMap> timeSegmentVsWorkerMap = ImmutableSortedMap.of( + 1L, + ImmutableSet.of(0, 1, 2), + 2L, + ImmutableSet.of(0, 1, 4) + ); doReturn(timeSegmentVsWorkerMap).when(completeKeyStatisticsInformation).getTimeSegmentVsWorkerMap(); final CyclicBarrier barrier = new CyclicBarrier(3); @@ -170,11 +187,14 @@ public void test_submitFetcherTask_sequentialFetch_mergePerformedCorrectly() return Futures.immediateFuture(snapshot); }).when(workerClient).fetchClusterByStatisticsSnapshotForTimeChunk(any(), any(), anyInt(), anyLong()); + IntSet workersForStage = new IntAVLTreeSet(); + workersForStage.addAll(ImmutableSet.of(0, 1, 2, 3, 4)); + CompletableFuture> eitherCompletableFuture = target.submitFetcherTask( completeKeyStatisticsInformation, ImmutableList.of("0", "1", "2", "3", "4"), stageDefinition, - ImmutableSet.of() + workersForStage ); // Assert that the final result is complete and all other sketches returned have been merged. From 6b28a58d555ffff2d5d5122bb5b4b2a0f30cd131 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Wed, 14 Dec 2022 18:53:22 +0530 Subject: [PATCH 10/10] Adding more coverage --- .../apache/druid/msq/exec/MSQInsertTest.java | 26 +++++++++ .../apache/druid/msq/exec/WorkerImplTest.java | 54 +++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java index f54d2fa880cb..cf4e4052d35e 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQInsertTest.java @@ -128,6 +128,32 @@ public void testInsertOnFoo1WithTimeFunction() } + @Test + public void testInsertOnFoo1WithTimeFunctionWithSequential() + { + RowSignature rowSignature = RowSignature.builder() + .add("__time", ColumnType.LONG) + .add("dim1", ColumnType.STRING) + .add("cnt", ColumnType.LONG).build(); + Map context = ImmutableMap.builder() + .putAll(DEFAULT_MSQ_CONTEXT) + .put( + MultiStageQueryContext.CTX_CLUSTER_STATISTICS_MERGE_MODE, + ClusterStatisticsMergeMode.SEQUENTIAL.toString() + ) + .build(); + + testIngestQuery().setSql( + "insert into foo1 select floor(__time to day) as __time , dim1 , count(*) as cnt from foo where dim1 is not null group by 1, 2 PARTITIONED by day clustered by dim1") + .setQueryContext(context) + .setExpectedDataSource("foo1") + .setExpectedRowSignature(rowSignature) + .setExpectedSegment(expectedFooSegments()) + .setExpectedResultRows(expectedFooRows()) + .verifyResults(); + + } + @Test public void testInsertOnFoo1WithMultiValueDim() { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java new file mode 100644 index 000000000000..52231a116b6a --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/WorkerImplTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.msq.indexing.MSQWorkerTask; +import org.apache.druid.msq.kernel.StageId; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.util.HashMap; + + +@RunWith(MockitoJUnitRunner.class) +public class WorkerImplTest +{ + @Mock + WorkerContext workerContext; + + @Test + public void testFetchStatsThrows() + { + WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 1, new HashMap<>()), workerContext); + Assert.assertThrows(ISE.class, () -> worker.fetchStatisticsSnapshot(new StageId("xx", 1))); + } + + @Test + public void testFetchStatsWithTimeChunkThrows() + { + WorkerImpl worker = new WorkerImpl(new MSQWorkerTask("controller", "ds", 1, new HashMap<>()), workerContext); + Assert.assertThrows(ISE.class, () -> worker.fetchStatisticsSnapshotForTimeChunk(new StageId("xx", 1), 1L)); + } + +}