Merged
28 commits
b35c2b1
Add state and error tracking for seekable stream supervisors
justinborromeo Mar 22, 2019
0f0e950
Fixed nits in docs
justinborromeo Apr 8, 2019
6f44ff7
Made inner class static and updated spec test with jackson inject
justinborromeo Apr 9, 2019
0f13b0e
Merge branch 'master' into 7217-Supervisor-Error-Endpoint-v4
justinborromeo Apr 15, 2019
f8d3532
Review changes
justinborromeo Apr 16, 2019
7ad9858
Remove redundant config param in supervisor
justinborromeo Apr 17, 2019
8632a23
Style
justinborromeo Apr 17, 2019
9e5a20d
Applied some of Jon's recommendations
justinborromeo Apr 19, 2019
46e200c
Add transience field
justinborromeo Apr 23, 2019
68dfcde
write test
dclim May 2, 2019
086ead4
Merge branch 'master' into 7217-Supervisor-Error-Endpoint-v4
dclim May 2, 2019
d986232
implement code review changes except for reconsidering logic of markR…
dclim May 3, 2019
128edad
remove transience reporting and fix SeekableStreamSupervisorStateMana…
dclim May 4, 2019
f686d2a
move call to stateManager.markRunFinished() from RunNotice to runInte…
dclim May 4, 2019
5519d4c
remove stateHistory because it wasn't adding much value, some fixes, …
dclim May 6, 2019
aab44c1
fix tests
dclim May 6, 2019
34b0861
code review changes and add HTTP health check status
dclim May 7, 2019
413af77
fix test failure
dclim May 7, 2019
58d0870
refactor to split into a generic SupervisorStateManager and a specifi…
dclim May 23, 2019
90053cc
Merge remote-tracking branch 'upstream/master' into 7217-Supervisor-E…
dclim May 23, 2019
d13aba1
fixup after merge
dclim May 23, 2019
f40758d
Merge remote-tracking branch 'upstream/master' into 7217-Supervisor-E…
dclim May 23, 2019
02dbec2
code review changes - add additional docs
dclim May 31, 2019
0db0b7e
Merge remote-tracking branch 'upstream/master' into 7217-Supervisor-E…
dclim May 31, 2019
73c4385
Merge remote-tracking branch 'upstream/master' into 7217-Supervisor-E…
dclim May 31, 2019
c515a3a
cleanup KafkaIndexTaskTest
dclim May 31, 2019
17c0184
add additional documentation for Kinesis indexing
dclim May 31, 2019
e7b492d
remove unused throws class
dclim May 31, 2019
@@ -34,8 +34,6 @@
import javax.validation.Validator;
import java.util.Properties;

/**
*/
public class DruidSecondaryModule implements Module
{
private final Properties properties;
93 changes: 93 additions & 0 deletions core/src/test/java/org/apache/druid/utils/CircularBufferTest.java
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.utils;

import org.junit.Assert;
import org.junit.Test;


public class CircularBufferTest
Contributor: There's also a ChangeRequestHistoryTest test suite that has a test for CircularBuffer, can you move that test here now that you've added a separate test suite?

{
@Test
public void testCircularBufferGetLatest()
{
CircularBuffer<Integer> buff = new CircularBuffer<>(4);

for (int i = 1; i <= 9; i++) {
buff.add(i);
}
// after adding 1..9, the capacity-4 buffer holds [9, 6, 7, 8]
for (int i = 0; i < 4; i++) {
Assert.assertEquals((Integer) (9 - i), buff.getLatest(i));
}
}

@Test
public void testCircularBufferGet()
{
CircularBuffer<Integer> circularBuffer = new CircularBuffer<>(3);

circularBuffer.add(1);
Assert.assertEquals(1, circularBuffer.size());
Assert.assertEquals(1, (int) circularBuffer.get(0));

circularBuffer.add(2);
Assert.assertEquals(2, circularBuffer.size());
for (int i = 0; i < circularBuffer.size(); i++) {
Assert.assertEquals(i + 1, (int) circularBuffer.get(i));
}

circularBuffer.add(3);
Assert.assertEquals(3, circularBuffer.size());
for (int i = 0; i < circularBuffer.size(); i++) {
Assert.assertEquals(i + 1, (int) circularBuffer.get(i));
}

circularBuffer.add(4);
Assert.assertEquals(3, circularBuffer.size());
for (int i = 0; i < circularBuffer.size(); i++) {
Assert.assertEquals(i + 2, (int) circularBuffer.get(i));
}

circularBuffer.add(5);
Assert.assertEquals(3, circularBuffer.size());
for (int i = 0; i < circularBuffer.size(); i++) {
Assert.assertEquals(i + 3, (int) circularBuffer.get(i));
}

circularBuffer.add(6);
Assert.assertEquals(3, circularBuffer.size());
for (int i = 0; i < circularBuffer.size(); i++) {
Assert.assertEquals(i + 4, (int) circularBuffer.get(i));
}

circularBuffer.add(7);
Assert.assertEquals(3, circularBuffer.size());
for (int i = 0; i < circularBuffer.size(); i++) {
Assert.assertEquals(i + 5, (int) circularBuffer.get(i));
}

circularBuffer.add(8);
Assert.assertEquals(3, circularBuffer.size());
for (int i = 0; i < circularBuffer.size(); i++) {
Assert.assertEquals(i + 6, (int) circularBuffer.get(i));
}
}
}
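The behavior these assertions encode can be summarized with a small model. Below is a minimal Python sketch of the semantics the test suite asserts; the real class is the Java `org.apache.druid.utils.CircularBuffer`, and the names and structure here are illustrative only.

```python
# Hypothetical Python model of the CircularBuffer semantics asserted above;
# the actual implementation is org.apache.druid.utils.CircularBuffer (Java).
class CircularBuffer:
    def __init__(self, capacity):
        self.buffer = [None] * capacity
        self.start = 0  # index of the oldest element
        self.size = 0

    def add(self, item):
        # Write past the newest element, overwriting the oldest once full.
        end = (self.start + self.size) % len(self.buffer)
        self.buffer[end] = item
        if self.size < len(self.buffer):
            self.size += 1
        else:
            self.start = (self.start + 1) % len(self.buffer)

    def get(self, index):
        # index 0 is the oldest retained element.
        return self.buffer[(self.start + index) % len(self.buffer)]

    def get_latest(self, index):
        # index 0 is the newest element.
        return self.get(self.size - 1 - index)
```

For example, adding 1 through 9 to a capacity-4 buffer leaves 6, 7, 8, 9 retained, matching `testCircularBufferGetLatest` above.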
11 changes: 11 additions & 0 deletions docs/content/configuration/index.md
@@ -943,6 +943,17 @@ There are additional configs for autoscaling (if it is enabled):
|`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null|
|`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080|

##### Supervisors

|Property|Description|Default|
|--------|-----------|-------|
|`druid.supervisor.healthinessThreshold`|The number of successful runs before an unhealthy supervisor is again considered healthy.|3|
|`druid.supervisor.unhealthinessThreshold`|The number of failed runs before the supervisor is considered unhealthy.|3|
|`druid.supervisor.taskHealthinessThreshold`|The number of consecutive task successes before an unhealthy supervisor is again considered healthy.|3|
|`druid.supervisor.taskUnhealthinessThreshold`|The number of consecutive task failures before the supervisor is considered unhealthy.|3|
|`druid.supervisor.storeStackTrace`|Whether full stack traces of supervisor exceptions should be stored and returned by the supervisor `/status` endpoint.|false|
|`druid.supervisor.maxStoredExceptionEvents`|The maximum number of exception events that can be returned through the supervisor `/status` endpoint.|`max(healthinessThreshold, unhealthinessThreshold)`|
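The interplay of the run-level thresholds can be illustrated with a toy state machine. This is not Druid's actual `SupervisorStateManager` logic, only a hedged sketch of how the two counters described in the table might drive health transitions.

```python
# Illustrative model (not the actual SupervisorStateManager implementation)
# of how unhealthinessThreshold and healthinessThreshold could interact.
class HealthTracker:
    def __init__(self, unhealthiness_threshold=3, healthiness_threshold=3):
        self.unhealthiness_threshold = unhealthiness_threshold
        self.healthiness_threshold = healthiness_threshold
        self.healthy = True
        self.consecutive_failures = 0
        self.consecutive_successes = 0

    def record_run(self, succeeded):
        if succeeded:
            self.consecutive_successes += 1
            self.consecutive_failures = 0
            # An unhealthy supervisor recovers after enough successful runs.
            if (not self.healthy
                    and self.consecutive_successes >= self.healthiness_threshold):
                self.healthy = True
        else:
            self.consecutive_failures += 1
            self.consecutive_successes = 0
            # A healthy supervisor degrades after enough failed runs.
            if (self.healthy
                    and self.consecutive_failures >= self.unhealthiness_threshold):
                self.healthy = False
        return self.healthy
```

With the defaults, three consecutive failed runs mark the supervisor unhealthy, and three consecutive successful runs mark it healthy again.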

#### Overlord Dynamic Configuration

The Overlord can dynamically change worker behavior.
49 changes: 49 additions & 0 deletions docs/content/development/extensions-core/kafka-ingestion.md
@@ -214,12 +214,61 @@ offsets as reported by Kafka, the consumer lag per partition, as well as the agg
consumer lag per partition may be reported as negative values if the supervisor has not received a recent latest offset
response from Kafka. The aggregate lag value will always be >= 0.

The status report also contains the supervisor's state and a list of recently thrown exceptions (reported as
`recentErrors`, whose max size can be controlled using the `druid.supervisor.maxStoredExceptionEvents` configuration).
There are two fields related to the supervisor's state - `state` and `detailedState`. The `state` field will always be
one of a small number of generic states that are applicable to any type of supervisor, while the `detailedState` field
will contain a more descriptive, implementation-specific state that may provide more insight into the supervisor's
activities than the generic `state` field.

The list of possible `state` values is: [`PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`, `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`]

The list of `detailedState` values and their corresponding `state` mapping is as follows:

|Detailed State|Corresponding State|Description|
|--------------|-------------------|-----------|
|UNHEALTHY_SUPERVISOR|UNHEALTHY_SUPERVISOR|The supervisor has encountered errors on the past `druid.supervisor.unhealthinessThreshold` iterations|
|UNHEALTHY_TASKS|UNHEALTHY_TASKS|The last `druid.supervisor.taskUnhealthinessThreshold` tasks have all failed|
|UNABLE_TO_CONNECT_TO_STREAM|UNHEALTHY_SUPERVISOR|The supervisor is encountering connectivity issues with Kafka and has not successfully connected in the past|
|LOST_CONTACT_WITH_STREAM|UNHEALTHY_SUPERVISOR|The supervisor is encountering connectivity issues with Kafka but has successfully connected in the past|
|PENDING (first iteration only)|PENDING|The supervisor has been initialized and hasn't started connecting to the stream|
|CONNECTING_TO_STREAM (first iteration only)|RUNNING|The supervisor is trying to connect to the stream and update partition data|
|DISCOVERING_INITIAL_TASKS (first iteration only)|RUNNING|The supervisor is discovering already-running tasks|
|CREATING_TASKS (first iteration only)|RUNNING|The supervisor is creating tasks and discovering state|
|RUNNING|RUNNING|The supervisor has started tasks and is waiting for `taskDuration` to elapse|
|SUSPENDED|SUSPENDED|The supervisor has been suspended|
|STOPPING|STOPPING|The supervisor is stopping|

On each iteration of the supervisor's run loop, the supervisor completes the following tasks in sequence:
1) Fetch the list of partitions from Kafka and determine the starting offset for each partition (either based on the
last processed offset if continuing, or starting from the beginning or ending of the stream if this is a new topic).
2) Discover any running indexing tasks that are writing to the supervisor's datasource and adopt them if they match
the supervisor's configuration, else signal them to stop.
3) Send a status request to each supervised task to update our view of the state of the tasks under our supervision.
4) Handle tasks that have exceeded `taskDuration` and should transition from the reading to publishing state.
5) Handle tasks that have finished publishing and signal redundant replica tasks to stop.
6) Handle tasks that have failed and clean up the supervisor's internal state.
7) Compare the list of healthy tasks to the requested `taskCount` and `replicas` configurations and create additional tasks if required.

The `detailedState` field will show additional values (those marked with "first iteration only") the first time the
supervisor executes this run loop after startup or after resuming from a suspension. This is intended to surface
initialization-type issues, where the supervisor is unable to reach a stable state (perhaps because it can't connect to
Kafka, it can't read from the Kafka topic, or it can't communicate with existing tasks). Once the supervisor is stable -
that is, once it has completed a full execution without encountering any issues - `detailedState` will show a `RUNNING`
state until it is stopped, suspended, or hits a failure threshold and transitions to an unhealthy state.
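The `detailedState` to `state` mapping above is a simple lookup. The sketch below transcribes the table into a dictionary (first-iteration-only qualifiers omitted); the helper name is illustrative, not part of any Druid API.

```python
# detailedState -> state mapping, transcribed from the table above.
# The function name generic_state is illustrative only.
DETAILED_TO_GENERIC = {
    "UNHEALTHY_SUPERVISOR": "UNHEALTHY_SUPERVISOR",
    "UNHEALTHY_TASKS": "UNHEALTHY_TASKS",
    "UNABLE_TO_CONNECT_TO_STREAM": "UNHEALTHY_SUPERVISOR",
    "LOST_CONTACT_WITH_STREAM": "UNHEALTHY_SUPERVISOR",
    "PENDING": "PENDING",
    "CONNECTING_TO_STREAM": "RUNNING",
    "DISCOVERING_INITIAL_TASKS": "RUNNING",
    "CREATING_TASKS": "RUNNING",
    "RUNNING": "RUNNING",
    "SUSPENDED": "SUSPENDED",
    "STOPPING": "STOPPING",
}

def generic_state(detailed_state):
    """Collapse an implementation-specific detailedState to its generic state."""
    return DETAILED_TO_GENERIC[detailed_state]
```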

### Getting Supervisor Ingestion Stats Report

`GET /druid/indexer/v1/supervisor/<supervisorId>/stats` returns a snapshot of the current ingestion row counters for each task being managed by the supervisor, along with moving averages for the row counters.

See [Task Reports: Row Stats](../../ingestion/reports.html#row-stats) for more information.

### Supervisor Health Check

`GET /druid/indexer/v1/supervisor/<supervisorId>/health` returns `200 OK` if the supervisor is healthy and
`503 Service Unavailable` if it is unhealthy. Healthiness is determined by the supervisor's `state` (as returned by the
`/status` endpoint) and the `druid.supervisor.*` Overlord configuration thresholds.
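A monitoring client only needs to interpret the two status codes this endpoint returns. A hedged sketch (the helper and its error handling are illustrative, not a Druid-provided client):

```python
def interpret_health_status(code):
    # Per the endpoint contract above: 200 -> healthy, 503 -> unhealthy.
    if code == 200:
        return True
    if code == 503:
        return False
    raise ValueError(f"unexpected health endpoint status {code}")
```

A probe could then call `GET /druid/indexer/v1/supervisor/<supervisorId>/health` with any HTTP client and pass the response code to `interpret_health_status`, treating other codes (e.g. 404 for an unknown supervisor) as errors rather than health signals.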

### Updating Existing Supervisors

`POST /druid/indexer/v1/supervisor` can be used to update an existing supervisor spec.
56 changes: 51 additions & 5 deletions docs/content/development/extensions-core/kinesis-ingestion.md
@@ -113,7 +113,7 @@ A sample supervisor spec is shown below:
}
```

## Supervisor Spec

|Field|Description|Required|
|--------|-----------|---------|
@@ -218,12 +218,58 @@ To authenticate with AWS, you must provide your AWS access key and AWS secret ke
```
-Ddruid.kinesis.accessKey=123 -Ddruid.kinesis.secretKey=456
```
The AWS access key ID and secret access key are used for Kinesis API requests. If this is not provided, the service will
look for credentials set in environment variables, in the default profile configuration file, and from the EC2 instance
profile provider (in this order).
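The "first source that yields credentials wins" order can be modeled in a few lines. This is only an illustrative sketch of the lookup order described above; the real resolution is performed by the AWS SDK, and the profile-file and instance-profile stages are not modeled here.

```python
import os

def resolve_kinesis_credentials(explicit_key=None, explicit_secret=None):
    # Illustrative model of the documented lookup order: explicit properties
    # first, then environment variables; the default profile file and the
    # EC2 instance profile provider (handled by the AWS SDK) come after and
    # are represented here only by the final fall-through.
    if explicit_key and explicit_secret:
        return explicit_key, explicit_secret
    env_key = os.environ.get("AWS_ACCESS_KEY_ID")
    env_secret = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if env_key and env_secret:
        return env_key, env_secret
    return None, None  # fall through to profile file / instance profile
```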

### Getting Supervisor Status Report

`GET /druid/indexer/v1/supervisor/<supervisorId>/status` returns a snapshot report of the current state of the tasks
managed by the given supervisor. This includes the latest sequence numbers as reported by Kinesis. Unlike the Kafka
Indexing Service, stats about lag are not yet supported.

The status report also contains the supervisor's state and a list of recently thrown exceptions (reported as
`recentErrors`, whose max size can be controlled using the `druid.supervisor.maxStoredExceptionEvents` configuration).
There are two fields related to the supervisor's state - `state` and `detailedState`. The `state` field will always be
one of a small number of generic states that are applicable to any type of supervisor, while the `detailedState` field
will contain a more descriptive, implementation-specific state that may provide more insight into the supervisor's
activities than the generic `state` field.

The list of possible `state` values is: [`PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`, `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`]

The list of `detailedState` values and their corresponding `state` mapping is as follows:

|Detailed State|Corresponding State|Description|
|--------------|-------------------|-----------|
|UNHEALTHY_SUPERVISOR|UNHEALTHY_SUPERVISOR|The supervisor has encountered errors on the past `druid.supervisor.unhealthinessThreshold` iterations|
|UNHEALTHY_TASKS|UNHEALTHY_TASKS|The last `druid.supervisor.taskUnhealthinessThreshold` tasks have all failed|
|UNABLE_TO_CONNECT_TO_STREAM|UNHEALTHY_SUPERVISOR|The supervisor is encountering connectivity issues with Kinesis and has not successfully connected in the past|
|LOST_CONTACT_WITH_STREAM|UNHEALTHY_SUPERVISOR|The supervisor is encountering connectivity issues with Kinesis but has successfully connected in the past|
|PENDING (first iteration only)|PENDING|The supervisor has been initialized and hasn't started connecting to the stream|
|CONNECTING_TO_STREAM (first iteration only)|RUNNING|The supervisor is trying to connect to the stream and update partition data|
|DISCOVERING_INITIAL_TASKS (first iteration only)|RUNNING|The supervisor is discovering already-running tasks|
|CREATING_TASKS (first iteration only)|RUNNING|The supervisor is creating tasks and discovering state|
|RUNNING|RUNNING|The supervisor has started tasks and is waiting for `taskDuration` to elapse|
|SUSPENDED|SUSPENDED|The supervisor has been suspended|
|STOPPING|STOPPING|The supervisor is stopping|

On each iteration of the supervisor's run loop, the supervisor completes the following tasks in sequence:
1) Fetch the list of shards from Kinesis and determine the starting sequence number for each shard (either based on the
last processed sequence number if continuing, or starting from the beginning or ending of the stream if this is a new stream).
2) Discover any running indexing tasks that are writing to the supervisor's datasource and adopt them if they match
the supervisor's configuration, else signal them to stop.
3) Send a status request to each supervised task to update our view of the state of the tasks under our supervision.
4) Handle tasks that have exceeded `taskDuration` and should transition from the reading to publishing state.
5) Handle tasks that have finished publishing and signal redundant replica tasks to stop.
6) Handle tasks that have failed and clean up the supervisor's internal state.
7) Compare the list of healthy tasks to the requested `taskCount` and `replicas` configurations and create additional tasks if required.

The `detailedState` field will show additional values (those marked with "first iteration only") the first time the
supervisor executes this run loop after startup or after resuming from a suspension. This is intended to surface
initialization-type issues, where the supervisor is unable to reach a stable state (perhaps because it can't connect to
Kinesis, it can't read from the stream, or it can't communicate with existing tasks). Once the supervisor is stable -
that is, once it has completed a full execution without encountering any issues - `detailedState` will show a `RUNNING`
state until it is stopped, suspended, or hits a failure threshold and transitions to an unhealthy state.
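The seven-step iteration above can be outlined as a schematic skeleton. This is not Druid's actual run-loop code; the step labels are my own summaries of the numbered list, recorded in order so the sequence is explicit.

```python
# Schematic outline (not Druid's implementation) of the seven run-loop steps
# described above; each entry just records that its step ran, in order.
def run_iteration(log):
    log.append("fetch_shards_and_starting_sequence_numbers")   # step 1
    log.append("discover_and_adopt_or_stop_existing_tasks")    # step 2
    log.append("poll_supervised_task_statuses")                # step 3
    log.append("transition_tasks_past_taskDuration_to_publishing")  # step 4
    log.append("stop_redundant_replicas_after_publishing")     # step 5
    log.append("clean_up_failed_tasks")                        # step 6
    log.append("create_tasks_to_match_taskCount_and_replicas") # step 7
    return log
```

Only after one full pass completes without incident does `detailedState` settle at `RUNNING`, which is why the first-iteration-only states surface initialization problems.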

### Updating Existing Supervisors

@@ -390,4 +436,4 @@ requires the user to manually provide the Kinesis Client Library on the classpat
compatible with Apache projects.

To enable this feature, add the `amazon-kinesis-client` (tested on version `1.9.2`) jar file ([link](https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-client/1.9.2)) under `dist/druid/extensions/druid-kinesis-indexing-service/`.
Then when submitting a supervisor-spec, set `deaggregate` to true.