Skip to content
24 changes: 12 additions & 12 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,19 @@ If you're using the web console, you can specify the context parameters through

The following table lists the context parameters for the MSQ task engine:

| Parameter | Description | Default value |
|---|---|---|
| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority.| 2 |
| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include: <ul><li>`max`: Uses as many tasks as possible, up to `maxNumTasks`.</li><li>`auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 10 GiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`.</li></ul> | `max` |
| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true |
| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` |
| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
| Parameter | Description | Default value |
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---|
| `maxNumTasks` | SELECT, INSERT, REPLACE<br /><br />The maximum total number of tasks to launch, including the controller task. The lowest possible value for this setting is 2: one controller and one worker. All tasks must be able to launch simultaneously. If they cannot, the query returns a `TaskStartTimeout` error code after approximately 10 minutes.<br /><br />May also be provided as `numTasks`. If both are present, `maxNumTasks` takes priority. | 2 |
| `taskAssignment` | SELECT, INSERT, REPLACE<br /><br />Determines how many tasks to use. Possible values include: <ul><li>`max`: Uses as many tasks as possible, up to `maxNumTasks`.</li><li>`auto`: When file sizes can be determined through directory listing (for example: local files, S3, GCS, HDFS) uses as few tasks as possible without exceeding 512 MiB or 10,000 files per task, unless exceeding these limits is necessary to stay within `maxNumTasks`. When calculating the size of files, the weighted size is used, which considers the file format and compression format used if any. When file sizes cannot be determined through directory listing (for example: http), behaves the same as `max`.</li></ul> | `max` |
| `finalizeAggregations` | SELECT, INSERT, REPLACE<br /><br />Determines the type of aggregation to return. If true, Druid finalizes the results of complex aggregations that directly appear in query results. If false, Druid returns the aggregation's intermediate type rather than finalized type. This parameter is useful during ingestion, where it enables storing sketches directly in Druid tables. For more information about aggregations, see [SQL aggregation functions](../querying/sql-aggregations.md). | true |
| `sqlJoinAlgorithm` | SELECT, INSERT, REPLACE<br /><br />Algorithm to use for JOIN. Use `broadcast` (the default) for broadcast hash join or `sortMerge` for sort-merge join. Affects all JOIN operations in the query. See [Joins](#joins) for more details. | `broadcast` |
| `rowsInMemory` | INSERT or REPLACE<br /><br />Maximum number of rows to store in memory at once before flushing to disk during the segment generation process. Ignored for non-INSERT queries. In most cases, use the default value. You may need to override the default if you run into one of the [known issues](./known-issues.md) around memory usage. | 100,000 |
| `segmentSortOrder` | INSERT or REPLACE<br /><br />Normally, Druid sorts rows in individual segments using `__time` first, followed by the [CLUSTERED BY](#clustered-by) clause. When you set `segmentSortOrder`, Druid sorts rows in segments using this column list first, followed by the CLUSTERED BY order.<br /><br />You provide the column list as comma-separated values or as a JSON array in string form. If your query includes `__time`, then this list must begin with `__time`. For example, consider an INSERT query that uses `CLUSTERED BY country` and has `segmentSortOrder` set to `__time,city`. Within each time chunk, Druid assigns rows to segments based on `country`, and then within each of those segments, Druid sorts those rows by `__time` first, then `city`, then `country`. | empty list |
| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1.| 0 |
| `rowsPerSegment` | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
| `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |
| `maxParseExceptions`| SELECT, INSERT, REPLACE<br /><br />Maximum number of parse exceptions that are ignored while executing the query before it stops with `TooManyWarningsFault`. To ignore all the parse exceptions, set the value to -1. | 0 |
| `rowsPerSegment` | INSERT or REPLACE<br /><br />The number of rows per segment to target. The actual number of rows per segment may be somewhat higher or lower than this number. In most cases, use the default. For general information about sizing rows per segment, see [Segment Size Optimization](../operations/segment-optimization.md). | 3,000,000 |
| `indexSpec` | INSERT or REPLACE<br /><br />An [`indexSpec`](../ingestion/ingestion-spec.md#indexspec) to use when generating segments. May be a JSON string or object. See [Front coding](../ingestion/ingestion-spec.md#front-coding) for details on configuring an `indexSpec` with front coding. | See [`indexSpec`](../ingestion/ingestion-spec.md#indexspec). |
| `durableShuffleStorage` | SELECT, INSERT, REPLACE <br /><br />Whether to use durable storage for shuffle mesh. To use this feature, configure the durable storage at the server level using `druid.msq.intermediate.storage.enable=true`). If these properties are not configured, any query with the context variable `durableShuffleStorage=true` fails with a configuration error. <br /><br /> | `false` |
| `faultTolerance` | SELECT, INSERT, REPLACE<br /><br /> Whether to turn on fault tolerance mode or not. Failed workers are retried based on [Limits](#limits). Cannot be used when `durableShuffleStorage` is explicitly set to false. | `false` |

## Joins

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

public class AvroOCFInputFormat extends NestedInputFormat
{
static final long SCALE_FACTOR = 8L;
private static final Logger LOGGER = new Logger(AvroOCFInputFormat.class);

private final boolean binaryAsString;
Expand Down Expand Up @@ -116,6 +117,12 @@ public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity
);
}

