Merged
53 commits
51639bc
Add test
adarshsanjeev Jan 10, 2024
158ba0c
Parser changes to support export statements
adarshsanjeev Jan 15, 2024
9e1ef17
Fix builds
adarshsanjeev Jan 16, 2024
06e9d92
Address comments
adarshsanjeev Jan 17, 2024
6ff6747
Add frame processor
adarshsanjeev Jan 21, 2024
53ff841
Address review comments
adarshsanjeev Jan 22, 2024
6daf530
Fix builds
adarshsanjeev Jan 22, 2024
550cc8f
Update syntax
adarshsanjeev Jan 23, 2024
9ab7b37
Webconsole workaround
adarshsanjeev Jan 23, 2024
5fd7ded
Refactor
adarshsanjeev Jan 24, 2024
e6c75ab
Refactor
adarshsanjeev Jan 24, 2024
4c9d4cc
Change export file path
adarshsanjeev Jan 24, 2024
529d14b
Merge remote-tracking branch 'origin/master' into export-syntax
adarshsanjeev Jan 24, 2024
58f1d13
Update docs
adarshsanjeev Jan 25, 2024
3db7a1b
Remove webconsole changes
adarshsanjeev Jan 25, 2024
f3ebe05
Fix spelling mistake
adarshsanjeev Jan 25, 2024
c45c357
Parser changes, add tests
adarshsanjeev Jan 25, 2024
8ed422c
Parser changes, resolve build warnings
adarshsanjeev Jan 25, 2024
6e7262d
Fix failing test
adarshsanjeev Jan 25, 2024
f75188e
Fix failing test
adarshsanjeev Jan 25, 2024
e571a0a
Fix IT tests
adarshsanjeev Jan 26, 2024
1247461
Add tests
adarshsanjeev Jan 26, 2024
d5d3693
Cleanup
adarshsanjeev Jan 26, 2024
55a4aed
Fix unparse
adarshsanjeev Jan 27, 2024
0063767
Fix forbidden API
adarshsanjeev Jan 27, 2024
7cbdef1
Merge remote-tracking branch 'origin/master' into export-syntax
adarshsanjeev Jan 29, 2024
6f46c41
Update docs
adarshsanjeev Jan 29, 2024
3f8d715
Update docs
adarshsanjeev Jan 29, 2024
7c00062
Address review comments
adarshsanjeev Jan 30, 2024
6e9f53b
Address review comments
adarshsanjeev Jan 30, 2024
9bbda77
Fix tests
adarshsanjeev Jan 30, 2024
20103ca
Address review comments
adarshsanjeev Jan 30, 2024
aa3ce05
Fix insert unparse
adarshsanjeev Jan 31, 2024
0501106
Add external write resource action
adarshsanjeev Jan 31, 2024
ebfc53e
Fix tests
adarshsanjeev Jan 31, 2024
941605b
Add resource check to overlord resource
adarshsanjeev Jan 31, 2024
eb73cc2
Fix tests
adarshsanjeev Jan 31, 2024
1867cce
Add IT
adarshsanjeev Feb 1, 2024
9ff0cd4
Update syntax
adarshsanjeev Feb 2, 2024
8e21576
Update tests
adarshsanjeev Feb 2, 2024
62c2c04
Update permission
adarshsanjeev Feb 2, 2024
2c9e87b
Address review comments
adarshsanjeev Feb 3, 2024
c79c496
Address review comments
adarshsanjeev Feb 5, 2024
81ee2a3
Address review comments
adarshsanjeev Feb 5, 2024
f9873a6
Add tests
adarshsanjeev Feb 5, 2024
b4a2223
Merge remote-tracking branch 'origin/master' into export-syntax
adarshsanjeev Feb 5, 2024
c7f8234
Add check for runtime parameter for bucket and path
adarshsanjeev Feb 6, 2024
cf15323
Add check for runtime parameter for bucket and path
adarshsanjeev Feb 6, 2024
2ff3410
Add tests
adarshsanjeev Feb 6, 2024
c71cc5a
Update docs
adarshsanjeev Feb 7, 2024
180f132
Fix NPE
adarshsanjeev Feb 7, 2024
5206e90
Update docs, remove deadcode
adarshsanjeev Feb 7, 2024
10217a5
Fix formatting
adarshsanjeev Feb 7, 2024
8 changes: 8 additions & 0 deletions docs/multi-stage-query/concepts.md
Original file line number Diff line number Diff line change
@@ -115,6 +115,14 @@ When deciding whether to use `REPLACE` or `INSERT`, keep in mind that segments g
with dimension-based pruning but those generated with `INSERT` cannot. For more information about the requirements
for dimension-based pruning, see [Clustering](#clustering).

### Write to an external destination with `EXTERN`

Query tasks can write data to an external destination through the `EXTERN` function when it is used with the `INTO`
clause, as in `INSERT INTO EXTERN(...)`. The `EXTERN` function takes arguments that specify where to write the files.
The output format can be specified using an `AS` clause.

For more information about the syntax, see [`EXTERN`](./reference.md#extern-function).
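
For example, a minimal export statement might look like the following (the destination arguments, table, and column names here are illustrative only; see the reference for the full syntax):

```sql
-- Illustrative only: export query results as CSV to an S3 destination.
INSERT INTO
  EXTERN(S3(bucket => 'your_bucket', prefix => 'prefix/to/files'))
AS CSV
SELECT __time, channel
FROM wikipedia
```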

### Primary timestamp

Druid tables always include a primary timestamp named `__time`.
90 changes: 89 additions & 1 deletion docs/multi-stage-query/reference.md
@@ -45,8 +45,11 @@ making it easy to reuse the same SQL statement for each ingest: just specify the

### `EXTERN` Function

Use the `EXTERN` function to read external data. The function has two variations.
Use the `EXTERN` function to read external data or write to an external location.

#### `EXTERN` as an input source

The function has two variations.
Function variation 1, with the input schema expressed as JSON:

```sql
@@ -90,6 +93,91 @@ can precede the column list: `EXTEND (timestamp VARCHAR...)`.

For more information, see [Read external data with EXTERN](concepts.md#read-external-data-with-extern).

#### `EXTERN` to export to a destination

`EXTERN` can be used to specify a destination to which you want to export data.
This variation of `EXTERN` requires one argument: the details of the destination, as specified below.
It also requires an `AS` clause to specify the format of the exported rows.
Comment on lines +99 to +100

@317brian (Contributor), Feb 6, 2024 — Suggested change. Replace:

> This variation of EXTERN requires one argument, the details of the destination as specified below.
> This variation additionally requires an `AS` clause to specify the format of the exported rows.

with:

> This variation of EXTERN has two required parts: an argument that details the destination and an `AS` clause to specify the format of the exported rows.

@adarshsanjeev (Contributor, Author), Feb 7, 2024:

> The AS clause would not be an argument to extern, it's present elsewhere in the query. Would it be confusing to call it an argument?

(Contributor):

> How about the change I just made?

Keep the following in mind when using `EXTERN` to export rows:
- Only `INSERT` statements are supported.
- Only `CSV` is supported as an export format.
- Partitioning (`PARTITIONED BY`) and clustering (`CLUSTERED BY`) aren't supported with export statements.
- You can export to Amazon S3 or local storage.
- The destination provided must contain no other files or directories.

When you export data, use the `rowsPerPage` context parameter to control how many rows are written to each exported file. The default is 100,000.

```sql
INSERT INTO
EXTERN(<destination function>)
AS CSV
SELECT
<column>
FROM <table>
```
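
To illustrate how `rowsPerPage` partitions the output, here is a rough back-of-the-envelope sketch (this is not Druid code, just an estimate of how results split into pages):

```python
import math

def num_export_files(total_rows: int, rows_per_page: int = 100_000) -> int:
    """Estimate how many output files an export produces when results are
    split into pages of at most rows_per_page rows (illustrative only)."""
    if total_rows <= 0:
        return 0
    return math.ceil(total_rows / rows_per_page)
```

For example, exporting 250,000 rows with the default `rowsPerPage` of 100,000 would yield roughly three output files.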

##### S3

Export results to S3 by passing the function `S3()` as an argument to the `EXTERN` function. Note that this requires the `druid-s3-extensions` extension to be loaded.
The `S3()` function is a Druid function that configures the connection. Pass arguments to `S3()` as named parameters with the values in single quotes, as in the following example:

```sql
INSERT INTO
EXTERN(
S3(bucket => 'your_bucket', prefix => 'prefix/to/files')
)
AS CSV
SELECT
<column>
FROM <table>
```

Supported arguments for the function:

| Parameter | Required | Description | Default |
|---|---|---|---|
| `bucket` | Yes | The S3 bucket to which the files are exported. The bucket and prefix combination must be whitelisted in `druid.export.storage.s3.allowedExportPaths`. | n/a |
| `prefix` | Yes | Path where the exported files are created. The export query expects the destination to be empty; if the location includes other files, the query fails. The bucket and prefix combination must be whitelisted in `druid.export.storage.s3.allowedExportPaths`. | n/a |

The following runtime parameters must be configured to export into an S3 destination:

| Runtime parameter | Required | Description | Default |
|---|---|---|---|
| `druid.export.storage.s3.tempLocalDir` | Yes | Directory on the worker's local storage used to store temporary files required while uploading the data. | n/a |
| `druid.export.storage.s3.allowedExportPaths` | Yes | An array of S3 prefixes whitelisted as export destinations. Export queries fail if the export destination does not match any of the configured prefixes. Example: `[\"s3://bucket1/export/\", \"s3://bucket2/export/\"]` | n/a |
| `druid.export.storage.s3.maxRetry` | No | The maximum number of times to attempt S3 API calls, to avoid failures due to transient errors. | 10 |
| `druid.export.storage.s3.chunkSize` | No | The size of each chunk to temporarily store in `tempLocalDir`. The chunk size must be between 5 MiB and 5 GiB. A larger chunk size reduces API calls to S3, but requires more disk space to store the temporary chunks. | 100 MiB |
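
Conceptually, the allow-list check behaves like a simple prefix match against the configured destinations. The following is a sketch of the idea only, not Druid's actual implementation:

```python
def is_allowed_export_path(destination: str, allowed_prefixes: list[str]) -> bool:
    """Return True if the destination falls under one of the configured
    export prefixes (illustrative sketch, not Druid's code)."""
    return any(destination.startswith(prefix) for prefix in allowed_prefixes)

# Hypothetical configuration mirroring the example value above.
ALLOWED = ["s3://bucket1/export/", "s3://bucket2/export/"]
```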

##### LOCAL

You can export to local storage, which writes the results to the filesystem of the MSQ worker.
This is useful in a single-node setup or for testing, but is not suitable for production use cases.

Export results to local storage by passing the function `LOCAL()` as an argument to the `EXTERN` function.
To use local storage as an export destination, the runtime property `druid.export.storage.baseDir` must be configured on the Indexer/Middle Manager.
This value must be set to an absolute path on the local machine. Export is allowed only to paths that match the prefix set by this value.
Pass arguments to `LOCAL()` as named parameters with the values in single quotes, as in the following example:

```sql
INSERT INTO
EXTERN(
local(exportPath => 'exportLocation/query1')
)
AS CSV
SELECT
<column>
FROM <table>
```

Supported arguments to the function:

| Parameter | Required | Description | Default |
|---|---|---|---|
| `exportPath` | Yes | Absolute path to a subdirectory of `druid.export.storage.baseDir` used as the destination to export the results to. The export query expects the destination to be empty; if the location includes other files or directories, the query fails. | n/a |
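
Since `exportPath` must resolve to a location under `druid.export.storage.baseDir`, a containment check of this general shape applies (an illustrative sketch, not Druid's actual validation code):

```python
import os

def is_under_base_dir(export_path: str, base_dir: str) -> bool:
    """Check that export_path resolves to a location inside base_dir,
    guarding against `..` escapes (illustrative sketch only)."""
    base = os.path.abspath(base_dir)
    target = os.path.abspath(export_path)
    # commonpath collapses to base only when target sits beneath it.
    return os.path.commonpath([base, target]) == base
```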

For more information, see [Write to an external destination with EXTERN](concepts.md#write-to-an-external-destination-with-extern).

### `INSERT`

Use the `INSERT` statement to insert data.
@@ -60,5 +60,4 @@ public StorageConnector get()
{
return new GoogleStorageConnector(this, googleStorage, googleInputDataConfig);
}

}
@@ -106,6 +106,7 @@
import org.apache.druid.msq.indexing.client.ControllerChatHandler;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.indexing.destination.DurableStorageMSQDestination;
import org.apache.druid.msq.indexing.destination.ExportMSQDestination;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
import org.apache.druid.msq.indexing.destination.TaskReportMSQDestination;
import org.apache.druid.msq.indexing.error.CanceledFault;
@@ -165,9 +166,9 @@
import org.apache.druid.msq.querykit.MultiQueryKit;
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
import org.apache.druid.msq.querykit.results.ExportResultsFrameProcessorFactory;
import org.apache.druid.msq.querykit.results.QueryResultFrameProcessorFactory;
import org.apache.druid.msq.querykit.scan.ScanQueryKit;
import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory;
@@ -201,6 +202,8 @@
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.calcite.rel.DruidQuery;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
@@ -220,6 +223,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -1756,7 +1760,8 @@ private static QueryDefinition makeQueryDefinition(
final ShuffleSpecFactory shuffleSpecFactory;

if (MSQControllerTask.isIngestion(querySpec)) {
shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(tuningConfig.getRowsPerSegment());
shuffleSpecFactory = querySpec.getDestination()
.getShuffleSpecFactory(tuningConfig.getRowsPerSegment());

if (!columnMappings.hasUniqueOutputColumnNames()) {
// We do not expect to hit this case in production, because the SQL validator checks that column names
@@ -1777,16 +1782,10 @@
} else {
queryToPlan = querySpec.getQuery();
}
} else if (querySpec.getDestination() instanceof TaskReportMSQDestination) {
shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
queryToPlan = querySpec.getQuery();
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
);
queryToPlan = querySpec.getQuery();
} else {
throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
shuffleSpecFactory = querySpec.getDestination()
.getShuffleSpecFactory(MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context()));
Contributor comment:

> Thanks for the refactor. It's much cleaner now.
> We should add a comment saying all select partitions are controlled by a context value rowsPerPage.

Contributor Author reply:

> Do you mean a comment everywhere the function is being called? We don't pass the whole context to getShuffleSpecFactory(), just the integer, so would this need to be specifically mentioned somewhere?

queryToPlan = querySpec.getQuery();
}

final QueryDefinition queryDef;
@@ -1877,6 +1876,43 @@ private static QueryDefinition makeQueryDefinition(
} else {
return queryDef;
}
} else if (querySpec.getDestination() instanceof ExportMSQDestination) {
final ExportMSQDestination exportMSQDestination = (ExportMSQDestination) querySpec.getDestination();
final ExportStorageProvider exportStorageProvider = exportMSQDestination.getExportStorageProvider();

try {
// Check that the export destination is empty as a sanity check. We want to avoid modifying any other files with export.
Iterator<String> filesIterator = exportStorageProvider.get().listDir("");
if (filesIterator.hasNext()) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build("Found files at provided export destination[%s]. Export is only allowed to "
+ "an empty path. Please provide an empty path/subdirectory or move the existing files.",
exportStorageProvider.getBasePath());
}
}
catch (IOException e) {
throw DruidException.forPersona(DruidException.Persona.USER)
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
.build(e, "Exception occurred while connecting to export destination.");
}


final ResultFormat resultFormat = exportMSQDestination.getResultFormat();
final QueryDefinitionBuilder builder = QueryDefinition.builder();
builder.addAll(queryDef);
builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
.inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
.maxWorkerCount(tuningConfig.getMaxNumWorkers())
.signature(queryDef.getFinalStageDefinition().getSignature())
.shuffleSpec(null)
.processorFactory(new ExportResultsFrameProcessorFactory(
queryId,
exportStorageProvider,
resultFormat
))
);
return builder.build();
} else {
throw new ISE("Unsupported destination [%s]", querySpec.getDestination());
}
@@ -88,6 +88,7 @@
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.msq.querykit.groupby.GroupByPostShuffleFrameProcessorFactory;
import org.apache.druid.msq.querykit.groupby.GroupByPreShuffleFrameProcessorFactory;
import org.apache.druid.msq.querykit.results.ExportResultsFrameProcessorFactory;
import org.apache.druid.msq.querykit.results.QueryResultFrameProcessorFactory;
import org.apache.druid.msq.querykit.scan.ScanQueryFrameProcessorFactory;
import org.apache.druid.msq.util.PassthroughAggregatorFactory;
@@ -158,6 +159,7 @@ public List<? extends Module> getJacksonModules()
NilExtraInfoHolder.class,
SortMergeJoinFrameProcessorFactory.class,
QueryResultFrameProcessorFactory.class,
ExportResultsFrameProcessorFactory.class,

// DataSource classes (note: ExternalDataSource is in MSQSqlModule)
InputNumberDataSource.class,
@@ -57,6 +57,7 @@
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.sql.calcite.run.SqlResults;
import org.joda.time.Interval;
@@ -65,6 +66,7 @@
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

@JsonTypeName(MSQControllerTask.TYPE)
@@ -273,6 +275,12 @@ private static String getDataSourceForTaskMetadata(final MSQSpec querySpec)
}
}

@Override
public Optional<Resource> getDestinationResource()
{
return querySpec.getDestination().getDestinationResource();
}

public static boolean isIngestion(final MSQSpec querySpec)
{
return querySpec.getDestination() instanceof DataSourceMSQDestination;
@@ -26,12 +26,17 @@
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceType;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class DataSourceMSQDestination implements MSQDestination
{
@@ -160,4 +165,16 @@ public String toString()
", replaceTimeChunks=" + replaceTimeChunks +
'}';
}

@Override
public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
{
return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
}

@Override
public Optional<Resource> getDestinationResource()
{
return Optional.of(new Resource(getDataSource(), ResourceType.DATASOURCE));
}
}
@@ -20,6 +20,13 @@
package org.apache.druid.msq.indexing.destination;

import com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.druid.msq.indexing.MSQControllerTask;
import org.apache.druid.msq.querykit.ShuffleSpecFactories;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceType;

import java.util.Optional;

public class DurableStorageMSQDestination implements MSQDestination
{
@@ -45,4 +52,15 @@ public String toString()
return "DurableStorageDestination{}";
}

@Override
public ShuffleSpecFactory getShuffleSpecFactory(int targetSize)
{
return ShuffleSpecFactories.getGlobalSortWithTargetSize(targetSize);
}

@Override
public Optional<Resource> getDestinationResource()
{
return Optional.of(new Resource(MSQControllerTask.DUMMY_DATASOURCE_FOR_SELECT, ResourceType.DATASOURCE));
}
}