diff --git a/docs/multi-stage-query/reference.md b/docs/multi-stage-query/reference.md
index fe86ccad5058..3fa179e1de2b 100644
--- a/docs/multi-stage-query/reference.md
+++ b/docs/multi-stage-query/reference.md
@@ -243,6 +243,7 @@ The following table lists the context parameters for the MSQ task engine:
| `indexSpec` | INSERT or REPLACE
An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
| `durableShuffleStorage` | SELECT, INSERT, REPLACE
Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error.
Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
+| `selectDestination` | SELECT
Controls where the final result of the select query is written. Use `taskReport`(the default) to write select results to the task report. This is not scalable since task reports size explodes for large results Use `durableStorage` to write results to durable storage location. For large results sets, its recommended to use `durableStorage` . To configure durable storage see [`this`](#durable-storage) section. | `taskReport` |
## Joins
@@ -376,6 +377,12 @@ When you run a query, include the context parameter `durableShuffleStorage` and
For queries where you want to use fault tolerance for workers, set `faultTolerance` to `true`, which automatically sets `durableShuffleStorage` to `true`.
+Set `selectDestination`:`durableStorage` for select queries that want to write the final results to durable storage instead of the task reports. Saving the results in the durable
+storage allows users to fetch large result sets. The location where the workers write the intermediate results is different than the location where final results get stored. Therefore, `durableShuffleStorage`:`false` and
+`selectDestination`:`durableStorage` is a valid configuration to use in the query context, that instructs the controller to persist only the final result in the durable storage, and not the
+intermediate results.
+
+
## Durable storage configurations
The following common service properties control how durable storage behaves:
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index f5f7b0cc1aa3..6cf7a2f5181b 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -605,14 +605,14 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
if (MSQControllerTask.writeResultsToDurableStorage(task.getQuerySpec())) {
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
- MSQSelectDestination.DURABLE_STORAGE.name()
+ MSQSelectDestination.DURABLESTORAGE.getName()
);
} else {
// we need not pass the value 'TaskReport' to the worker since the worker impl does not do anything in such a case.
// but we are passing it anyway for completeness
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
- MSQSelectDestination.TASK_REPORT.name()
+ MSQSelectDestination.TASKREPORT.getName()
);
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 942d8b44b0c9..2c347c4275e9 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -732,7 +732,7 @@ private OutputChannelFactory makeStageOutputChannelFactory(
final int frameSize = frameContext.memoryParameters().getStandardFrameSize();
if (durableStageStorageEnabled || (isFinalStage
- && MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))) {
+ && MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))) {
return DurableStorageOutputChannelFactory.createStandardImplementation(
task.getControllerTaskId(),
task().getWorkerNumber(),
@@ -741,7 +741,7 @@ private OutputChannelFactory makeStageOutputChannelFactory(
frameSize,
MSQTasks.makeStorageConnector(context.injector()),
context.tempDir(),
- (isFinalStage && MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))
+ (isFinalStage && MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))
);
} else {
final File fileChannelDirectory =
@@ -1320,7 +1320,7 @@ private void writeDurableStorageSuccessFileIfNeeded(final int stageNumber, boole
{
final DurableStorageOutputChannelFactory durableStorageOutputChannelFactory;
if (durableStageStorageEnabled || (isFinalStage
- && MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))) {
+ && MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))) {
durableStorageOutputChannelFactory = DurableStorageOutputChannelFactory.createStandardImplementation(
task.getControllerTaskId(),
task().getWorkerNumber(),
@@ -1329,7 +1329,7 @@ private void writeDurableStorageSuccessFileIfNeeded(final int stageNumber, boole
frameContext.memoryParameters().getStandardFrameSize(),
MSQTasks.makeStorageConnector(context.injector()),
context.tempDir(),
- (isFinalStage && MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))
+ (isFinalStage && MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))
);
} else {
return;
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/MSQSelectDestination.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/MSQSelectDestination.java
index d41a85622693..db57fdc5dc07 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/MSQSelectDestination.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/destination/MSQSelectDestination.java
@@ -19,6 +19,8 @@
package org.apache.druid.msq.indexing.destination;
+import com.fasterxml.jackson.annotation.JsonValue;
+
/**
* Determines the destination for results of select queries.
*/
@@ -27,12 +29,13 @@ public enum MSQSelectDestination
/**
* Writes all the results directly to the report.
*/
- TASK_REPORT(false),
+ TASKREPORT("taskReport", false),
/**
* Writes the results as frame files to durable storage. Task report can be truncated to a preview.
*/
- DURABLE_STORAGE(true);
+ DURABLESTORAGE("durableStorage", true);
+ private final String name;
private final boolean shouldTruncateResultsInTaskReport;
public boolean shouldTruncateResultsInTaskReport()
@@ -40,8 +43,24 @@ public boolean shouldTruncateResultsInTaskReport()
return shouldTruncateResultsInTaskReport;
}
- MSQSelectDestination(boolean shouldTruncateResultsInTaskReport)
+ MSQSelectDestination(String name, boolean shouldTruncateResultsInTaskReport)
{
+ this.name = name;
this.shouldTruncateResultsInTaskReport = shouldTruncateResultsInTaskReport;
}
+
+ @JsonValue
+ public String getName()
+ {
+ return name;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "MSQSelectDestination{" +
+ "name='" + name + '\'' +
+ ", shouldTruncateResultsInTaskReport=" + shouldTruncateResultsInTaskReport +
+ '}';
+ }
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/input/DurableStorageQueryResultsInputChannelFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/input/DurableStorageQueryResultsInputChannelFactory.java
index b29220e790d7..620b2271e252 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/input/DurableStorageQueryResultsInputChannelFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/shuffle/input/DurableStorageQueryResultsInputChannelFactory.java
@@ -25,7 +25,7 @@
import java.util.concurrent.ExecutorService;
/**
- * Used for reading results when select destination is {@link org.apache.druid.msq.indexing.destination.MSQSelectDestination#DURABLE_STORAGE}
+ * Used for reading results when select destination is {@link org.apache.druid.msq.indexing.destination.MSQSelectDestination#DURABLESTORAGE}
*/
public class DurableStorageQueryResultsInputChannelFactory extends DurableStorageInputChannelFactory
{
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
index 772af8524e24..de48387db200 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/sql/MSQTaskQueryMaker.java
@@ -66,6 +66,7 @@
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -233,17 +234,18 @@ public QueryResponse