Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions docs/content/operations/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,22 @@ Returns a list of objects of the currently active supervisors.
|Field|Type|Description|
|---|---|---|
|`id`|String|supervisor unique identifier|
|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
|`healthy`|Boolean|true or false indicator of overall supervisor health|
|`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)|

* `/druid/indexer/v1/supervisor?state=true`

Returns a list of objects of the currently active supervisors and their current state.

|Field|Type|Description|
|---|---|---|
|`id`|String|supervisor unique identifier|
|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`|
|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)|
|`healthy`|Boolean|true or false indicator of overall supervisor health|

* `/druid/indexer/v1/supervisor/<supervisorId>`

Returns the current spec for the supervisor with the provided ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ public SupervisorReport getStatus()
);
}

@Override
public SupervisorStateManager.State getState()
{
return stateManager.getSupervisorState();
}

@Override
public Boolean isHealthy()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,4 @@ public KafkaSupervisorIOConfig getIoConfig()
{
return spec.getIoConfig();
}

@Override
public Boolean isHealthy()
{
return stateManager.isHealthy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,4 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{
return true;
}

@Override
public Boolean isHealthy()
{
return stateManager.isHealthy();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public Optional<SupervisorSpec> getSupervisorSpec(String id)
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.rhs);
}

public Optional<SupervisorStateManager.State> getSupervisorState(String id)
{
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getState());
}

public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
{
Preconditions.checkState(started, "SupervisorManager not started");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -118,6 +119,7 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe
@Produces(MediaType.APPLICATION_JSON)
public Response specGetAll(
@QueryParam("full") String full,
@QueryParam("state") Boolean state,
@Context final HttpServletRequest req
)
{
Expand All @@ -128,20 +130,36 @@ public Response specGetAll(
manager,
manager.getSupervisorIds()
);

if (full == null) {
return Response.ok(authorizedSupervisorIds).build();
} else {
List<Map<String, ?>> all =
authorizedSupervisorIds.stream()
.map(x -> ImmutableMap.<String, Object>builder()
.put("id", x)
.put("spec", manager.getSupervisorSpec(x).get())
.build()
)
.collect(Collectors.toList());
return Response.ok(all).build();
final boolean includeFull = full != null;
final boolean includeState = state != null && state;

if (includeFull || includeState) {
List<Map<String, ?>> allStates = authorizedSupervisorIds
.stream()
.map(x -> {
Optional<SupervisorStateManager.State> theState =
manager.getSupervisorState(x);
ImmutableMap.Builder<String, Object> theBuilder = ImmutableMap.builder();
theBuilder.put("id", x);
if (theState.isPresent()) {
theBuilder.put("state", theState.get().getBasicState());
theBuilder.put("detailedState", theState.get());
theBuilder.put("healthy", theState.get().isHealthy());
}
if (includeFull) {
Optional<SupervisorSpec> theSpec = manager.getSupervisorSpec(x);
if (theSpec.isPresent()) {
theBuilder.put("spec", theSpec.get());
}
}
return theBuilder.build();
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
return Response.ok(allStates).build();
}

return Response.ok(authorizedSupervisorIds).build();
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,19 @@ public SupervisorReport getStatus()
return generateReport(true);
}


@Override
public SupervisorStateManager.State getState()
{
return stateManager.getSupervisorState();
}

@Override
public Boolean isHealthy()
{
return stateManager.isHealthy();
}

private SupervisorReport<? extends SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType>> generateReport(
boolean includeOffsets
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public List<String> getDataSources()
EasyMock.expectLastCall().anyTimes();
replayAll();

Response response = supervisorResource.specGetAll(null, request);
Response response = supervisorResource.specGetAll(null, null, request);
verifyAll();

Assert.assertEquals(200, response.getStatus());
Expand All @@ -176,7 +176,7 @@ public List<String> getDataSources()
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();

response = supervisorResource.specGetAll(null, request);
response = supervisorResource.specGetAll(null, null, request);
verifyAll();

Assert.assertEquals(503, response.getStatus());
Expand Down Expand Up @@ -205,11 +205,15 @@ public List<String> getDataSources()
return Collections.singletonList("datasource2");
}
};
SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING;
SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED;

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce();
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(2);
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(2);
EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
Expand All @@ -219,7 +223,7 @@ public List<String> getDataSources()
EasyMock.expectLastCall().anyTimes();
replayAll();

Response response = supervisorResource.specGetAll("", request);
Response response = supervisorResource.specGetAll("", null, request);
verifyAll();

Assert.assertEquals(200, response.getStatus());
Expand All @@ -233,6 +237,70 @@ public List<String> getDataSources()
);
}

@Test
public void testSpecGetState()
{
Set<String> supervisorIds = ImmutableSet.of("id1", "id2");
SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null)
{

@Override
public List<String> getDataSources()
{
return Collections.singletonList("datasource1");
}
};
SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null)
{

@Override
public List<String> getDataSources()
{
return Collections.singletonList("datasource2");
}
};

SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING;
SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED;

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce();
EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(1);
EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(1);
EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1);
EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1);
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(
new AuthenticationResult("druid", "druid", null, null)
).atLeastOnce();
request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
EasyMock.expectLastCall().anyTimes();
replayAll();

Response response = supervisorResource.specGetAll(null, true, request);
verifyAll();

Assert.assertEquals(200, response.getStatus());
List<Map<String, Object>> states = (List<Map<String, Object>>) response.getEntity();
Assert.assertTrue(
states.stream()
.allMatch(state -> {
final String id = (String) state.get("id");
if ("id1".equals(id)) {
return state1.equals(state.get("state"))
&& state1.equals(state.get("detailedState"))
&& (Boolean) state.get("healthy") == state1.isHealthy();
} else if ("id2".equals(id)) {
return state2.equals(state.get("state"))
&& state2.equals(state.get("detailedState"))
&& (Boolean) state.get("healthy") == state2.isHealthy();
}
return false;
})
);
}

@Test
public void testSpecGet()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ public SupervisorReport getStatus()
return null;
}

@Override
public SupervisorStateManager.State getState()
{
return SupervisorStateManager.BasicState.RUNNING;
}

@Override
public void reset(DataSourceMetadata dataSourceMetadata)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public interface Supervisor

SupervisorReport getStatus();

SupervisorStateManager.State getState();

default Map<String, Map<String, Object>> getStats()
{
return ImmutableMap.of();
Expand Down