diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md index 9326f2b3103b..473282f19edc 100644 --- a/docs/content/operations/api-reference.md +++ b/docs/content/operations/api-reference.md @@ -510,8 +510,22 @@ Returns a list of objects of the currently active supervisors. |Field|Type|Description| |---|---|---| |`id`|String|supervisor unique identifier| +|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| +|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| +|`healthy`|Boolean|true or false indicator of overall supervisor health| |`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)| +* `/druid/indexer/v1/supervisor?state=true` + +Returns a list of objects of the currently active supervisors and their current state. + +|Field|Type|Description| +|---|---|---| +|`id`|String|supervisor unique identifier| +|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| +|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| +|`healthy`|Boolean|true or false indicator of overall supervisor health| + * `/druid/indexer/v1/supervisor/` Returns the current spec for the supervisor with the provided ID. diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index c1033ae647d4..378f1dafa143 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -240,6 +240,12 @@ public SupervisorReport getStatus() ); } + @Override + public SupervisorStateManager.State getState() + { + return stateManager.getSupervisorState(); + } + @Override public Boolean isHealthy() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 5d419a4497f3..cdf133677bff 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -385,10 +385,4 @@ public KafkaSupervisorIOConfig getIoConfig() { return spec.getIoConfig(); } - - @Override - public Boolean isHealthy() - { - return stateManager.isHealthy(); - } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 39619a268630..5a0c8614b805 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -316,10 +316,4 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() { return true; } - - @Override - public Boolean isHealthy() - { - return stateManager.isHealthy(); - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 56112d15732d..5727c4ed70bb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -65,6 +65,12 @@ public Optional getSupervisorSpec(String id) return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.rhs); } + public Optional getSupervisorState(String id) + { + Pair supervisor = supervisors.get(id); + return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getState()); + } + public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) { Preconditions.checkState(started, "SupervisorManager not started"); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 9d97a80aca7b..e7e9daf74184 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -52,6 +52,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -118,6 +119,7 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe @Produces(MediaType.APPLICATION_JSON) public Response specGetAll( @QueryParam("full") String full, + @QueryParam("state") Boolean state, @Context final HttpServletRequest req ) { @@ -128,20 +130,36 @@ public Response specGetAll( manager, manager.getSupervisorIds() ); - - if (full == null) { - return Response.ok(authorizedSupervisorIds).build(); - } else { - List> all = - authorizedSupervisorIds.stream() - .map(x -> ImmutableMap.builder() - .put("id", x) - .put("spec", manager.getSupervisorSpec(x).get()) - .build() - ) - .collect(Collectors.toList()); - return Response.ok(all).build(); + final boolean includeFull = full != null; + final boolean includeState = state != null && state; + + if (includeFull || includeState) { + List> allStates = authorizedSupervisorIds + .stream() + .map(x -> { + Optional theState = + manager.getSupervisorState(x); + ImmutableMap.Builder theBuilder = ImmutableMap.builder(); + theBuilder.put("id", x); + if (theState.isPresent()) { + theBuilder.put("state", theState.get().getBasicState()); + theBuilder.put("detailedState", theState.get()); + theBuilder.put("healthy", theState.get().isHealthy()); + } + if (includeFull) { + Optional theSpec = manager.getSupervisorSpec(x); + if (theSpec.isPresent()) { + theBuilder.put("spec", theSpec.get()); + } + } + return theBuilder.build(); + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + return Response.ok(allStates).build(); } + + return Response.ok(authorizedSupervisorIds).build(); } ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 5e7c693aa0c6..64112d6c5fde 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -807,6 +807,19 @@ public SupervisorReport getStatus() return generateReport(true); } + + @Override + public SupervisorStateManager.State getState() + { + return stateManager.getSupervisorState(); + } + + @Override + public Boolean isHealthy() + { + return stateManager.isHealthy(); + } + private SupervisorReport> generateReport( boolean includeOffsets ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 96afde5ed1d5..f6cb0080a966 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -166,7 +166,7 @@ public List getDataSources() EasyMock.expectLastCall().anyTimes(); replayAll(); - Response response = supervisorResource.specGetAll(null, request); + Response response = supervisorResource.specGetAll(null, null, request); verifyAll(); Assert.assertEquals(200, response.getStatus()); @@ -176,7 +176,7 @@ public List getDataSources() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = supervisorResource.specGetAll(null, request); + response = supervisorResource.specGetAll(null, null, request); verifyAll(); Assert.assertEquals(503, response.getStatus()); @@ -205,11 +205,15 @@ public List getDataSources() return Collections.singletonList("datasource2"); } }; + SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING; + SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED; EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce(); EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(2); EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(2); + EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1); + EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( @@ -219,7 +223,7 @@ public List getDataSources() EasyMock.expectLastCall().anyTimes(); replayAll(); - Response response = supervisorResource.specGetAll("", request); + Response response = supervisorResource.specGetAll("", null, request); verifyAll(); Assert.assertEquals(200, response.getStatus()); @@ -233,6 +237,70 @@ public List getDataSources() ); } + @Test + public void testSpecGetState() + { + Set supervisorIds = ImmutableSet.of("id1", "id2"); + SupervisorSpec spec1 = new TestSupervisorSpec("id1", null, null) + { + + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + SupervisorSpec spec2 = new TestSupervisorSpec("id2", null, null) + { + + @Override + public List getDataSources() + { + return Collections.singletonList("datasource2"); + } + }; + + SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING; + SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(supervisorIds).atLeastOnce(); + EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(spec1)).times(1); + EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(spec2)).times(1); + EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1); + EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( + new AuthenticationResult("druid", "druid", null, null) + ).atLeastOnce(); + request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expectLastCall().anyTimes(); + replayAll(); + + Response response = supervisorResource.specGetAll(null, true, request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + List> states = (List>) response.getEntity(); + Assert.assertTrue( + states.stream() + .allMatch(state -> { + final String id = (String) state.get("id"); + if ("id1".equals(id)) { + return state1.equals(state.get("state")) + && state1.equals(state.get("detailedState")) + && (Boolean) state.get("healthy") == state1.isHealthy(); + } else if ("id2".equals(id)) { + return state2.equals(state.get("state")) + && state2.equals(state.get("detailedState")) + && (Boolean) state.get("healthy") == state2.isHealthy(); + } + return false; + }) + ); + } + @Test public void testSpecGet() { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index d8b726602525..3a904b9e0f74 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -113,6 +113,12 @@ public SupervisorReport getStatus() return null; } + @Override + public SupervisorStateManager.State getState() + { + return SupervisorStateManager.BasicState.RUNNING; + } + @Override public void reset(DataSourceMetadata dataSourceMetadata) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index cf3f4d5fa2e2..c0ecf44c29c0 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -39,6 +39,8 @@ public interface Supervisor SupervisorReport getStatus(); + SupervisorStateManager.State getState(); + default Map> getStats() { return ImmutableMap.of();