diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index a3cf416bdcfb..fe6813d7c49b 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -507,8 +507,8 @@ Returns a list of objects of the currently active supervisors. |Field|Type|Description| |---|---|---| |`id`|String|supervisor unique identifier| -|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| -|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| +|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.| +|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details), e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))| |`healthy`|Boolean|true or false indicator of overall supervisor health| |`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)| @@ -519,9 +519,10 @@ Returns a list of objects of the currently active supervisors and their current |Field|Type|Description| |---|---|---| |`id`|String|supervisor unique identifier| -|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| -|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| +|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.| +|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))| |`healthy`|Boolean|true or false indicator of overall supervisor health| +|`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| * `/druid/indexer/v1/supervisor/` diff --git a/docs/querying/sql.md b/docs/querying/sql.md index 71c0b0205d1c..61a38b290c86 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -771,6 +771,27 @@ For example, to retrieve tasks information filtered by status, use the query SELECT * FROM sys.tasks WHERE status='FAILED'; ``` +#### SUPERVISORS table + +The supervisors table provides information about supervisors. + +|Column|Type|Notes| +|------|-----|-----| +|supervisor_id|STRING|Supervisor task identifier| +|state|STRING|Basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.| +|detailed_state|STRING|Supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))| +|healthy|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates a healthy supervisor| +|type|STRING|Type of supervisor, e.g. `kafka`, `kinesis` or `materialized_view`| +|source|STRING|Source of the supervisor, e.g. Kafka topic or Kinesis stream| +|suspended|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates supervisor is in suspended state| +|spec|STRING|JSON-serialized supervisor spec| + +For example, to retrieve supervisor tasks information filtered by health status, use the query + +```sql +SELECT * FROM sys.supervisors WHERE healthy=0; +``` + Note that sys tables may not support all the Druid SQL Functions. ## Server configuration diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index 874ae6106513..0e25841fe3c6 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -62,6 +62,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec { private static final String TASK_PREFIX = "index_materialized_view"; + private static final String SUPERVISOR_TYPE = "materialized_view"; private final String baseDataSource; private final DimensionsSpec dimensionsSpec; private final AggregatorFactory[] aggregators; @@ -325,6 +326,20 @@ public boolean isSuspended() return suspended; } + @Override + @JsonProperty("type") + public String getType() + { + return SUPERVISOR_TYPE; + } + + @Override + @JsonProperty("source") + public String getSource() + { + return getBaseDataSource(); + } + @Override public String getId() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 60c187f07487..772a26ad07c3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -40,6 +40,8 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec { + private static final String TASK_TYPE = "kafka"; + @JsonCreator public KafkaSupervisorSpec( @JsonProperty("dataSchema") DataSchema dataSchema, @@ -103,6 +105,18 @@ public KafkaSupervisorSpec( ); } + @Override + public String getType() + { + return TASK_TYPE; + } + + @Override + public String getSource() + { + return getIoConfig() != null ? getIoConfig().getTopic() : null; + } + @Override public Supervisor createSupervisor() { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index ef0679805708..fb5751b84c43 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -42,6 +42,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec { + private static final String SUPERVISOR_TYPE = "kinesis"; private final AWSCredentialsConfig awsCredentialsConfig; @JsonCreator @@ -132,6 +133,18 @@ public Supervisor createSupervisor() ); } + @Override + public String getType() + { + return SUPERVISOR_TYPE; + } + + @Override + public String getSource() + { + return getIoConfig() != null ? getIoConfig().getStream() : null; + } + @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 1762903cab17..74c5b6192cb8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -19,6 +19,8 @@ package org.apache.druid.indexing.overlord.supervisor; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -79,12 +81,14 @@ public class SupervisorResource private final TaskMaster taskMaster; private final AuthorizerMapper authorizerMapper; + private final ObjectMapper objectMapper; @Inject - public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapper) + public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapper, ObjectMapper objectMapper) { this.taskMaster = taskMaster; this.authorizerMapper = authorizerMapper; + this.objectMapper = objectMapper; } @POST @@ -120,6 +124,7 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe public Response specGetAll( @QueryParam("full") String full, @QueryParam("state") Boolean state, + @QueryParam("system") String system, @Context final HttpServletRequest req ) { @@ -132,24 +137,44 @@ public Response specGetAll( ); final boolean includeFull = full != null; final boolean includeState = state != null && state; + final boolean includeSystem = system != null; - if (includeFull || includeState) { - List> allStates = authorizedSupervisorIds + if (includeFull || includeState || includeSystem) { + List allStates = authorizedSupervisorIds .stream() .map(x -> { Optional theState = manager.getSupervisorState(x); - ImmutableMap.Builder theBuilder = ImmutableMap.builder(); - theBuilder.put("id", x); + SupervisorStatus.Builder theBuilder = new SupervisorStatus.Builder(); + theBuilder.withId(x); if (theState.isPresent()) { - theBuilder.put("state", theState.get().getBasicState()); - theBuilder.put("detailedState", theState.get()); - theBuilder.put("healthy", theState.get().isHealthy()); + theBuilder.withState(theState.get().getBasicState().toString()) + .withDetailedState(theState.get().toString()) + .withHealthy(theState.get().isHealthy()); } if (includeFull) { Optional theSpec = manager.getSupervisorSpec(x); if (theSpec.isPresent()) { - theBuilder.put("spec", theSpec.get()); + theBuilder.withSpec(manager.getSupervisorSpec(x).get()); + } + } + if (includeSystem) { + Optional theSpec = manager.getSupervisorSpec(x); + if (theSpec.isPresent()) { + try { + // serializing SupervisorSpec here, so that callers of `druid/indexer/v1/supervisor?system` + // which are outside the overlord process can deserialize the response and get a json + // payload of SupervisorSpec object when they don't have guice bindings for all the fields + // for example, broker does not have bindings for all fields of `KafkaSupervisorSpec` or + // `KinesisSupervisorSpec` + theBuilder.withSpecString(objectMapper.writeValueAsString(manager.getSupervisorSpec(x).get())); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + theBuilder.withType(manager.getSupervisorSpec(x).get().getType()) + .withSource(manager.getSupervisorSpec(x).get().getSource()) + .withSuspended(manager.getSupervisorSpec(x).get().isSuspended()); } } return theBuilder.build(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java index 7977164b1ca6..e603a553e244 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java @@ -149,6 +149,18 @@ public boolean isSuspended() { return false; } + + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } }; EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString())) .andReturn(Optional.of(supervisorSpec)) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 6f62947d5567..6b93f84d1625 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -410,6 +410,18 @@ public boolean isSuspended() return suspended; } + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } + @Override public List getDataSources() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index ef477fe209a5..1390e7acc98c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.TestHelper; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthenticationResult; @@ -45,6 +46,7 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -53,6 +55,7 @@ @RunWith(EasyMockRunner.class) public class SupervisorResourceTest extends EasyMockSupport { + private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper(); private static final TestSupervisorSpec SPEC1 = new TestSupervisorSpec( "id1", null, @@ -100,7 +103,8 @@ public Authorizer getAuthorizer(String name) } }; } - } + }, + OBJECT_MAPPER ); } @@ -160,7 +164,7 @@ public void testSpecGetAll() EasyMock.expectLastCall().anyTimes(); replayAll(); - Response response = supervisorResource.specGetAll(null, null, request); + Response response = supervisorResource.specGetAll(null, null, null, request); verifyAll(); Assert.assertEquals(200, response.getStatus()); @@ -170,7 +174,7 @@ public void testSpecGetAll() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = supervisorResource.specGetAll(null, null, request); + response = supervisorResource.specGetAll(null, null, null, request); verifyAll(); Assert.assertEquals(503, response.getStatus()); @@ -184,10 +188,10 @@ public void testSpecGetAllFull() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(SUPERVISOR_IDS).atLeastOnce(); - EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(2); - EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(2); - EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1); - EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1); + EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).anyTimes(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( @@ -197,20 +201,59 @@ public void testSpecGetAllFull() EasyMock.expectLastCall().anyTimes(); replayAll(); - Response response = supervisorResource.specGetAll("", null, request); + Response response = supervisorResource.specGetAll("", null, null, request); verifyAll(); Assert.assertEquals(200, response.getStatus()); - List> specs = (List>) response.getEntity(); + List specs = (List) response.getEntity(); Assert.assertTrue( specs.stream() .allMatch(spec -> - ("id1".equals(spec.get("id")) && SPEC1.equals(spec.get("spec"))) || - ("id2".equals(spec.get("id")) && SPEC2.equals(spec.get("spec"))) + ("id1".equals(spec.getId()) && SPEC1.equals(spec.getSpec())) || + ("id2".equals(spec.getId()) && SPEC2.equals(spec.getSpec())) ) ); } + @Test + public void testSpecGetAllSystem() + { + SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING; + SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(SUPERVISOR_IDS).atLeastOnce(); + EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).anyTimes(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( + new AuthenticationResult("druid", "druid", null, null) + ).atLeastOnce(); + request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expectLastCall().anyTimes(); + replayAll(); + + Response response = supervisorResource.specGetAll(null, null, "", request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + List specs = (List) response.getEntity(); + specs.sort(Comparator.comparing(SupervisorStatus::getId)); + Assert.assertEquals(2, specs.size()); + SupervisorStatus spec = specs.get(0); + Assert.assertEquals("id1", spec.getId()); + Assert.assertEquals("RUNNING", spec.getState()); + Assert.assertEquals("RUNNING", spec.getDetailedState()); + Assert.assertEquals(true, spec.isHealthy()); + Assert.assertEquals("{\"type\":\"SupervisorResourceTest$TestSupervisorSpec\"}", spec.getSpecString()); + Assert.assertEquals("test", spec.getType()); + Assert.assertEquals("dummy", spec.getSource()); + Assert.assertEquals(false, spec.isSuspended()); + } + @Test public void testSpecGetState() { @@ -232,23 +275,23 @@ public void testSpecGetState() EasyMock.expectLastCall().anyTimes(); replayAll(); - Response response = supervisorResource.specGetAll(null, true, request); + Response response = supervisorResource.specGetAll(null, true, null, request); verifyAll(); Assert.assertEquals(200, response.getStatus()); - List> states = (List>) response.getEntity(); + List states = (List) response.getEntity(); Assert.assertTrue( states.stream() .allMatch(state -> { - final String id = (String) state.get("id"); + final String id = (String) state.getId(); if ("id1".equals(id)) { - return state1.equals(state.get("state")) - && state1.equals(state.get("detailedState")) - && (Boolean) state.get("healthy") == state1.isHealthy(); + return state1.toString().equals(state.getState()) + && state1.toString().equals(state.getDetailedState()) + && (Boolean) state.isHealthy() == state1.isHealthy(); } else if ("id2".equals(id)) { - return state2.equals(state.get("state")) - && state2.equals(state.get("detailedState")) - && (Boolean) state.get("healthy") == state2.isHealthy(); + return state2.toString().equals(state.getState()) + && state2.toString().equals(state.getDetailedState()) + && (Boolean) state.isHealthy() == state2.isHealthy(); } return false; }) @@ -1137,6 +1180,18 @@ public boolean isSuspended() return suspended; } + @Override + public String getType() + { + return "test"; + } + + @Override + public String getSource() + { + return "dummy"; + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 1d1a61f58f79..a3c10889f4a2 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -48,25 +48,36 @@ public class NoopSupervisorSpec implements SupervisorSpec @JsonProperty("suspended") private boolean suspended; //ignored + @JsonProperty("type") + private String type; //ignored + + @JsonProperty("source") + private String source; //ignored + @VisibleForTesting public NoopSupervisorSpec( String id, List datasources ) { - this(id, datasources, null); + this(id, datasources, null, null, null); } @JsonCreator public NoopSupervisorSpec( @JsonProperty("id") @Nullable String id, @JsonProperty("dataSources") @Nullable List datasources, - @JsonProperty("suspended") @Nullable Boolean suspended + @JsonProperty("suspended") @Nullable Boolean suspended, + @JsonProperty("type") @Nullable String type, + @JsonProperty("source") @Nullable String source ) { this.id = id; this.datasources = datasources == null ? new ArrayList<>() : datasources; - this.suspended = false; // ignore + // these are ignored + this.suspended = false; + this.type = "noop"; + this.source = "noop"; } @Override @@ -92,6 +103,20 @@ public boolean isSuspended() return suspended; } + @Override + @JsonProperty("type") + public String getType() + { + return type; + } + + @Override + @JsonProperty("source") + public String getSource() + { + return source; + } + @Override public Supervisor createSupervisor() { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index e875dfc39e95..041156ce4251 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -56,4 +56,20 @@ default boolean isSuspended() { return false; } + + /** + * This API is only used for informational purposes in + * org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable + * + * @return supervisor type + */ + String getType(); + + /** + * This API is only used for informational purposes in + * org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable + * + * @return source like stream or topic name + */ + String getSource(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java new file mode 100644 index 000000000000..e87c8bbf31fe --- /dev/null +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; +import com.google.common.base.Preconditions; + +import java.util.Objects; + +/** + * This class contains the attributes of a supervisor which are returned by the API's in + * org.apache.druid.indexing.overlord.supervisor.SupervisorResource + * and used by org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable + */ +@JsonDeserialize(builder = SupervisorStatus.Builder.class) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SupervisorStatus +{ + private final String id; + private final String state; + private final String detailedState; + private final boolean healthy; + private final SupervisorSpec spec; + /** + * This is a JSON representation of {@code spec} + * The explicit serialization is present here so that users of {@code SupervisorStatus} which cannot + * deserialize {@link SupervisorSpec} can use this attribute instead + */ + private final String specString; + private final String type; + private final String source; + private final boolean suspended; + + private SupervisorStatus( + Builder builder + ) + { + this.id = Preconditions.checkNotNull(builder.id, "id"); + this.state = builder.state; + this.detailedState = builder.detailedState; + this.healthy = builder.healthy; + this.spec = builder.spec; + this.specString = builder.specString; + this.type = builder.type; + this.source = builder.source; + this.suspended = builder.suspended; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SupervisorStatus that = (SupervisorStatus) o; + return healthy == that.healthy && + Objects.equals(id, that.id) && + Objects.equals(state, that.state) && + Objects.equals(detailedState, that.detailedState) && + Objects.equals(spec, that.spec) && + Objects.equals(specString, that.specString) && + Objects.equals(type, that.type) && + Objects.equals(source, that.source) && + suspended == that.suspended; + } + + @Override + public int hashCode() + { + return Objects.hash(id, state, detailedState, healthy, spec, specString, type, source, suspended); + } + + @JsonProperty + public String getId() + { + return id; + } + + @JsonProperty + public String getState() + { + return state; + } + + @JsonProperty + public String getDetailedState() + { + return detailedState; + } + + @JsonProperty + public boolean isHealthy() + { + return healthy; + } + + @JsonProperty + public SupervisorSpec getSpec() + { + return spec; + } + + @JsonProperty + public String getSpecString() + { + return specString; + } + + @JsonProperty + public String getType() + { + return type; + } + + @JsonProperty + public String getSource() + { + return source; + } + + @JsonProperty + public boolean isSuspended() + { + return suspended; + } + + @JsonPOJOBuilder + public static class Builder + { + @JsonProperty("id") + private String id; + @JsonProperty("state") + private String state; + @JsonProperty("detailedState") + private String detailedState; + @JsonProperty("healthy") + private boolean healthy; + @JsonProperty("spec") + private SupervisorSpec spec; + @JsonProperty("specString") + private String specString; + @JsonProperty("type") + private String type; + @JsonProperty("source") + private String source; + @JsonProperty("suspended") + private boolean suspended; + + @JsonProperty + public Builder withId(String id) + { + this.id = Preconditions.checkNotNull(id, "id"); + return this; + } + + @JsonProperty + public Builder withState(String state) + { + this.state = state; + return this; + } + + @JsonProperty + public Builder withDetailedState(String detailedState) + { + this.detailedState = detailedState; + return this; + } + + @JsonProperty + public Builder withHealthy(boolean healthy) + { + this.healthy = healthy; + return this; + } + + @JsonProperty + public Builder withSpec(SupervisorSpec spec) + { + this.spec = spec; + return this; + } + + @JsonProperty + public Builder withSpecString(String spec) + { + this.specString = spec; + return this; + } + + @JsonProperty + public Builder withType(String type) + { + this.type = type; + return this; + } + + @JsonProperty + public Builder withSource(String source) + { + this.source = source; + return this; + } + + @JsonProperty + public Builder withSuspended(boolean suspended) + { + this.suspended = suspended; + return this; + } + + public SupervisorStatus build() + { + return new SupervisorStatus(this); + } + } +} + diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java new file mode 100644 index 000000000000..4c370453a0bc --- /dev/null +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class SupervisorStatusTest +{ + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = new DefaultObjectMapper(); + final SupervisorStatus.Builder builder = new SupervisorStatus.Builder(); + final SupervisorStatus supervisorStatus = builder.withId("wikipedia") + .withState("RUNNING") + .withDetailedState("RUNNING") + .withHealthy(true) + .withType("kafka") + .withSource("wikipedia") + .withSuspended(false) + .build(); + final String serialized = mapper.writeValueAsString(supervisorStatus); + final SupervisorStatus deserialized = mapper.readValue(serialized, SupervisorStatus.class); + Assert.assertEquals(supervisorStatus, deserialized); + } + + @Test + public void testJsonAttr() throws IOException + { + String json = "{" + + "\"id\":\"wikipedia\"," + + "\"state\":\"UNHEALTHY_SUPERVISOR\"," + + "\"detailedState\":\"UNHEALTHY_SUPERVISOR\"," + + "\"healthy\":false," + + "\"type\":\"kafka\"," + + "\"source\":\"wikipedia\"," + + "\"suspended\":false" + + "}"; + final ObjectMapper mapper = new ObjectMapper(); + final SupervisorStatus deserialized = mapper.readValue(json, SupervisorStatus.class); + Assert.assertNotNull(deserialized); + Assert.assertEquals("wikipedia", deserialized.getId()); + final String serialized = mapper.writeValueAsString(deserialized); + Assert.assertTrue(serialized.contains("\"source\"")); + Assert.assertEquals(json, serialized); + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java index 38e65de4fd20..93322209c8a2 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java @@ -190,5 +190,18 @@ public List getDataSources() { return Collections.singletonList(dataSource); } + + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } + } } diff --git a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java index 19802f172ddd..e935a413f045 100644 --- a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java +++ b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java @@ -58,6 +58,18 @@ public List getDataSources() return null; } + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } + @JsonProperty public Object getData() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 08f768da6de2..8ee7b7e0a898 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -55,6 +55,7 @@ import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeType; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; @@ -99,6 +100,7 @@ public class SystemSchema extends AbstractSchema private static final String SERVERS_TABLE = "servers"; private static final String SERVER_SEGMENTS_TABLE = "server_segments"; private static final String TASKS_TABLE = "tasks"; + private static final String SUPERVISOR_TABLE = "supervisors"; private static final Function> SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR = segment -> @@ -180,6 +182,18 @@ public class SystemSchema extends AbstractSchema .add("error_msg", ValueType.STRING) .build(); + static final RowSignature SUPERVISOR_SIGNATURE = RowSignature + .builder() + .add("supervisor_id", ValueType.STRING) + .add("state", ValueType.STRING) + .add("detailed_state", ValueType.STRING) + .add("healthy", ValueType.LONG) + .add("type", ValueType.STRING) + .add("source", ValueType.STRING) + .add("suspended", ValueType.LONG) + .add("spec", ValueType.STRING) + .build(); + private final Map tableMap; @Inject @@ -207,7 +221,8 @@ public SystemSchema( SEGMENTS_TABLE, segmentsTable, SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), - TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) + TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper), + SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) ); } @@ -729,7 +744,7 @@ private static JsonParserIterator getTasks( try { request = indexingServiceClient.makeRequest( HttpMethod.GET, - StringUtils.format("/druid/indexer/v1/tasks"), + "/druid/indexer/v1/tasks", false ); } @@ -755,6 +770,170 @@ private static JsonParserIterator getTasks( ); } + /** + * This table contains a row per supervisor task. + */ + static class SupervisorsTable extends AbstractTable implements ScannableTable + { + private final DruidLeaderClient druidLeaderClient; + private final ObjectMapper jsonMapper; + private final BytesAccumulatingResponseHandler responseHandler; + private final AuthorizerMapper authorizerMapper; + + public SupervisorsTable( + DruidLeaderClient druidLeaderClient, + ObjectMapper jsonMapper, + BytesAccumulatingResponseHandler responseHandler, + AuthorizerMapper authorizerMapper + ) + { + this.druidLeaderClient = druidLeaderClient; + this.jsonMapper = jsonMapper; + this.responseHandler = responseHandler; + this.authorizerMapper = authorizerMapper; + } + + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) + { + return SUPERVISOR_SIGNATURE.getRelDataType(typeFactory); + } + + @Override + public TableType getJdbcTableType() + { + return TableType.SYSTEM_TABLE; + } + + @Override + public Enumerable scan(DataContext root) + { + class SupervisorsEnumerable extends DefaultEnumerable + { + private final CloseableIterator it; + + public SupervisorsEnumerable(JsonParserIterator tasks) + { + this.it = getAuthorizedSupervisors(tasks, root); + } + + @Override + public Iterator iterator() + { + throw new UnsupportedOperationException("Do not use iterator(), it cannot be closed."); + } + + @Override + public Enumerator enumerator() + { + return new Enumerator() + { + @Override + public Object[] current() + { + final SupervisorStatus supervisor = it.next(); + return new Object[]{ + supervisor.getId(), + supervisor.getState(), + supervisor.getDetailedState(), + supervisor.isHealthy() ? 1L : 0L, + supervisor.getType(), + supervisor.getSource(), + supervisor.isSuspended() ? 1L : 0L, + supervisor.getSpecString() + }; + } + + @Override + public boolean moveNext() + { + return it.hasNext(); + } + + @Override + public void reset() + { + + } + + @Override + public void close() + { + try { + it.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + } + + return new SupervisorsEnumerable(getSupervisors(druidLeaderClient, jsonMapper, responseHandler)); + } + + private CloseableIterator getAuthorizedSupervisors( + JsonParserIterator it, + DataContext root + ) + { + final AuthenticationResult authenticationResult = + (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); + + Function> raGenerator = supervisor -> Collections.singletonList( + AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource())); + + final Iterable authorizedSupervisors = AuthorizationUtils.filterAuthorizedResources( + authenticationResult, + () -> it, + raGenerator, + authorizerMapper + ); + + return wrap(authorizedSupervisors.iterator(), it); + } + } + + // Note that overlord must be up to get supervisor tasks, otherwise queries to sys.supervisors table + // will fail with internal server error (HTTP 500) + private static JsonParserIterator getSupervisors( + DruidLeaderClient indexingServiceClient, + ObjectMapper jsonMapper, + BytesAccumulatingResponseHandler responseHandler + ) + { + Request request; + try { + request = indexingServiceClient.makeRequest( + HttpMethod.GET, + "/druid/indexer/v1/supervisor?system", + false + ); + } + catch (IOException e) { + throw new RuntimeException(e); + } + ListenableFuture future = indexingServiceClient.goAsync( + request, + responseHandler + ); + + final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() + { + }); + return new JsonParserIterator<>( + typeRef, + future, + request.getUrl().toString(), + null, + request.getUrl().getHost(), + jsonMapper, + responseHandler + ); + } + private static CloseableIterator wrap(Iterator iterator, JsonParserIterator it) { return new CloseableIterator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 12448503a196..4b411da4a858 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -326,6 +326,7 @@ public void testInformationSchemaTables() throws Exception .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) .build() ); @@ -351,6 +352,7 @@ public void testInformationSchemaTables() throws Exception .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) .build() ); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 235c40b4b967..ab10c65842aa 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -453,10 +453,16 @@ DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerT @Test public void testGetTableMap() { - Assert.assertEquals(ImmutableSet.of("segments", "servers", "server_segments", "tasks"), schema.getTableNames()); + Assert.assertEquals( + ImmutableSet.of("segments", "servers", "server_segments", "tasks", "supervisors"), + schema.getTableNames() + ); final Map tableMap = schema.getTableMap(); - Assert.assertEquals(ImmutableSet.of("segments", "servers", "server_segments", "tasks"), tableMap.keySet()); + Assert.assertEquals( + ImmutableSet.of("segments", "servers", "server_segments", "tasks", "supervisors"), + tableMap.keySet() + ); final SystemSchema.SegmentsTable segmentsTable = (SystemSchema.SegmentsTable) schema.getTableMap().get("segments"); final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl()); final List fields = rowType.getFieldList(); @@ -1113,6 +1119,90 @@ public Object get(String name) verifyTypes(rows, SystemSchema.TASKS_SIGNATURE); } + @Test + public void testSupervisorTable() throws Exception + { + + SystemSchema.SupervisorsTable supervisorTable = EasyMock.createMockBuilder(SystemSchema.SupervisorsTable.class) + .withConstructor(client, mapper, responseHandler, authMapper) + .createMock(); + EasyMock.replay(supervisorTable); + EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system", false)) + .andReturn(request) + .anyTimes(); + SettableFuture future = SettableFuture.create(); + EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once(); + final int ok = HttpServletResponse.SC_OK; + EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes(); + EasyMock.expect(request.getUrl()) + .andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?system")) + .anyTimes(); + + AppendableByteArrayInputStream in = new AppendableByteArrayInputStream(); + + String json = "[{\n" + + "\t\"id\": \"wikipedia\",\n" + + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n" + + "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n" + + "\t\"healthy\": false,\n" + + "\t\"specString\": \"{\\\"type\\\":\\\"kafka\\\",\\\"dataSchema\\\":{\\\"dataSource\\\":\\\"wikipedia\\\"}" + + ",\\\"context\\\":null,\\\"suspended\\\":false}\",\n" + + "\t\"type\": \"kafka\",\n" + + "\t\"source\": \"wikipedia\",\n" + + "\t\"suspended\": false\n" + + "}]"; + + byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8); + in.add(bytesToWrite); + in.done(); + future.set(in); + + EasyMock.replay(client, request, responseHandler); + DataContext dataContext = new DataContext() + { + @Override + public SchemaPlus getRootSchema() + { + return null; + } + + @Override + public JavaTypeFactory getTypeFactory() + { + return null; + } + + @Override + public QueryProvider getQueryProvider() + { + return null; + } + + @Override + public Object get(String name) + { + return CalciteTests.SUPER_USER_AUTH_RESULT; + } + }; + final List rows = supervisorTable.scan(dataContext).toList(); + + Object[] row0 = rows.get(0); + Assert.assertEquals("wikipedia", row0[0].toString()); + Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString()); + Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString()); + Assert.assertEquals(0L, row0[3]); + Assert.assertEquals("kafka", row0[4].toString()); + Assert.assertEquals("wikipedia", row0[5].toString()); + Assert.assertEquals(0L, row0[6]); + Assert.assertEquals( + "{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}", + row0[7].toString() + ); + + // Verify value types. + verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE); + } + private static void verifyTypes(final List rows, final RowSignature signature) { final RelDataType rowType = signature.getRelDataType(new JavaTypeFactoryImpl()); diff --git a/website/.spelling b/website/.spelling index 0ed9a2649027..04cd774feaff 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1387,6 +1387,7 @@ avg_num_rows avg_size created_time current_size +detailed_state druid.server.maxSize druid.server.tier druid.sql.planner.maxSemiJoinRowsInMemory @@ -1413,6 +1414,7 @@ runner_status segment_id server_type sqlTimeZone +supervisor_id sys sys.segments task_id