Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/content/development/extensions-core/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)|
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no|
|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)|
|`reportParseExceptions`|Boolean|*DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)|
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no|
|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer sequence numbers if the next sequence number that it is trying to fetch is less than the earliest available sequence number for that particular shard. The sequence number will be reset to either the earliest or latest sequence number depending on `useEarliestOffset` property of `KinesisSupervisorIOConfig` (see below). This situation typically occurs when messages in Kinesis are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular shard will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)|
Expand Down
3 changes: 2 additions & 1 deletion docs/content/ingestion/hadoop.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|combineText|Boolean|Use CombineTextInputFormat to combine multiple files into a file split. This can speed up Hadoop jobs when processing a large number of small files.|no (default == false)|
|useCombiner|Boolean|Use Hadoop combiner to merge rows at mapper if possible.|no (default == false)|
|jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)|
|indexSpec|Object|Tune how data is indexed. See below for more information.|no|
|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no|
|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)|
|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
|forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitioning-specification). This option can be useful when you need to append more data to existing dataSource.|no (default = false)|
|useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)|
Expand Down
10 changes: 10 additions & 0 deletions docs/content/ingestion/native_tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no|
Expand Down Expand Up @@ -375,6 +376,14 @@ An example of the result is
"metricCompression": "lz4",
"longEncoding": "longs"
},
"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "concise"
},
"dimensionCompression": "lz4",
"metricCompression": "lz4",
"longEncoding": "longs"
},
"maxPendingPersists": 0,
"reportParseExceptions": false,
"pushTimeout": 0,
Expand Down Expand Up @@ -555,6 +564,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no|
|partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
|forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../ingestion/index.html#roll-up-modes). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments. Note that the result segments would be hash-partitioned. You can set `forceExtendableShardSpecs` if you plan to append more data to the same time range in the future. This flag cannot be used with `appendToExisting` of IOConfig. For more details, see the below __Segment pushing modes__ section.|false|no|
|reportParseExceptions|DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|false|no|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public HadoopIndexTask createTask(Interval interval, String version, List<DataSe
tuningConfig.getPartitionsSpec(),
tuningConfig.getShardSpecs(),
tuningConfig.getIndexSpec(),
tuningConfig.getIndexSpecForIntermediatePersists(),
tuningConfig.getRowFlushBoundary(),
tuningConfig.getMaxBytesInMemory(),
tuningConfig.isLeaveIntermediate(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public KafkaIndexTaskTuningConfig(
@JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly,
@Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
Expand All @@ -62,6 +63,7 @@ public KafkaIndexTaskTuningConfig(
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
true,
reportParseExceptions,
handoffConditionTimeout,
Expand All @@ -87,6 +89,7 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
dir,
getMaxPendingPersists(),
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
true,
isReportParseExceptions(),
getHandoffConditionTimeout(),
Expand All @@ -112,6 +115,7 @@ public String toString()
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() +
", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public KafkaSupervisorSpec(
null,
null,
null,
null,
null
),
ioConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
Expand Down Expand Up @@ -80,6 +81,7 @@ public KafkaSupervisorTuningConfig(
basePersistDirectory,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
true,
reportParseExceptions,
handoffConditionTimeout,
Expand Down Expand Up @@ -186,6 +188,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
getBasePersistDirectory(),
getMaxPendingPersists(),
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
true,
isReportParseExceptions(),
getHandoffConditionTimeout(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2456,6 +2456,7 @@ private KafkaIndexTask createTask(
null,
null,
null,
null,
true,
reportParseExceptions,
handoffConditionTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.indexing.TuningConfig;
import org.joda.time.Period;
import org.junit.Assert;
Expand Down Expand Up @@ -65,6 +66,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
}
Expand All @@ -81,7 +83,9 @@ public void testSerdeWithNonDefaults() throws Exception
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"reportParseExceptions\": true,\n"
+ " \"handoffConditionTimeout\": 100\n"
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n"
+ "}";

KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
Expand All @@ -103,6 +107,8 @@ public void testSerdeWithNonDefaults() throws Exception
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertEquals(true, config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec());
Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists());
}

@Test
Expand All @@ -117,6 +123,7 @@ public void testConvert()
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
Expand Down Expand Up @@ -159,6 +166,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
Expand Down Expand Up @@ -206,6 +214,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
true,
true,
5L,
Expand Down Expand Up @@ -252,6 +261,7 @@ private static KafkaIndexTaskTuningConfig copy(KafkaIndexTaskTuningConfig config
config.getBasePersistDirectory(),
0,
config.getIndexSpec(),
config.getIndexSpecForIntermediatePersists(),
true,
config.isReportParseExceptions(),
config.getHandoffConditionTimeout(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ public void setupTest()
new File("/test"),
null,
null,
null,
true,
false,
null,
Expand Down Expand Up @@ -3004,6 +3005,7 @@ public void testIsTaskCurrent()
new File("/test"),
null,
null,
null,
true,
false,
null,
Expand Down
Loading