@Override
public long getWeightedSize(String path, long size)
{
return size * SCALE_FACTOR;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,21 @@ public void testSerdeNonDefaults() throws Exception

Assert.assertEquals(inputFormat, inputFormat2);
}

@Test
public void test_getWeightedSize_withoutCompression() throws Exception
{
AvroOCFInputFormat format = new AvroOCFInputFormat(
jsonMapper,
flattenSpec,
null,
false,
false
);
long unweightedSize = 100L;
Assert.assertEquals(
unweightedSize * AvroOCFInputFormat.SCALE_FACTOR,
format.getWeightedSize("file.avro", unweightedSize)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.storage.azure.AzureCloudBlobIterable;
import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
import org.apache.druid.storage.azure.AzureInputDataConfig;
Expand Down Expand Up @@ -62,6 +65,14 @@ public class AzureInputSourceTest extends EasyMockSupport
private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new CloudObjectLocation(CONTAINER, BLOB_PATH);
private static final int MAX_LISTING_LENGTH = 10;

private static final InputFormat INPUT_FORMAT = new JsonInputFormat(
new JSONPathSpec(true, null),
null,
false,
null,
null
);

private AzureStorage storage;
private AzureEntityFactory entityFactory;
private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
Expand Down Expand Up @@ -165,7 +176,7 @@ public void test_createSplits_successfullyCreatesCloudLocation_returnsExpectedLo
);

Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.createSplits(
null,
INPUT_FORMAT,
new MaxSizeSplitHintSpec(null, 1)
);

Expand Down Expand Up @@ -214,7 +225,7 @@ public void test_getPrefixesSplitStream_withObjectGlob_successfullyCreatesCloudL
);

Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.createSplits(
null,
INPUT_FORMAT,
new MaxSizeSplitHintSpec(null, 1)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class Limits
/**
* Maximum number of input bytes per worker in case number of tasks is determined automatically.
*/
public static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 10 * 1024 * 1024 * 1024L;
public static final long DEFAULT_MAX_INPUT_BYTES_PER_WORKER = 1024 * 1024 * 512L;

/**
* Maximum size of the kernel manipulation queue in {@link org.apache.druid.msq.indexing.MSQControllerTask}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public <T> Iterator<List<T>> split(
return Iterators.filter(
SlicerUtils.makeSlicesDynamic(
inputIterator,
item -> inputAttributeExtractor.apply(item).getSize(),
item -> inputAttributeExtractor.apply(item).getWeightedSize(),
maxNumSlices,
maxFilesPerSlice,
maxBytesPerSlice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,19 @@ public void test_sliceDynamic_splittable_needTwoDueToBytes()
);
}

@Test
public void test_sliceDynamic_splittableFilesWithCompression_needThreeDueToBytes()
{
Assert.assertEquals(
ImmutableList.of(
splittableSlice("foo.gz"),
splittableSlice("bar.gz"),
splittableSlice("baz.gz")
),
slicer.sliceDynamic(splittableSpec("foo.gz", "bar.gz", "baz.gz"), 100, 5, 7)
);
}

@Test
public void test_sliceDynamic_splittableThatIgnoresSplitHints_oneHundredMax()
{
Expand Down Expand Up @@ -367,7 +380,7 @@ public Stream<InputSplit<List<String>>> createSplits(
if (useSplitHintSpec) {
splits = splitHintSpec.split(
strings.iterator(),
s -> new InputFileAttribute(s.length())
s -> new InputFileAttribute(s.length(), INPUT_FORMAT.getWeightedSize(s, s.length()))
);
} else {
// Ignore splitHintSpec, return one element per split. Similar to HttpInputSource, for example.
Expand Down
Loading