7 changes: 7 additions & 0 deletions docs/multi-stage-query/reference.md
@@ -243,6 +243,7 @@ The following table lists the context parameters for the MSQ task engine:
| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
| `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure durable storage at the server level using `druid.msq.intermediate.storage.enable=true`. If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `selectDestination` | SELECT<br /><br /> Controls where the final result of the SELECT query is written. <br />Use `taskReport` (the default) to write SELECT results to the task report. <b>This is not scalable, since the task report size grows with the size of the result set.</b> <br/>Use `durableStorage` to write results to a durable storage location. <b>For large result sets, it's recommended to use `durableStorage`.</b> To configure durable storage, see the [durable storage](#durable-storage) section. | `taskReport` |

## Joins

@@ -376,6 +377,12 @@ When you run a query, include the context parameter `durableShuffleStorage` and

For queries where you want to use fault tolerance for workers, set `faultTolerance` to `true`, which automatically sets `durableShuffleStorage` to `true`.

Set `selectDestination`:`durableStorage` for SELECT queries whose final results should be written to durable storage instead of the task report. Saving the results in durable
storage allows users to fetch large result sets. The location where the workers write the intermediate results is different than the location where final results get stored. Therefore, `durableShuffleStorage`:`false` and
Contributor

Suggested change
storage allows users to fetch large result sets. The location where the workers write the intermediate results is different than the location where final results get stored. Therefore, `durableShuffleStorage`:`false` and
storage allows users to fetch large result sets. The location where the workers write the intermediate results can be different from the location where the final results get stored. Therefore, `durableShuffleStorage`:`false` and

Contributor Author

The location where the workers write the intermediate results is always different, so I would like to keep the wording as is unless you feel strongly about it.

`selectDestination`:`durableStorage` is a valid configuration to use in the query context; it instructs the controller to persist only the final result in durable storage, and not the
intermediate results.
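
As a rough illustration of why that combination is valid, the sketch below mirrors the condition this PR uses in the worker (`durableStageStorageEnabled || (isFinalStage && ...)`). The class and method names are hypothetical, and the destination is modeled as a plain string; this is not the Druid implementation.

```java
// Hypothetical sketch of the worker-side decision; names are illustrative only.
class OutputChannelChoiceSketch
{
  // Returns true when a stage's output would go to durable storage:
  // either shuffle storage is durable for every stage, or this is the
  // final stage and the select destination is "durableStorage".
  static boolean writesToDurableStorage(
      boolean durableShuffleStorage,
      boolean isFinalStage,
      String selectDestination
  )
  {
    return durableShuffleStorage
           || (isFinalStage && "durableStorage".equals(selectDestination));
  }
}
```

With `durableShuffleStorage` set to `false` and `selectDestination` set to `durableStorage`, only the final stage qualifies, which matches the documented behavior of persisting just the final result.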


## Durable storage configurations

The following common service properties control how durable storage behaves:
@@ -605,14 +605,14 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
if (MSQControllerTask.writeResultsToDurableStorage(task.getQuerySpec())) {
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
MSQSelectDestination.DURABLE_STORAGE.name()
MSQSelectDestination.DURABLESTORAGE.getName()
);
} else {
// we need not pass the value 'TaskReport' to the worker since the worker impl does not do anything in such a case.
// but we are passing it anyway for completeness
taskContextOverridesBuilder.put(
MultiStageQueryContext.CTX_SELECT_DESTINATION,
MSQSelectDestination.TASK_REPORT.name()
MSQSelectDestination.TASKREPORT.getName()
);
}
}
@@ -732,7 +732,7 @@ private OutputChannelFactory makeStageOutputChannelFactory(
final int frameSize = frameContext.memoryParameters().getStandardFrameSize();

if (durableStageStorageEnabled || (isFinalStage
&& MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))) {
&& MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))) {
return DurableStorageOutputChannelFactory.createStandardImplementation(
task.getControllerTaskId(),
task().getWorkerNumber(),
@@ -741,7 +741,7 @@ private OutputChannelFactory makeStageOutputChannelFactory(
frameSize,
MSQTasks.makeStorageConnector(context.injector()),
context.tempDir(),
(isFinalStage && MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))
(isFinalStage && MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))
);
} else {
final File fileChannelDirectory =
@@ -1320,7 +1320,7 @@ private void writeDurableStorageSuccessFileIfNeeded(final int stageNumber, boole
{
final DurableStorageOutputChannelFactory durableStorageOutputChannelFactory;
if (durableStageStorageEnabled || (isFinalStage
&& MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))) {
&& MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))) {
durableStorageOutputChannelFactory = DurableStorageOutputChannelFactory.createStandardImplementation(
task.getControllerTaskId(),
task().getWorkerNumber(),
@@ -1329,7 +1329,7 @@ private void writeDurableStorageSuccessFileIfNeeded(final int stageNumber, boole
frameContext.memoryParameters().getStandardFrameSize(),
MSQTasks.makeStorageConnector(context.injector()),
context.tempDir(),
(isFinalStage && MSQSelectDestination.DURABLE_STORAGE.equals(selectDestination))
(isFinalStage && MSQSelectDestination.DURABLESTORAGE.equals(selectDestination))
);
} else {
return;
@@ -19,6 +19,8 @@

package org.apache.druid.msq.indexing.destination;

import com.fasterxml.jackson.annotation.JsonValue;

/**
* Determines the destination for results of select queries.
*/
@@ -27,21 +29,38 @@ public enum MSQSelectDestination
/**
* Writes all the results directly to the report.
*/
TASK_REPORT(false),
TASKREPORT("taskReport", false),
Contributor

We should use snake case for the static variable names. Please revert them to the original values.
Since this was changed for equality checks, we can create a static method that checks whether the provided String equals the enum destination.

Suggested change
TASKREPORT("taskReport", false),
TASK_REPORT("taskReport", false),

Contributor Author

This was done so that we can still use the query context enum methods. Also, I think the ResultFormat class uses a similar trick. Check out objectlines.

Contributor

Cool!

/**
* Writes the results as frame files to durable storage. Task report can be truncated to a preview.
*/
DURABLE_STORAGE(true);
DURABLESTORAGE("durableStorage", true);
Contributor

Suggested change
DURABLESTORAGE("durableStorage", true);
DURABLE_STORAGE("durableStorage", true);


private final String name;
private final boolean shouldTruncateResultsInTaskReport;

public boolean shouldTruncateResultsInTaskReport()
{
return shouldTruncateResultsInTaskReport;
}

MSQSelectDestination(boolean shouldTruncateResultsInTaskReport)
MSQSelectDestination(String name, boolean shouldTruncateResultsInTaskReport)
{
this.name = name;
this.shouldTruncateResultsInTaskReport = shouldTruncateResultsInTaskReport;
}

@JsonValue
public String getName()
{
return name;
}

@Override
public String toString()
{
return "MSQSelectDestination{" +
"name='" + name + '\'' +
", shouldTruncateResultsInTaskReport=" + shouldTruncateResultsInTaskReport +
'}';
}
}
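
The naming discussion in the review comments hinges on how a context string such as `durableStorage` gets mapped back to a constant. Below is a minimal sketch, assuming the lookup upper-cases the value and calls `valueOf` (an assumption about the query context enum helpers, not something this diff confirms). `SelectDestinationSketch` and `fromContextValue` are hypothetical names.

```java
import java.util.Locale;

// Hypothetical stand-in for MSQSelectDestination; not the Druid class itself.
enum SelectDestinationSketch
{
  TASKREPORT("taskReport", false),
  DURABLESTORAGE("durableStorage", true);

  private final String name;
  private final boolean shouldTruncateResultsInTaskReport;

  SelectDestinationSketch(String name, boolean shouldTruncateResultsInTaskReport)
  {
    this.name = name;
    this.shouldTruncateResultsInTaskReport = shouldTruncateResultsInTaskReport;
  }

  // User-facing name, as it would appear in the query context.
  String getName()
  {
    return name;
  }

  boolean shouldTruncateResultsInTaskReport()
  {
    return shouldTruncateResultsInTaskReport;
  }

  // Assumed lookup style: upper-case the context value, then resolve by constant name.
  // Throws IllegalArgumentException when no constant matches.
  static SelectDestinationSketch fromContextValue(String value)
  {
    return SelectDestinationSketch.valueOf(value.toUpperCase(Locale.ROOT));
  }
}
```

Under that assumption, a constant named `DURABLE_STORAGE` would not be found for the input `durableStorage`, because the upper-cased value contains no underscore.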
@@ -25,7 +25,7 @@
import java.util.concurrent.ExecutorService;

/**
* Used for reading results when select destination is {@link org.apache.druid.msq.indexing.destination.MSQSelectDestination#DURABLE_STORAGE}
* Used for reading results when select destination is {@link org.apache.druid.msq.indexing.destination.MSQSelectDestination#DURABLESTORAGE}
*/
public class DurableStorageQueryResultsInputChannelFactory extends DurableStorageInputChannelFactory
{
@@ -66,6 +66,7 @@

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -233,17 +234,18 @@ public QueryResponse<Object[]> runQuery(final DruidQuery druidQuery)
);
} else {
final MSQSelectDestination msqSelectDestination = MultiStageQueryContext.getSelectDestination(sqlQueryContext);
if (msqSelectDestination.equals(MSQSelectDestination.TASK_REPORT)) {
if (msqSelectDestination.equals(MSQSelectDestination.TASKREPORT)) {
destination = TaskReportMSQDestination.instance();
} else if (msqSelectDestination.equals(MSQSelectDestination.DURABLE_STORAGE)) {
} else if (msqSelectDestination.equals(MSQSelectDestination.DURABLESTORAGE)) {
destination = DurableStorageMSQDestination.instance();
} else {
throw InvalidInput.exception(
"Unsupported select destination [%s] provided in the query context. MSQ can currently write the select results to "
+ "[%s] and [%s]",
msqSelectDestination.name(),
MSQSelectDestination.TASK_REPORT.toString(),
MSQSelectDestination.DURABLE_STORAGE.toString()
+ "[%s]",
msqSelectDestination.getName(),
Arrays.stream(MSQSelectDestination.values())
.map(MSQSelectDestination::getName)
.collect(Collectors.joining(","))
);
}
}
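
The new message lists every supported destination by streaming over `values()`, so future constants would appear without touching the message. Here is a self-contained sketch of that pattern, with a hypothetical `Destination` enum standing in for `MSQSelectDestination`:

```java
import java.util.Arrays;
import java.util.stream.Collectors;

class SupportedDestinationsSketch
{
  // Hypothetical stand-in for MSQSelectDestination.
  enum Destination
  {
    TASKREPORT("taskReport"),
    DURABLESTORAGE("durableStorage");

    private final String name;

    Destination(String name)
    {
      this.name = name;
    }

    String getName()
    {
      return name;
    }
  }

  // Joins the user-facing names in declaration order, comma-separated.
  static String supportedNames()
  {
    return Arrays.stream(Destination.values())
                 .map(Destination::getName)
                 .collect(Collectors.joining(","));
  }
}
```

For the two constants above, `supportedNames()` returns `taskReport,durableStorage`.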
@@ -21,6 +21,7 @@


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;
@@ -32,43 +33,46 @@
*/
public class PageInformation
{
private final long id;
@Nullable
private final Long numRows;
@Nullable
private final Long sizeInBytes;
private final long id;

@JsonCreator
public PageInformation(
@JsonProperty("id") long id,
@JsonProperty("numRows") @Nullable Long numRows,
@JsonProperty("sizeInBytes") @Nullable Long sizeInBytes,
@JsonProperty("id") long id
@JsonProperty("sizeInBytes") @Nullable Long sizeInBytes
)
{
this.id = id;
this.numRows = numRows;
this.sizeInBytes = sizeInBytes;
this.id = id;
}

@JsonProperty
public long getId()
{
return id;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public Long getNumRows()
{
return numRows;
}

@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
public Long getSizeInBytes()
{
return sizeInBytes;
}

@JsonProperty
public long getId()
{
return id;
}

@Override
public boolean equals(Object o)
@@ -89,16 +93,16 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
return Objects.hash(numRows, sizeInBytes, id);
return Objects.hash(id, numRows, sizeInBytes);
}

@Override
public String toString()
{
return "PageInformation{" +
"numRows=" + numRows +
"id=" + id +
", numRows=" + numRows +
", sizeInBytes=" + sizeInBytes +
", id=" + id +
'}';
}
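
The reordering in `hashCode` and `toString` puts `id` first, matching the new constructor order. Below is a small sketch of how `Objects.equals` and `Objects.hash` handle the nullable fields. It omits the Jackson annotations and uses a hypothetical class name, and its `equals` body is an assumption, since the diff does not show the original one.

```java
import java.util.Objects;

// Hypothetical, Jackson-free stand-in for PageInformation.
class PageInformationSketch
{
  private final long id;
  private final Long numRows;      // may be null
  private final Long sizeInBytes;  // may be null

  PageInformationSketch(long id, Long numRows, Long sizeInBytes)
  {
    this.id = id;
    this.numRows = numRows;
    this.sizeInBytes = sizeInBytes;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    PageInformationSketch that = (PageInformationSketch) o;
    // Objects.equals treats two nulls as equal and never throws on null.
    return id == that.id
           && Objects.equals(numRows, that.numRows)
           && Objects.equals(sizeInBytes, that.sizeInBytes);
  }

  @Override
  public int hashCode()
  {
    // Same field order as the constructor; null fields are allowed.
    return Objects.hash(id, numRows, sizeInBytes);
  }

  @Override
  public String toString()
  {
    return "PageInformationSketch{" +
           "id=" + id +
           ", numRows=" + numRows +
           ", sizeInBytes=" + sizeInBytes +
           '}';
  }
}
```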

@@ -28,6 +28,7 @@
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.client.indexing.TaskStatusResponse;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.ErrorResponse;
import org.apache.druid.error.Forbidden;
@@ -830,16 +831,16 @@ private void contextChecks(QueryContext queryContext)

if (executionMode == null) {
throw InvalidInput.exception(
"Execution mode is not provided to the SQL statement API. "
"Execution mode is not provided to the sql statement api. "
+ "Please set [%s] to [%s] in the query context",
QueryContexts.CTX_EXECUTION_MODE,
ExecutionMode.ASYNC
);
}

if (ExecutionMode.ASYNC != executionMode) {
if (!ExecutionMode.ASYNC.equals(executionMode)) {
throw InvalidInput.exception(
"The SQL statement API currently does not support the provided execution mode [%s]. "
"The sql statement api currently does not support the provided execution mode [%s]. "
+ "Please set [%s] to [%s] in the query context",
executionMode,
QueryContexts.CTX_EXECUTION_MODE,
@@ -848,7 +849,7 @@ private void contextChecks(QueryContext queryContext)
}

MSQSelectDestination selectDestination = MultiStageQueryContext.getSelectDestination(queryContext);
if (selectDestination == MSQSelectDestination.DURABLE_STORAGE) {
if (MSQSelectDestination.DURABLESTORAGE.equals(selectDestination)) {
checkForDurableStorageConnectorImpl();
}
}
@@ -860,12 +861,13 @@ private void checkForDurableStorageConnectorImpl()
.ofCategory(DruidException.Category.INVALID_INPUT)
.build(
StringUtils.format(
"The SQL Statement API cannot read from the select destination [%s] provided "
+ "in the query context [%s] since it is not configured. It is recommended to configure the durable storage "
"The sql statement api cannot read from the select destination [%s] provided "
+ "in the query context [%s] since it is not configured on the %s. It is recommended to configure durable storage "
+ "as it allows the user to fetch large result sets. Please contact your cluster admin to "
+ "configure durable storage.",
MSQSelectDestination.DURABLE_STORAGE.name(),
MultiStageQueryContext.CTX_SELECT_DESTINATION
MSQSelectDestination.DURABLESTORAGE.getName(),
MultiStageQueryContext.CTX_SELECT_DESTINATION,
NodeRole.BROKER.getJsonName()
)
);
}
@@ -67,7 +67,7 @@
*
* <li><b>selectDestination</b>: If the query is a Select, determines the location to write results to, once the query
* is finished. Depending on the location, the results might also be truncated to {@link Limits#MAX_SELECT_RESULT_ROWS}.
* Default value is {@link MSQSelectDestination#TASK_REPORT}, which writes all the results to the report.
* Default value is {@link MSQSelectDestination#TASKREPORT}, which writes all the results to the report.
*
* <li><b>useAutoColumnSchemas</b>: Temporary flag to allow experimentation using
* {@link org.apache.druid.segment.AutoTypeColumnSchema} for all 'standard' type columns during segment generation,
@@ -93,7 +93,7 @@ public class MultiStageQueryContext
public static final String CTX_DURABLE_SHUFFLE_STORAGE = "durableShuffleStorage";
private static final boolean DEFAULT_DURABLE_SHUFFLE_STORAGE = false;
public static final String CTX_SELECT_DESTINATION = "selectDestination";
private static final String DEFAULT_SELECT_DESTINATION = MSQSelectDestination.TASK_REPORT.toString();
private static final String DEFAULT_SELECT_DESTINATION = MSQSelectDestination.TASKREPORT.getName();

public static final String CTX_FAULT_TOLERANCE = "faultTolerance";
public static final boolean DEFAULT_FAULT_TOLERANCE = false;