Skip to content
Merged
9 changes: 5 additions & 4 deletions docs/operations/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,8 @@ Returns a list of objects of the currently active supervisors.
|Field|Type|Description|
|---|---|---|
|`id`|String|supervisor unique identifier|
|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.|
|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details), e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))|
|`healthy`|Boolean|true or false indicator of overall supervisor health|
|`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)|

Expand All @@ -519,9 +519,10 @@ Returns a list of objects of the currently active supervisors and their current
|Field|Type|Description|
|---|---|---|
|`id`|String|supervisor unique identifier|
|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.|
|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))|
|`healthy`|Boolean|true or false indicator of overall supervisor health|
|`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state|

* `/druid/indexer/v1/supervisor/<supervisorId>`

Expand Down
21 changes: 21 additions & 0 deletions docs/querying/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,27 @@ For example, to retrieve tasks information filtered by status, use the query
SELECT * FROM sys.tasks WHERE status='FAILED';
```

#### SUPERVISORS table
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the comments on the api-reference page apply here as well.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, fixed here as well


The supervisors table provides information about supervisors.

|Column|Type|Notes|
|------|-----|-----|
|supervisor_id|STRING|Supervisor task identifier|
|state|STRING|Basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.|
|detailed_state|STRING|Supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))|
|healthy|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates a healthy supervisor|
|type|STRING|Type of supervisor, e.g. `kafka`, `kinesis` or `materialized_view`|
|source|STRING|Source of the supervisor, e.g. Kafka topic or Kinesis stream|
|suspended|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates supervisor is in suspended state|
|spec|STRING|JSON-serialized supervisor spec|

For example, to retrieve supervisor tasks information filtered by health status, use the query

```sql
SELECT * FROM sys.supervisors WHERE healthy=0;
```

Note that sys tables may not support all the Druid SQL Functions.

## Server configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
public class MaterializedViewSupervisorSpec implements SupervisorSpec
{
private static final String TASK_PREFIX = "index_materialized_view";
private static final String SUPERVISOR_TYPE = "materialized_view";
private final String baseDataSource;
private final DimensionsSpec dimensionsSpec;
private final AggregatorFactory[] aggregators;
Expand Down Expand Up @@ -325,6 +326,20 @@ public boolean isSuspended()
return suspended;
}

@Override
@JsonProperty("type")
public String getType()
{
return SUPERVISOR_TYPE;
}

@Override
@JsonProperty("source")
public String getSource()
{
return getBaseDataSource();
}

@Override
public String getId()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec
{
private static final String TASK_TYPE = "kafka";

@JsonCreator
public KafkaSupervisorSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
Expand Down Expand Up @@ -103,6 +105,18 @@ public KafkaSupervisorSpec(
);
}

@Override
public String getType()
{
return TASK_TYPE;
}

@Override
public String getSource()
{
return getIoConfig() != null ? getIoConfig().getTopic() : null;
}

@Override
public Supervisor createSupervisor()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec
{
private static final String SUPERVISOR_TYPE = "kinesis";
private final AWSCredentialsConfig awsCredentialsConfig;

@JsonCreator
Expand Down Expand Up @@ -132,6 +133,18 @@ public Supervisor createSupervisor()
);
}

@Override
public String getType()
{
return SUPERVISOR_TYPE;
}

@Override
public String getSource()
{
return getIoConfig() != null ? getIoConfig().getStream() : null;
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.indexing.overlord.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -79,12 +81,14 @@ public class SupervisorResource

private final TaskMaster taskMaster;
private final AuthorizerMapper authorizerMapper;
private final ObjectMapper objectMapper;

@Inject
public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapper)
public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapper, ObjectMapper objectMapper)
{
this.taskMaster = taskMaster;
this.authorizerMapper = authorizerMapper;
this.objectMapper = objectMapper;
}

@POST
Expand Down Expand Up @@ -120,6 +124,7 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe
public Response specGetAll(
@QueryParam("full") String full,
@QueryParam("state") Boolean state,
@QueryParam("system") String system,
@Context final HttpServletRequest req
)
{
Expand All @@ -132,24 +137,44 @@ public Response specGetAll(
);
final boolean includeFull = full != null;
final boolean includeState = state != null && state;
final boolean includeSystem = system != null;

if (includeFull || includeState) {
List<Map<String, ?>> allStates = authorizedSupervisorIds
if (includeFull || includeState || includeSystem) {
List<SupervisorStatus> allStates = authorizedSupervisorIds
.stream()
.map(x -> {
Optional<SupervisorStateManager.State> theState =
manager.getSupervisorState(x);
ImmutableMap.Builder<String, Object> theBuilder = ImmutableMap.builder();
theBuilder.put("id", x);
SupervisorStatus.Builder theBuilder = new SupervisorStatus.Builder();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice change to have it use a builder!

theBuilder.withId(x);
if (theState.isPresent()) {
theBuilder.put("state", theState.get().getBasicState());
theBuilder.put("detailedState", theState.get());
theBuilder.put("healthy", theState.get().isHealthy());
theBuilder.withState(theState.get().getBasicState().toString())
.withDetailedState(theState.get().toString())
.withHealthy(theState.get().isHealthy());
}
if (includeFull) {
Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
if (theSpec.isPresent()) {
theBuilder.put("spec", theSpec.get());
theBuilder.withSpec(manager.getSupervisorSpec(x).get());
}
}
if (includeSystem) {
Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
if (theSpec.isPresent()) {
try {
// serializing SupervisorSpec here, so that callers of `druid/indexer/v1/supervisor?system`
// which are outside the overlord process can deserialize the response and get a json
// payload of SupervisorSpec object when they don't have guice bindings for all the fields
// for example, broker does not have bindings for all fields of `KafkaSupervisorSpec` or
// `KinesisSupervisorSpec`
theBuilder.withSpecString(objectMapper.writeValueAsString(manager.getSupervisorSpec(x).get()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be nice to have to builder accept SupervisorSpec and serialize/deserialize it to a string internally so that the formatting logic is all in one place.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i tried that, but the objectMapper is injected here, which is used to serialize the SupervisorSpec

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it work if you add another constructor for SupervisorStatus.Builder that injects the ObjectMapper?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, it seems odd to add ObjectMapper to Builder, and the builder already accepts a spec, in addition to specString. I could generate specString from spec in SupervisorStatus#getSpecString if I have the right ObjectMapper, but then I think the specString would appear in response to druid/indexer/v1/supervisor?full as well as druid/indexer/v1/supervisor?fullStatus, since it's not null anymore for the former call and it's not required/desired in former call.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, since the SupervisorStatus DTO already has a SupervisorSpec field that gets serialized when the response is serialized, why do you need to have another field that's an explicitly serialized version (which is populated before the response is serialized)?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, i added already serialized specString to get the json payload in SystemSchema#SupervisorsTable via the JsonParserIterator, otherwise the deserialization errors out since KafkaSupervisorSpec's attributes are not present in broker, they are only available in overlord.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ccaominh added comment and javadoc about the need for explicitly serialized spec, let me know if it helps clarify things

}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
theBuilder.withType(manager.getSupervisorSpec(x).get().getType())
.withSource(manager.getSupervisorSpec(x).get().getSource())
.withSuspended(manager.getSupervisorSpec(x).get().isSuspended());
}
}
return theBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ public boolean isSuspended()
{
return false;
}

@Override
public String getType()
{
return null;
}

@Override
public String getSource()
{
return null;
}
};
EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString()))
.andReturn(Optional.of(supervisorSpec))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,18 @@ public boolean isSuspended()
return suspended;
}

@Override
public String getType()
{
return null;
}

@Override
public String getSource()
{
return null;
}

@Override
public List<String> getDataSources()
{
Expand Down
Loading