22 changes: 22 additions & 0 deletions docs/multi-stage-query/reference.md
@@ -203,6 +203,28 @@ The following table lists the context parameters for the MSQ task engine:
| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1.| 0 |
| `rowsPerSegment` | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
| `clusterStatisticsMergeMode` | Whether to use parallel or sequential mode for merging the worker sketches. Can be `PARALLEL`, `SEQUENTIAL`, or `AUTO`. See [Sketch Merging Mode](#sketch-merging-mode) for more information. | `AUTO` |

## Sketch Merging Mode
This section describes the advantages and performance characteristics of the available cluster statistics merge modes.

If a query requires key statistics to generate partition boundaries, key statistics are gathered by the workers while
reading rows from the datasource. These statistics must be transferred to the controller to be merged together.
`clusterStatisticsMergeMode` configures the way in which this happens.
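
For example, the mode can be passed through the query context like any other MSQ context parameter. The sketch below only assembles such a context in Java; the class and method are illustrative assumptions, and the surrounding query-submission code is not shown:

```java
import com.google.common.collect.ImmutableMap;

import java.util.Map;

/**
 * Minimal sketch: build an MSQ query context that selects a sketch merging mode.
 * Only the context key and its accepted values ("PARALLEL", "SEQUENTIAL", "AUTO")
 * come from this change; the class and method here are illustrative assumptions.
 */
public class ClusterStatisticsMergeModeContextExample
{
  public static Map<String, Object> buildContext()
  {
    return ImmutableMap.of(
        // Force sequential merging; omit the key to keep the default of AUTO.
        "clusterStatisticsMergeMode", "SEQUENTIAL",
        // Unrelated example parameter from the same table, shown with its default value.
        "rowsPerSegment", 3_000_000
    );
  }
}
```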

`PARALLEL` mode fetches the key statistics for all time chunks from all workers together, and the controller then downsamples
the sketch if it does not fit in memory. This is faster than `SEQUENTIAL` mode since fetching the sketches for all time chunks
together has less overhead. This mode is a good fit for small sketches that won't be downsampled even when merged together, or
when accuracy in segment sizing for the ingestion is not very important.

`SEQUENTIAL` mode fetches the sketches in ascending order of time and generates the partition boundaries for one time
chunk at a time. This gives the controller more working memory for merging sketches, which results in less downsampling
and thus more accuracy. There is, however, a time overhead to fetching sketches in sequential order. This mode is a good
fit when accuracy is important.

`AUTO` mode tries to find the best approach based on the number of workers and the size of the input. If there are more
than 100 workers, or if the combined sketch size among all workers is more than 1 GB, `SEQUENTIAL` is chosen; otherwise,
`PARALLEL` is chosen.
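
The `AUTO` heuristic is simple enough to sketch. The thresholds below come from this change; the class, method, and parameter names are illustrative assumptions rather than the actual implementation:

```java
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;

/**
 * Sketch of the AUTO selection rule described above, assuming the caller already
 * knows the worker count and the combined sketch size in bytes.
 */
public class MergeModeHeuristicSketch
{
  private static final int WORKER_COUNT_THRESHOLD = 100;
  private static final long COMBINED_SKETCH_BYTES_THRESHOLD = 1_000_000_000L;

  public static ClusterStatisticsMergeMode chooseMode(int workerCount, long combinedSketchSizeBytes)
  {
    // More than 100 workers or more than ~1 GB of combined sketches => fetch sequentially.
    if (workerCount > WORKER_COUNT_THRESHOLD || combinedSketchSizeBytes > COMBINED_SKETCH_BYTES_THRESHOLD) {
      return ClusterStatisticsMergeMode.SEQUENTIAL;
    }
    return ClusterStatisticsMergeMode.PARALLEL;
  }
}
```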

## Durable Storage
This section enumerates the advantages and performance implications of enabling durable storage while executing MSQ tasks.
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.exec;

/**
* Mode which dictates how {@link WorkerSketchFetcher} gets sketches for the partition boundaries from workers.
*/
public enum ClusterStatisticsMergeMode
{
/**
* Fetches sketches in sequential order based on time. Slower due to the fetch overhead, but more accurate.
*/
SEQUENTIAL,

/**
* Fetches all sketches from the workers at once. Faster to generate partitions, but less accurate.
*/
PARALLEL,

/**
* Tries to decide between sequential and parallel modes based on the number of workers and the size of the input.
*
* If there are more than 100 workers or if the combined sketch size among all workers is more than
* 1,000,000,000 bytes, SEQUENTIAL mode is chosen; otherwise, PARALLEL mode is chosen.
*/
AUTO
}
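
As a rough usage sketch, a fetcher could dispatch on this enum to pick between the two `WorkerClient` fetch methods added later in this diff. The class below is an illustrative assumption, not the actual `WorkerSketchFetcher`; it also skips AUTO resolution, merging, and downsampling:

```java
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.msq.exec.ClusterStatisticsMergeMode;
import org.apache.druid.msq.exec.WorkerClient;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;

/**
 * Illustrative sketch only: chooses how to fetch worker sketches based on the merge mode.
 * The real WorkerSketchFetcher also resolves AUTO, merges the snapshots, and generates
 * partition boundaries, none of which is shown here.
 */
public class SketchFetchDispatchExample
{
  private final WorkerClient workerClient;

  public SketchFetchDispatchExample(WorkerClient workerClient)
  {
    this.workerClient = workerClient;
  }

  public List<ListenableFuture<ClusterByStatisticsSnapshot>> fetch(
      ClusterStatisticsMergeMode mode,
      List<String> workerTaskIds,
      String queryId,
      int stageNumber,
      SortedSet<Long> timeChunksInAscendingOrder
  )
  {
    if (mode == ClusterStatisticsMergeMode.AUTO) {
      // AUTO is resolved to PARALLEL or SEQUENTIAL before any fetching happens.
      throw new IllegalArgumentException("Resolve AUTO to PARALLEL or SEQUENTIAL first");
    }

    final List<ListenableFuture<ClusterByStatisticsSnapshot>> futures = new ArrayList<>();
    if (mode == ClusterStatisticsMergeMode.PARALLEL) {
      // PARALLEL: a single call per worker returns the sketches for all time chunks at once.
      for (String workerTaskId : workerTaskIds) {
        futures.add(workerClient.fetchClusterByStatisticsSnapshot(workerTaskId, queryId, stageNumber));
      }
    } else {
      // SEQUENTIAL: walk the time chunks in ascending order, fetching one chunk per worker.
      // (The real implementation waits for each chunk's boundaries before moving on.)
      for (long timeChunk : timeChunksInAscendingOrder) {
        for (String workerTaskId : workerTaskIds) {
          futures.add(
              workerClient.fetchClusterByStatisticsSnapshotForTimeChunk(workerTaskId, queryId, stageNumber, timeChunk)
          );
        }
      }
    }
    return futures;
  }
}
```
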
@@ -27,7 +27,7 @@
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;

import javax.annotation.Nullable;
import java.util.List;
@@ -81,9 +81,11 @@ public String getId()
// Worker-to-controller messages

/**
* Provide a {@link ClusterByStatisticsSnapshot} for shuffling stages.
* Accepts a {@link PartialKeyStatisticsInformation} and updates the controller key statistics information. If all key
* statistics have been gathered, enqueues the task with the {@link WorkerSketchFetcher} to generate partition boundaries.
* This is intended to be called by the {@link org.apache.druid.msq.indexing.ControllerChatHandler}.
*/
void updateStatus(int stageNumber, int workerNumber, Object keyStatisticsObject);
void updatePartialKeyStatisticsInformation(int stageNumber, int workerNumber, Object partialKeyStatisticsInformationObject);

/**
* System error reported by a subtask. Note that the errors are organized by
@@ -22,7 +22,7 @@
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.indexing.error.MSQErrorReport;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;

import javax.annotation.Nullable;
import java.io.IOException;
@@ -34,13 +34,13 @@
public interface ControllerClient extends AutoCloseable
{
/**
* Client side method to update the controller with key statistics for a particular stage and worker.
* Controller's implementation collates all the key statistics for a stage to generate the partition boundaries.
* Client side method to update the controller with partial key statistics information for a particular stage and worker.
* Controller's implementation collates all the information for a stage to fetch key statistics from workers.
*/
void postKeyStatistics(
void postPartialKeyStatistics(
StageId stageId,
int workerNumber,
ClusterByStatisticsSnapshot keyStatistics
PartialKeyStatisticsInformation partialKeyStatisticsInformation
) throws IOException;

/**
@@ -64,6 +64,7 @@
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@@ -107,6 +108,7 @@
import org.apache.druid.msq.indexing.error.MSQWarningReportLimiterPublisher;
import org.apache.druid.msq.indexing.error.MSQWarnings;
import org.apache.druid.msq.indexing.error.QueryNotSupportedFault;
import org.apache.druid.msq.indexing.error.TooManyPartitionsFault;
import org.apache.druid.msq.indexing.error.TooManyWarningsFault;
import org.apache.druid.msq.indexing.error.UnknownFault;
import org.apache.druid.msq.indexing.report.MSQResultsReport;
@@ -149,7 +151,8 @@
import org.apache.druid.msq.shuffle.DurableStorageInputChannelFactory;
import org.apache.druid.msq.shuffle.DurableStorageUtils;
import org.apache.druid.msq.shuffle.WorkerInputChannelFactory;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.statistics.CompleteKeyStatisticsInformation;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.msq.util.IntervalUtils;
import org.apache.druid.msq.util.MSQFutureUtils;
@@ -201,6 +204,7 @@
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ThreadLocalRandom;
@@ -259,6 +263,7 @@ public class ControllerImpl implements Controller
// For live reports. Written by the main controller thread, read by HTTP threads.
private final ConcurrentHashMap<Integer, Integer> stagePartitionCountsForLiveReports = new ConcurrentHashMap<>();

private WorkerSketchFetcher workerSketchFetcher;
// Time at which the query started.
// For live reports. Written by the main controller thread, read by HTTP threads.
private volatile DateTime queryStartTime = null;
@@ -519,6 +524,15 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
context.registerController(this, closer);

this.netClient = new ExceptionWrappingWorkerClient(context.taskClientFor(this));
ClusterStatisticsMergeMode clusterStatisticsMergeMode =
MultiStageQueryContext.getClusterStatisticsMergeMode(task.getQuerySpec().getQuery().context());

log.debug("Query [%s] cluster statistics merge mode is set to %s.", id(), clusterStatisticsMergeMode);

int statisticsMaxRetainedBytes = WorkerMemoryParameters.createProductionInstanceForController(context.injector())
.getPartitionStatisticsMaxRetainedBytes();
this.workerSketchFetcher = new WorkerSketchFetcher(netClient, clusterStatisticsMergeMode, statisticsMaxRetainedBytes);

closer.register(netClient::close);

final boolean isDurableStorageEnabled =
@@ -565,10 +579,12 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
}

/**
* Provide a {@link ClusterByStatisticsSnapshot} for shuffling stages.
* Accepts a {@link PartialKeyStatisticsInformation} and updates the controller key statistics information. If all key
* statistics information has been gathered, enqueues the task with the {@link WorkerSketchFetcher} to generate
* partition boundaries. This is intended to be called by the {@link org.apache.druid.msq.indexing.ControllerChatHandler}.
*/
@Override
public void updateStatus(int stageNumber, int workerNumber, Object keyStatisticsObject)
public void updatePartialKeyStatisticsInformation(int stageNumber, int workerNumber, Object partialKeyStatisticsInformationObject)
{
addToKernelManipulationQueue(
queryKernel -> {
@@ -582,9 +598,9 @@ public void updateStatus(int stageNumber, int workerNumber, Object keyStatistics
stageDef.getShuffleSpec().get().doesAggregateByClusterKey()
);

final ClusterByStatisticsSnapshot keyStatistics;
final PartialKeyStatisticsInformation partialKeyStatisticsInformation;
try {
keyStatistics = mapper.convertValue(keyStatisticsObject, ClusterByStatisticsSnapshot.class);
partialKeyStatisticsInformation = mapper.convertValue(partialKeyStatisticsInformationObject, PartialKeyStatisticsInformation.class);
}
catch (IllegalArgumentException e) {
throw new IAE(
@@ -595,7 +611,36 @@ public void updateStatus(int stageNumber, int workerNumber, Object keyStatistics
);
}

queryKernel.addResultKeyStatisticsForStageAndWorker(stageId, workerNumber, keyStatistics);
queryKernel.addPartialKeyStatisticsForStageAndWorker(stageId, workerNumber, partialKeyStatisticsInformation);

if (queryKernel.getStagePhase(stageId).equals(ControllerStagePhase.MERGING_STATISTICS)) {
List<String> workerTaskIds = workerTaskLauncher.getTaskList();
CompleteKeyStatisticsInformation completeKeyStatisticsInformation =
queryKernel.getCompleteKeyStatisticsInformation(stageId);

// Queue the sketch fetching task into the worker sketch fetcher.
CompletableFuture<Either<Long, ClusterByPartitions>> clusterByPartitionsCompletableFuture =
workerSketchFetcher.submitFetcherTask(
completeKeyStatisticsInformation,
workerTaskIds,
stageDef
);

// Add the listener to handle completion.
clusterByPartitionsCompletableFuture.whenComplete((clusterByPartitionsEither, throwable) -> {
addToKernelManipulationQueue(holder -> {
if (throwable != null) {
holder.failStageForReason(stageId, UnknownFault.forException(throwable));
} else if (clusterByPartitionsEither.isError()) {
holder.failStageForReason(stageId, new TooManyPartitionsFault(stageDef.getMaxPartitionCount()));
} else {
log.debug("Query [%s] Partition boundaries generated for stage %s", id(), stageId);
holder.setClusterByPartitionBoundaries(stageId, clusterByPartitionsEither.valueOrThrow());
}
holder.transitionStageKernel(stageId, queryKernel.getStagePhase(stageId));
});
});
}
}
);
}
@@ -1959,11 +2004,7 @@ public RunQueryUntilDone(
this.queryDef = queryDef;
this.inputSpecSlicerFactory = inputSpecSlicerFactory;
this.closer = closer;
this.queryKernel = new ControllerQueryKernel(
queryDef,
WorkerMemoryParameters.createProductionInstanceForController(context.injector())
.getPartitionStatisticsMaxRetainedBytes()
);
this.queryKernel = new ControllerQueryKernel(queryDef);
}

/**
@@ -31,6 +31,7 @@
import org.apache.druid.msq.indexing.error.WorkerRpcFailedFault;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;

import javax.annotation.Nullable;
import java.io.IOException;
@@ -55,6 +56,23 @@ public ListenableFuture<Void> postWorkOrder(String workerTaskId, WorkOrder workO
return wrap(workerTaskId, client, c -> c.postWorkOrder(workerTaskId, workOrder));
}

@Override
public ListenableFuture<ClusterByStatisticsSnapshot> fetchClusterByStatisticsSnapshot(String workerTaskId, String queryId, int stageNumber)
{
return client.fetchClusterByStatisticsSnapshot(workerTaskId, queryId, stageNumber);
}

@Override
public ListenableFuture<ClusterByStatisticsSnapshot> fetchClusterByStatisticsSnapshotForTimeChunk(
String workerTaskId,
String queryId,
int stageNumber,
long timeChunk
)
{
return client.fetchClusterByStatisticsSnapshotForTimeChunk(workerTaskId, queryId, stageNumber, timeChunk);
}

@Override
public ListenableFuture<Void> postResultPartitionBoundaries(
final String workerTaskId,
@@ -25,6 +25,7 @@
import org.apache.druid.msq.indexing.MSQWorkerTask;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;

import javax.annotation.Nullable;
import java.io.IOException;
@@ -67,6 +68,18 @@ public interface Worker
*/
void postWorkOrder(WorkOrder workOrder);

/**
* Returns the statistics snapshot for the given stageId. This is called from {@link WorkerSketchFetcher} under
* PARALLEL or AUTO modes.
*/
ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId);

/**
* Returns the statistics snapshot for the given stageId which contains only the sketch for the specified timeChunk.
* This is called from {@link WorkerSketchFetcher} under SEQUENTIAL or AUTO modes.
*/
ClusterByStatisticsSnapshot fetchStatisticsSnapshotForTimeChunk(StageId stageId, long timeChunk);

/**
* Called when the worker chat handler receives the result partition boundaries for a particular stageNumber
* and queryId
@@ -25,6 +25,7 @@
import org.apache.druid.msq.counters.CounterSnapshotsTree;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;

import java.io.IOException;

@@ -38,6 +39,27 @@ public interface WorkerClient extends AutoCloseable
*/
ListenableFuture<Void> postWorkOrder(String workerTaskId, WorkOrder workOrder);

/**
* Fetches the {@link ClusterByStatisticsSnapshot} from a worker. This is intended to be used by the
* {@link WorkerSketchFetcher} under PARALLEL or AUTO modes.
*/
ListenableFuture<ClusterByStatisticsSnapshot> fetchClusterByStatisticsSnapshot(
String workerTaskId,
String queryId,
int stageNumber
);

/**
* Fetches a {@link ClusterByStatisticsSnapshot} which contains only the sketch of the specified timeChunk.
* This is intended to be used by the {@link WorkerSketchFetcher} under SEQUENTIAL or AUTO modes.
*/
ListenableFuture<ClusterByStatisticsSnapshot> fetchClusterByStatisticsSnapshotForTimeChunk(
String workerTaskId,
String queryId,
int stageNumber,
long timeChunk
);

/**
* Worker's client method to inform it of the partition boundaries for the given stage. This is usually invoked by the
* controller after collating the result statistics from all the workers processing the query