Merged
54 commits
f3d6422
druid task auto scale based on kafka lag
Oct 20, 2020
5c1c21c
fix kafkaSupervisorIOConfig and KinesisSupervisorIOConfig
Oct 21, 2020
6d7582b
druid task auto scale based on kafka lag
Oct 20, 2020
16b0744
fix kafkaSupervisorIOConfig and KinesisSupervisorIOConfig
Oct 21, 2020
07eb9c0
test dynamic auto scale done
Oct 21, 2020
a041b44
auto scale tasks tested on prd cluster
Oct 21, 2020
746b033
auto scale tasks tested on prd cluster
Oct 21, 2020
d25f94a
auto scale tasks tested on prd cluster
Oct 21, 2020
e7a1af1
modify code style to solve 29055.10 29055.9 29055.17 29055.18 29055.1…
Oct 22, 2020
d53ea76
Merge branch 'master' into kafka-dynamic-scale-ingest-tasks
Oct 22, 2020
78cbd45
rename test fiel function
Oct 23, 2020
e8b7e09
Merge branch 'master' into kafka-dynamic-scale-ingest-tasks
Nov 27, 2020
215844e
change codes and add docs based on capistrant reviewed
Nov 27, 2020
b3b75b2
midify test docs
Nov 27, 2020
1837547
modify docs
Nov 27, 2020
50a94ca
modify docs
Nov 27, 2020
4a0d706
modify docs
Nov 28, 2020
aa70a5c
merge from master
Jan 15, 2021
fb70688
merge from master
Jan 15, 2021
c0b3ff2
Extract the autoScale logic out of SeekableStreamSupervisor to minimi…
Jan 15, 2021
76db5ba
fix ci failed
Jan 16, 2021
751175f
revert msic.xml
Jan 16, 2021
f8a6707
add uts to test autoscaler create && scale out/in and kafka ingest wi…
Jan 16, 2021
172cff7
add more uts
Jan 17, 2021
57811be
fix inner class check
Jan 17, 2021
ff8105c
add IT for kafka ingestion with autoscaler
Jan 26, 2021
05571f7
add new IT in groups=kafka-index named testKafkaIndexDataWithWithAuto…
Jan 26, 2021
f09d3d5
Merge branch 'master' into kafka-dynamic-scale-ingest-tasks
Jan 26, 2021
e66d5d2
Merge branch 'master' into kafka-dynamic-scale-ingest-tasks
Feb 2, 2021
87a694a
review change
Feb 2, 2021
71bdfbb
code review
Feb 2, 2021
9602575
remove unused imports
Feb 2, 2021
6bbbf29
fix NLP
Feb 2, 2021
0ae6a34
fix docs and UTs
Feb 3, 2021
16e4f47
revert misc.xml
Feb 3, 2021
25fec0f
use jackson to build autoScaleConfig with default values
Feb 3, 2021
f0c8d78
add uts
Feb 3, 2021
0733590
use jackson to init AutoScalerConfig in IOConfig instead of Map<>
Feb 3, 2021
9726902
autoscalerConfig interface and provide a defaultAutoScalerConfig
Feb 4, 2021
32fffa9
modify uts
Feb 4, 2021
34c2785
modify docs
Feb 4, 2021
eb95830
fix checkstyle
Feb 4, 2021
7de0f10
revert misc.xml
Feb 4, 2021
ce5945b
modify uts
Feb 5, 2021
85660b7
reviewed code change
Feb 5, 2021
feb3e1e
reviewed code change
Feb 5, 2021
b6632d6
code reviewed
Feb 23, 2021
688b9c4
Merge branch 'master' into kafka-dynamic-scale-ingest-tasks
Feb 23, 2021
00758e6
code review
Feb 25, 2021
1f10082
log changed
Feb 25, 2021
6334e2b
do StringUtils.encodeForFormat when create allocationExec
Feb 25, 2021
22339dd
code review && limit taskCountMax to partitionNumbers
Mar 2, 2021
644e732
modify docs
Mar 2, 2021
1a9a09d
code review
Mar 5, 2021
109 changes: 109 additions & 0 deletions docs/development/extensions-core/kafka-ingestion.md
@@ -146,6 +146,115 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|Defines auto-scaling behavior for the number of Kafka ingest tasks. Currently only supported for Kafka indexing. See [Task Autoscaler Properties](#task-autoscaler-properties) for details.|no (default == null)|

### Task Autoscaler Properties

> Note that Task AutoScaler is currently designated as experimental.

| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enableTaskAutoScaler` | Whether to enable the task autoscaler. If set to `false` or omitted, the autoscaler is disabled even when `autoScalerConfig` is present. | no (default == false) |
| `taskCountMax` | Maximum number of ingestion tasks. Make sure `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, the maximum number of reading tasks is capped at `{numKafkaPartitions}` and `taskCountMax` is ignored. | yes |
| `taskCountMin` | Minimum number of ingestion tasks. When the autoscaler is enabled, the `taskCount` value in `IOConfig` is ignored: ingestion starts with `taskCountMin` tasks and can scale up to `taskCountMax`. | yes |
| `minTriggerScaleActionFrequencyMillis` | Minimum time interval, in milliseconds, between two scale actions. | no (default == 600000) |
| `autoScalerStrategy` | The autoscaling algorithm. Only `lagBased` is currently supported. See [Lag Based AutoScaler Strategy Related Properties](#lag-based-autoscaler-strategy-related-properties) for details. | no (default == `lagBased`) |
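
For reference, a minimal `autoScalerConfig` that supplies only the required fields and accepts the strategy defaults might look like this (the task-count values are illustrative):

```json
"autoScalerConfig": {
  "enableTaskAutoScaler": true,
  "taskCountMax": 4,
  "taskCountMin": 1
}
```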

### Lag Based AutoScaler Strategy Related Properties
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `lagCollectionIntervalMillis` | How often, in milliseconds, to collect Kafka lag metric points. | no (default == 30000) |
| `lagCollectionRangeMillis` | The total time window of lag collection. Used together with `lagCollectionIntervalMillis`: over the most recent `lagCollectionRangeMillis`, a lag metric point is collected every `lagCollectionIntervalMillis`. | no (default == 600000) |
| `scaleOutThreshold` | The lag threshold that triggers a scale-out action. | no (default == 6000000) |
| `triggerScaleOutFractionThreshold` | If more than this fraction of the collected lag points are higher than `scaleOutThreshold`, a scale-out action is triggered. | no (default == 0.3) |
| `scaleInThreshold` | The lag threshold that triggers a scale-in action. | no (default == 1000000) |
| `triggerScaleInFractionThreshold` | If more than this fraction of the collected lag points are lower than `scaleInThreshold`, a scale-in action is triggered. | no (default == 0.9) |
| `scaleActionStartDelayMillis` | The delay, in milliseconds, after supervisor start before the first scale-logic check. | no (default == 300000) |
| `scaleActionPeriodMillis` | How often, in milliseconds, to check whether a scale action should be taken. | no (default == 60000) |
| `scaleInStep` | How many tasks to remove per scale-in action. | no (default == 1) |
| `scaleOutStep` | How many tasks to add per scale-out action. | no (default == 2) |
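
To make the interplay of these thresholds concrete, the decision rule described in the table above can be sketched as follows. This is an illustrative sketch only, not Druid's actual implementation: the class and method names are invented, and whether boundary values count toward the trigger fractions is an assumption made here.

```java
import java.util.List;

// Illustrative sketch of the lagBased decision rule; names are invented
// and this is not the implementation merged in this PR.
public class LagBasedDecisionSketch
{
  // Defaults taken from the property table above.
  static final long SCALE_OUT_THRESHOLD = 6_000_000L;
  static final long SCALE_IN_THRESHOLD = 1_000_000L;
  static final double SCALE_OUT_FRACTION = 0.3;
  static final double SCALE_IN_FRACTION = 0.9;

  /**
   * Returns the change in task count: +scaleOutStep to scale out,
   * -scaleInStep to scale in, or 0 for no action.
   */
  static int decide(List<Long> lagPoints, int scaleInStep, int scaleOutStep)
  {
    double n = lagPoints.size();
    long aboveOut = lagPoints.stream().filter(lag -> lag > SCALE_OUT_THRESHOLD).count();
    long belowIn = lagPoints.stream().filter(lag -> lag < SCALE_IN_THRESHOLD).count();

    if (n > 0 && aboveOut / n >= SCALE_OUT_FRACTION) {
      return scaleOutStep;   // enough high-lag points: add tasks
    }
    if (n > 0 && belowIn / n >= SCALE_IN_FRACTION) {
      return -scaleInStep;   // almost all points are low-lag: remove tasks
    }
    return 0;
  }

  public static void main(String[] args)
  {
    // 2 of 4 points exceed scaleOutThreshold (fraction 0.5 >= 0.3): scale out by 2.
    System.out.println(decide(List.of(7_000_000L, 8_000_000L, 100L, 200L), 1, 2));
  }
}
```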

A sample supervisor spec with `lagBased` autoScaler enabled is shown below:
```json
{
"type": "kafka",
"dataSchema": {
"dataSource": "metrics-kafka",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [

],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"ioConfig": {
"topic": "metrics",
"inputFormat": {
"type": "json"
},
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"autoScalerConfig": {
"enableTaskAutoScaler": true,
"taskCountMax": 6,
"taskCountMin": 2,
"minTriggerScaleActionFrequencyMillis": 600000,
"autoScalerStrategy": "lagBased",
"lagCollectionIntervalMillis": 30000,
"lagCollectionRangeMillis": 600000,
"scaleOutThreshold": 6000000,
"triggerScaleOutFractionThreshold": 0.3,
"scaleInThreshold": 1000000,
"triggerScaleInFractionThreshold": 0.9,
"scaleActionStartDelayMillis": 300000,
"scaleActionPeriodMillis": 60000,
"scaleInStep": 1,
"scaleOutStep": 2
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000
}
}
```

#### More on consumerProperties

@@ -37,6 +37,7 @@
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
@@ -282,6 +283,18 @@ public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
// do nothing
}

@Override
public LagStats computeLagStats()
{
throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
}

@Override
public int getActiveTaskGroupsCount()
{
throw new UnsupportedOperationException("Get Active Task Groups Count is not supported in MaterializedViewSupervisor");
}
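
Supervisor types that do not support these operations, such as the materialized-view supervisor above, throw `UnsupportedOperationException`, so generic callers may want to guard the call. A minimal, self-contained sketch follows; `MiniSupervisor` and the `long[]` return type are invented stand-ins for Druid's `Supervisor` and `LagStats`:

```java
// Sketch: guard a computeLagStats() call against supervisor types that do
// not support it. MiniSupervisor and long[] are stand-ins invented here.
public class LagStatsGuardSketch
{
  interface MiniSupervisor
  {
    long[] computeLagStats();
  }

  /** Returns the supervisor's lag stats, or null if the type does not track lag. */
  static long[] lagStatsOrNull(MiniSupervisor supervisor)
  {
    try {
      return supervisor.computeLagStats();
    }
    catch (UnsupportedOperationException e) {
      return null;  // e.g. a materialized-view supervisor
    }
  }

  public static void main(String[] args)
  {
    MiniSupervisor unsupported = () -> {
      throw new UnsupportedOperationException("lag stats not supported");
    };
    System.out.println(lagStatsOrNull(unsupported) == null);  // prints "true"
  }
}
```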

/**
* Find intervals in which derived dataSource should rebuild the segments.
* Choose the latest intervals to create new HadoopIndexTask and submit it.
@@ -29,7 +29,9 @@
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
@@ -50,6 +52,7 @@
import org.junit.rules.ExpectedException;

import java.io.IOException;
import java.util.concurrent.Callable;

public class MaterializedViewSupervisorSpecTest
{
@@ -155,6 +158,85 @@ public void testSupervisorSerialization() throws IOException
Assert.assertEquals(expected.getMetrics(), spec.getMetrics());
}

@Test
public void testMaterializedViewSupervisorSpecCreated()
{
Exception ex = null;

try {
MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
"wikiticker",
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("isUnpatrolled"),
new StringDimensionSchema("metroCode"),
new StringDimensionSchema("namespace"),
new StringDimensionSchema("page"),
new StringDimensionSchema("regionIsoCode"),
new StringDimensionSchema("regionName"),
new StringDimensionSchema("user")
),
null,
null
),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("added", "added")
},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
false,
objectMapper,
null,
null,
null,
null,
null,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
new NoopChatHandlerProvider(),
new SupervisorStateManagerConfig()
);
Supervisor supervisor = spec.createSupervisor();
Assert.assertTrue(supervisor instanceof MaterializedViewSupervisor);

SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor);
Assert.assertNull(autoscaler);

try {
supervisor.computeLagStats();
Assert.fail("computeLagStats should throw UnsupportedOperationException");
}
catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);
}

try {
supervisor.getActiveTaskGroupsCount();
Assert.fail("getActiveTaskGroupsCount should throw UnsupportedOperationException");
}
catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);
}

}
catch (Exception e) {
ex = e;
}

Assert.assertNull(ex);
}

@Test
public void testSuspendResuume() throws IOException
{
@@ -38,6 +38,7 @@
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
@@ -58,6 +59,7 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -330,6 +332,17 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
return false;
}

@Override
public LagStats computeLagStats()
{
Map<Integer, Long> partitionRecordLag = getPartitionRecordLag();
if (partitionRecordLag == null) {
return new LagStats(0, 0, 0);
}

return computeLags(partitionRecordLag);
}
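
The `computeLags` helper is not shown in this hunk; presumably it aggregates the per-partition lag map into the maximum, total, and average figures that `LagStats` carries. A sketch under that assumption (the `long[]` return is a stand-in for the real `LagStats` class):

```java
import java.util.Map;

// Sketch of aggregating a per-partition record-lag map into the three
// figures LagStats carries: {max, total, average}. This mirrors what the
// computeLags() helper presumably does; the real code is elsewhere in the PR.
public class LagAggregationSketch
{
  static long[] computeLags(Map<Integer, Long> partitionLags)
  {
    long max = 0;
    long total = 0;
    for (long lag : partitionLags.values()) {
      max = Math.max(max, lag);
      total += lag;
    }
    long avg = partitionLags.isEmpty() ? 0 : total / partitionLags.size();
    return new long[]{max, total, avg};
  }

  public static void main(String[] args)
  {
    long[] stats = computeLags(Map.of(0, 100L, 1, 300L, 2, 200L));
    // max = 300, total = 600, average = 200
    System.out.println(stats[0] + " " + stats[1] + " " + stats[2]);
  }
}
```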

@Override
protected void updatePartitionLagFromStream()
{
@@ -24,10 +24,12 @@
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.Period;

import javax.annotation.Nullable;
import java.util.Map;

public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@@ -51,6 +53,7 @@ public KafkaSupervisorIOConfig(
@JsonProperty("taskCount") Integer taskCount,
@JsonProperty("taskDuration") Period taskDuration,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
@JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@@ -73,6 +76,7 @@
completionTimeout,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime
);

@@ -117,6 +121,7 @@ public String toString()
", taskCount=" + getTaskCount() +
", taskDuration=" + getTaskDuration() +
", consumerProperties=" + consumerProperties +
", autoScalerConfig=" + getAutoscalerConfig() +
", pollTimeout=" + pollTimeout +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +
@@ -134,6 +134,7 @@ public void testSample()
null,
null,
null,
null,
true,
null,
null,
Expand Down