Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/development/extensions-core/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|`autoScalerConfig` to specify how to auto scale the number of Kafka ingest tasks. ONLY supported for Kafka indexing as of now. See [Tasks Autoscaler Properties](#Task Autoscaler Properties) for details.|no (default == null)|
|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#Task Autoscaler Properties).|no (default == null)|

### Task Autoscaler Properties

Expand Down
110 changes: 110 additions & 0 deletions docs/development/extensions-core/kinesis-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,116 @@ A sample supervisor spec is shown below:
|`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|no|
|`awsExternalId`|String|The AWS external id to use for additional permissions.|no|
|`deaggregate`|Boolean|Whether to use the de-aggregate function of the KCL. See below for details.|no|
|`autoScalerConfig`|Object|Defines auto scaling behavior for Kinesis ingest tasks. See [Tasks Autoscaler Properties](#Task Autoscaler Properties).|no (default == null)|

### Task Autoscaler Properties

> Note that Task AutoScaler is currently designated as experimental.

| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enableTaskAutoScaler` | Enable or disable the auto scaler. When false or or absent Druid disables the `autoScaler` even when `autoScalerConfig` is not null| no (default == false) |
| `taskCountMax` | Maximum number of Kinesis ingestion tasks. Must be greater than or equal to `taskCountMin`. If greater than `{numKinesisShards}`, the maximum number of reading tasks is `{numKinesisShards}` and `taskCountMax` is ignored. | yes |
| `taskCountMin` | Minimum number of Kinesis ingestion tasks. When you enable the auto scaler, Druid ignores the value of taskCount in `IOConfig` and uses`taskCountMin` for the initial number of tasks to launch.| yes |
| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) |
| `autoScalerStrategy` | The algorithm of `autoScaler`. ONLY `lagBased` is supported for now. See [Lag Based AutoScaler Strategy Related Properties](#Lag Based AutoScaler Strategy Related Properties) for details.| no (default == `lagBased`) |

### Lag Based AutoScaler Strategy Related Properties

The Kinesis indexing service reports lag metrics measured in time milliseconds rather than message count which is used by Kafka.

| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `lagCollectionIntervalMillis` | Period of lag points collection. | no (default == 30000) |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
| `lagCollectionIntervalMillis` | Period of lag points collection. | no (default == 30000) |
| `lagCollectionIntervalMillis` | Time period between lag points collection. | no (default == 30000) |

Not sure if this is the time period between collections or if it relates to some sort of time lag.

| `lagCollectionRangeMillis` | The total time window of lag collection, Use with `lagCollectionIntervalMillis`,it means that in the recent `lagCollectionRangeMillis`, collect lag metric points every `lagCollectionIntervalMillis`. | no (default == 600000) |
| `scaleOutThreshold` | The Threshold of scale out action | no (default == 6000000) |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that when the time lage reaches 6000000 (default) the autoscaler launches another task?

| `triggerScaleOutFractionThreshold` | If `triggerScaleOutFractionThreshold` percent of lag points are higher than `scaleOutThreshold`, then do scale out action. | no (default == 0.3) |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are lag points defined somewhere? Maybe we need an example of how this works together with the scaleOutThreshold

| `scaleInThreshold` | The Threshold of scale in action | no (default == 1000000) |
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comments as for scale out

| `triggerScaleInFractionThreshold` | If `triggerScaleInFractionThreshold` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) |
| `scaleActionStartDelayMillis` | Number of milliseconds to delay after the supervisor starts before the first scale logic check. | no (default == 300000) |
| `scaleActionPeriodMillis` | Frequency in milliseconds to check if a scale action is triggered | no (default == 60000) |
| `scaleInStep` | Number of tasks to reduce at a time when scaling down | no (default == 1) |
| `scaleOutStep` | Number of tasks to add at a time when scaling out | no (default == 2) |

The following example demonstrates a supervisor spec with `lagBased` autoScaler enabled:
```json
{
"type": "kinesis",
"dataSchema": {
"dataSource": "metrics-kinesis",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"ioConfig": {
"stream": "metrics",
"autoScalerConfig": {
"enableTaskAutoScaler": true,
"taskCountMax": 6,
"taskCountMin": 2,
"minTriggerScaleActionFrequencyMillis": 600000,
"autoScalerStrategy": "lagBased",
"lagCollectionIntervalMillis": 30000,
"lagCollectionRangeMillis": 600000,
"scaleOutThreshold": 600000,
"triggerScaleOutFractionThreshold": 0.3,
"scaleInThreshold": 100000,
"triggerScaleInFractionThreshold": 0.9,
"scaleActionStartDelayMillis": 300000,
"scaleActionPeriodMillis": 60000,
"scaleInStep": 1,
"scaleOutStep": 2
},
"inputFormat": {
"type": "json"
},
"endpoint": "kinesis.us-east-1.amazonaws.com",
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H",
"recordsPerFetch": 2000,
"fetchDelayMillis": 1000
},
"tuningConfig": {
"type": "kinesis",
"maxRowsPerSegment": 5000000
}
}
```

#### Specifying data format

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,19 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
return true;
}

// not yet supported, will be implemented in the future maybe? need to find a proper way to measure kinesis lag.
// Unlike the Kafka Indexing Service,
// Kinesis reports lag metrics measured in time difference in milliseconds between the current sequence number and latest sequence number,
// rather than message count.
@Override
public LagStats computeLagStats()
{
throw new UnsupportedOperationException("Compute Lag Stats is not supported in KinesisSupervisor yet.");
Map<String, Long> partitionTimeLags = getPartitionTimeLag();

if (partitionTimeLags == null) {
return new LagStats(0, 0, 0);
}

return computeLags(partitionTimeLags);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ public KinesisSupervisorIOConfig(
lateMessageRejectionStartDateTime
);

// for now dynamic Allocation Tasks is not supported here
// throw UnsupportedOperationException in case someone sets this on a kinesis supervisor spec.
if (autoScalerConfig != null) {
throw new UnsupportedOperationException("Tasks auto scaler for kinesis is not supported yet. Please remove autoScalerConfig or set it to null!");
}

this.endpoint = endpoint != null
? endpoint
: (region != null ? region.getEndpoint() : KinesisRegion.US_EAST_1.getEndpoint());
Expand Down Expand Up @@ -157,6 +151,7 @@ public String toString()
", endpoint='" + endpoint + '\'' +
", replicas=" + getReplicas() +
", taskCount=" + getTaskCount() +
", autoScalerConfig=" + getAutoscalerConfig() +
", taskDuration=" + getTaskDuration() +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +
Expand Down
Loading