Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/multi-stage-query/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,14 @@ When deciding whether to use `REPLACE` or `INSERT`, keep in mind that segments g
with dimension-based pruning but those generated with `INSERT` cannot. For more information about the requirements
for dimension-based pruning, see [Clustering](#clustering).

### Write to an external destination with `EXTERN`

Query tasks can write data to an external destination through the `EXTERN` function, when it is used with the `INTO`
clause, such as `INSERT INTO EXTERN(...)`. The EXTERN function takes arguments that specify where to write the files.
The format can be specified using an `AS` clause.

For more information about the syntax, see [`EXTERN`](./reference.md#extern-function).

### Primary timestamp

Druid tables always include a primary timestamp named `__time`.
Expand Down
90 changes: 89 additions & 1 deletion docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ making it easy to reuse the same SQL statement for each ingest: just specify the

### `EXTERN` Function

Use the `EXTERN` function to read external data. The function has two variations.
Use the `EXTERN` function to read external data or write to an external location.

#### `EXTERN` as an input source

The function has two variations.
Function variation 1, with the input schema expressed as JSON:

```sql
Expand Down Expand Up @@ -90,6 +93,91 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.

For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).

#### `EXTERN` to export to a destination

`EXTERN` can be used to specify a destination where you want to export data to.
This variation of EXTERN requires one argument, the details of the destination as specified below.
This variation additionally requires an `AS` clause to specify the format of the exported rows.

Keep the following in mind when using EXTERN to export rows:
- Only INSERT statements are supported.
- Only `CSV` format is supported as an export format.
- Partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) aren't supported with export statements.
- You can export to Amazon S3 or local storage.
- The destination provided should contain no other files or directories.

When you export data, use the `rowsPerPage` context parameter to control how many rows get exported. The default is 100,000.

```sql
INSERT INTO
EXTERN(<destination function>)
AS CSV
SELECT
<column>
FROM <table>
```

##### S3

Export results to S3 by passing the function `S3()` as an argument to the `EXTERN` function. Note that this requires the `druid-s3-extensions`.
The `S3()` function is a Druid function that configures the connection. Arguments for `S3()` should be passed as named parameters with the value in single quotes like the following example:

```sql
INSERT INTO
EXTERN(
S3(bucket => 'your_bucket', prefix => 'prefix/to/files')
)
AS CSV
SELECT
<column>
FROM <table>
```

Supported arguments for the function:

| Parameter | Required | Description | Default |
|-------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|
| `bucket` | Yes | The S3 bucket to which the files are exported to. The bucket and prefix combination should be whitelisted in `druid.export.storage.s3.allowedExportPaths`. | n/a |
| `prefix` | Yes | Path where the exported files would be created. The export query expects the destination to be empty. If the location includes other files, then the query will fail. The bucket and prefix combination should be whitelisted in `druid.export.storage.s3.allowedExportPaths`. | n/a |

The following runtime parameters must be configured to export into an S3 destination:

| Runtime Parameter | Required | Description | Default |
|----------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----|
| `druid.export.storage.s3.tempLocalDir` | Yes | Directory used on the local storage of the worker to store temporary files required while uploading the data. | n/a |
| `druid.export.storage.s3.allowedExportPaths` | Yes | An array of S3 prefixes that are whitelisted as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]` | n/a |
| `druid.export.storage.s3.maxRetry` | No | Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. | 10 |
| `druid.export.storage.s3.chunkSize` | No | Defines the size of each chunk to temporarily store in `tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls to S3, however it requires more disk space to store the temporary chunks. | 100MiB |

##### LOCAL

You can export to the local storage, which exports the results to the filesystem of the MSQ worker.
This is useful in a single node setup or for testing but is not suitable for production use cases.

Export results to local storage by passing the function `LOCAL()` as an argument for the `EXTERN FUNCTION`.
To use local storage as an export destination, the runtime property `druid.export.storage.baseDir` must be configured on the Indexer/Middle Manager.
This value must be set to an absolute path on the local machine. Exporting data will be allowed to paths which match the prefix set by this value.
Arguments to `LOCAL()` should be passed as named parameters with the value in single quotes in the following example:

```sql
INSERT INTO
EXTERN(
local(exportPath => 'exportLocation/query1')
)
AS CSV
SELECT
<column>
FROM <table>
```

Supported arguments to the function:

| Parameter | Required | Description | Default |
|-------------|--------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| --|
| `exportPath` | Yes | Absolute path to a subdirectory of `druid.export.storage.baseDir` used as the destination to export the results to. The export query expects the destination to be empty. If the location includes other files or directories, then the query will fail. | n/a |

For more information, see [Read external data with EXTERN](concepts.md#write-to-an-external-destination-with-extern).

### `INSERT`

Use the `INSERT` statement to insert data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,4 @@ public StorageConnector get()
{
return new GoogleStorageConnector(this, googleStorage, googleInputDataConfig);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.ExportMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.error.CanceledFault;
Expand Down Expand Up @@ -165,9 +166,9 @@
import org.apache.druid.msq.querykit.MultiQueryKit;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
import org.apache.druid.msq.querykit.results.ExportResultsFrameProcessorFactory;
import org.apache.druid.msq.querykit.results.QueryResultFrameProcessorFactory;
import org.apache.druid.msq.querykit.scan.ScanQueryKit;
import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory;
Expand Down Expand Up @@ -201,6 +202,8 @@
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
Expand All @@ -220,6 +223,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1756,7 +1760,8 @@ private static QueryDefinition makeQueryDefinition(
final ShuffleSpecFactory shuffleSpecFactory;

if (MSQControllerTask.isIngestion(querySpec)) {
shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(tuningConfig.getRowsPerSegment());
shuffleSpecFactory = querySpec.getDestination()
.getShuffleSpecFactory(tuningConfig.getRowsPerSegment());

if (!columnMappings.hasUniqueOutputColumnNames()) {
// We do not expect to hit this case in production, because the SQL validator checks that column names
Expand All @@ -1777,16 +1782,10 @@ private static QueryDefinition makeQueryDefinition(
} else {
queryToPlan = querySpec.getQuery();
}
} else if (querySpec.getDestination() instanceof TaskReportMSQDestination) {
shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
queryToPlan = querySpec.getQuery();
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
);
queryToPlan = querySpec.getQuery();
} else {
throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
shuffleSpecFactory = querySpec.getDestination()
.getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()));
queryToPlan = querySpec.getQuery();
}

final QueryDefinition queryDef;
Expand Down Expand Up @@ -1877,6 +1876,43 @@ private static QueryDefinition makeQueryDefinition(
} else {
return queryDef;
}
} else if (querySpec.getDestination() instanceof ExportMSQDestination) {
final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
final ExportStorageProvider exportStorageProvider = exportMSQDestination.getExportStorageProvider();

try {
// Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export.
Iterator<String> filesIterator = exportStorageProvider.get().listDir("");
if (filesIterator.hasNext()) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Found files at provided export destination[%s]. Export is only allowed to "
+ "an empty path. Please provide an empty path/subdirectory or move the existing files.",
exportStorageProvider.getBasePath());
}
}
catch (IOException e) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Exception occurred while connecting to export destination.");
}


final ResultFormat resultFormat = exportMSQDestination.getResultFormat();
final QueryDefinitionBuilder builder = QueryDefinition.builder();
builder.addAll(queryDef);
builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.signature(queryDef.getFinalStageDefinition().getSignature())
.shuffleSpec(null)
.processorFactory(new ExportResultsFrameProcessorFactory(
queryId,
exportStorageProvider,
resultFormat
))
);
return builder.build();
} else {
throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.msq.querykit.groupby.GroupByPostShuffleFrameProcessorFactory;
import org.apache.druid.msq.querykit.groupby.GroupByPreShuffleFrameProcessorFactory;
import org.apache.druid.msq.querykit.results.ExportResultsFrameProcessorFactory;
import org.apache.druid.msq.querykit.results.QueryResultFrameProcessorFactory;
import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessorFactory;
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
Expand Down Expand Up @@ -158,6 +159,7 @@ public List<? extends Module> getJacksonModules()
NilExtraInfoHolder.class,
SortMergeJoinFrameProcessorFactory.class,
QueryResultFrameProcessorFactory.class,
ExportResultsFrameProcessorFactory.class,

// DataSource classes (note: ExternalDataSource is in MSQSqlModule)
InputNumberDataSource.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.run.SqlResults;
import org.joda.time.Interval;
Expand All @@ -65,6 +66,7 @@
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

@JsonTypeName(MSQControllerTask.TYPE)
Expand Down Expand Up @@ -273,6 +275,12 @@ private static String getDataSourceForTaskMetadata(final MSQSpec querySpec)
}
}

@Override
public Optional<Resource> getDestinationResource()
{
return querySpec.getDestination().getDestinationResource();
}

public static boolean isIngestion(final MSQSpec querySpec)
{
return querySpec.getDestination() instanceof DataSourceMSQDestination;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,17 @@
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceType;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class DataSourceMSQDestination implements MSQDestination
{
Expand Down Expand Up @@ -160,4 +165,16 @@ public String toString()
", replaceTimeChunks=" + replaceTimeChunks +
'}';
}

@Override
public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
{
return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
}

@Override
public Optional<Resource> getDestinationResource()
{
return Optional.of(new Resource(getDataSource(), ResourceType.DATASOURCE));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
package org.apache.druid.msq.indexing.destination;

import com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceType;

import java.util.Optional;

public class DurableStorageMSQDestination implements MSQDestination
{
Expand All @@ -45,4 +52,15 @@ public String toString()
return "DurableStorageDestination{}";
}

@Override
public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
{
return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
}

@Override
public Optional<Resource> getDestinationResource()
{
return Optional.of(new Resource(MSQControllerTask.DUMMY_DATASOURCE_FOR_SELECT, ResourceType.DATASOURCE));
}
}
Loading