From 21c1bd6db178799e3b5b0514942569a9a090895c Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 16 Sep 2019 14:06:16 -0700 Subject: [PATCH 01/11] Add supervisors table to SystemSchema --- .../MaterializedViewSupervisorSpec.java | 15 ++ .../kafka/supervisor/KafkaSupervisorSpec.java | 14 ++ .../supervisor/KinesisSupervisorSpec.java | 13 + .../supervisor/SupervisorResource.java | 38 ++- .../OverlordSecurityResourceFilterTest.java | 12 + .../supervisor/SupervisorManagerTest.java | 12 + .../supervisor/SupervisorResourceTest.java | 96 +++++-- .../supervisor/NoopSupervisorSpec.java | 31 ++- .../overlord/supervisor/SupervisorSpec.java | 16 ++ .../overlord/supervisor/SupervisorStatus.java | 237 ++++++++++++++++++ .../supervisor/SupervisorStatusTest.java | 69 +++++ .../SQLMetadataSupervisorManagerTest.java | 13 + .../druid/metadata/TestSupervisorSpec.java | 12 + .../sql/calcite/schema/SystemSchema.java | 180 ++++++++++++- .../sql/calcite/schema/SystemSchemaTest.java | 94 ++++++- 15 files changed, 817 insertions(+), 35 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java create mode 100644 server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index 874ae6106513..031721b1af55 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -62,6 +62,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec { private static final String TASK_PREFIX = "index_materialized_view"; + private static final String TASK_TYPE = "materialized_view"; private final String baseDataSource; private final DimensionsSpec dimensionsSpec; private final AggregatorFactory[] aggregators; @@ -325,6 +326,20 @@ public boolean isSuspended() return suspended; } + @Override + @JsonProperty("type") + public String getType() + { + return TASK_TYPE; + } + + @Override + @JsonProperty("source") + public String getSource() + { + return getBaseDataSource(); + } + @Override public String getId() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index 60c187f07487..772a26ad07c3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -40,6 +40,8 @@ public class KafkaSupervisorSpec extends SeekableStreamSupervisorSpec { + private static final String TASK_TYPE = "kafka"; + @JsonCreator public KafkaSupervisorSpec( @JsonProperty("dataSchema") DataSchema dataSchema, @@ -103,6 +105,18 @@ public KafkaSupervisorSpec( ); } + @Override + public String getType() + { + return TASK_TYPE; + } + + @Override + public String getSource() + { + return getIoConfig() != null ? getIoConfig().getTopic() : null; + } + @Override public Supervisor createSupervisor() { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index e921fc98f8a9..2639c9ce6b02 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -42,6 +42,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec { + private static final String TASK_TYPE = "kinesis"; private final AWSCredentialsConfig awsCredentialsConfig; @JsonCreator @@ -131,6 +132,18 @@ public Supervisor createSupervisor() ); } + @Override + public String getType() + { + return TASK_TYPE; + } + + @Override + public String getSource() + { + return getIoConfig() != null ? getIoConfig().getStream() : null; + } + @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 1762903cab17..4a8362d51b3b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -19,6 +19,8 @@ package org.apache.druid.indexing.overlord.supervisor; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -79,12 +81,14 @@ public class SupervisorResource private final TaskMaster taskMaster; private final AuthorizerMapper authorizerMapper; + private final ObjectMapper objectMapper; @Inject - public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapper) + public SupervisorResource(TaskMaster taskMaster, AuthorizerMapper authorizerMapper, ObjectMapper objectMapper) { this.taskMaster = taskMaster; this.authorizerMapper = authorizerMapper; + this.objectMapper = objectMapper; } @POST @@ -120,6 +124,7 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe public Response specGetAll( @QueryParam("full") String full, @QueryParam("state") Boolean state, + @QueryParam("fullStatus") String fullStatus, @Context final HttpServletRequest req ) { @@ -132,24 +137,39 @@ public Response specGetAll( ); final boolean includeFull = full != null; final boolean includeState = state != null && state; + final boolean includeFullStatus = fullStatus != null; - if (includeFull || includeState) { - List> allStates = authorizedSupervisorIds + if (includeFull || includeState || includeFullStatus) { + List allStates = authorizedSupervisorIds .stream() .map(x -> { Optional theState = manager.getSupervisorState(x); - ImmutableMap.Builder theBuilder = ImmutableMap.builder(); - theBuilder.put("id", x); + SupervisorStatus.Builder theBuilder = new SupervisorStatus.Builder(); + theBuilder.withId(x); if (theState.isPresent()) { - theBuilder.put("state", theState.get().getBasicState()); - theBuilder.put("detailedState", theState.get()); - theBuilder.put("healthy", theState.get().isHealthy()); + theBuilder.withState(theState.get().getBasicState().toString()) + .withDetailedState(theState.get().toString()) + .withHealthy(theState.get().isHealthy()); } if (includeFull) { Optional theSpec = manager.getSupervisorSpec(x); if (theSpec.isPresent()) { - theBuilder.put("spec", theSpec.get()); + theBuilder.withSpec(manager.getSupervisorSpec(x).get()); + } + } + if(includeFullStatus){ + Optional theSpec = manager.getSupervisorSpec(x); + if (theSpec.isPresent()) { + try { + theBuilder.withSpecString(objectMapper.writeValueAsString(manager.getSupervisorSpec(x).get())); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + theBuilder.withType(manager.getSupervisorSpec(x).get().getType()) + .withSource(manager.getSupervisorSpec(x).get().getSource()) + .withSuspended(manager.getSupervisorSpec(x).get().isSuspended()); } } return theBuilder.build(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java index 7977164b1ca6..e603a553e244 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java @@ -149,6 +149,18 @@ public boolean isSuspended() { return false; } + + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } }; EasyMock.expect(supervisorManager.getSupervisorSpec(EasyMock.anyString())) .andReturn(Optional.of(supervisorSpec)) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 6f62947d5567..6b93f84d1625 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -410,6 +410,18 @@ public boolean isSuspended() return suspended; } + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } + @Override public List getDataSources() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index ef477fe209a5..ed628b62e809 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.segment.TestHelper; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AuthConfig; import org.apache.druid.server.security.AuthenticationResult; @@ -44,7 +45,9 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; +import java.io.IOException; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -53,6 +56,7 @@ @RunWith(EasyMockRunner.class) public class SupervisorResourceTest extends EasyMockSupport { + private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper(); private static final TestSupervisorSpec SPEC1 = new TestSupervisorSpec( "id1", null, @@ -100,7 +104,8 @@ public Authorizer getAuthorizer(String name) } }; } - } + }, + OBJECT_MAPPER ); } @@ -160,7 +165,7 @@ public void testSpecGetAll() EasyMock.expectLastCall().anyTimes(); replayAll(); - Response response = supervisorResource.specGetAll(null, null, request); + Response response = supervisorResource.specGetAll(null, null, null, request); verifyAll(); Assert.assertEquals(200, response.getStatus()); @@ -170,7 +175,7 @@ public void testSpecGetAll() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = supervisorResource.specGetAll(null, null, request); + response = supervisorResource.specGetAll(null, null, null, request); verifyAll(); Assert.assertEquals(503, response.getStatus()); @@ -184,10 +189,10 @@ public void testSpecGetAllFull() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(SUPERVISOR_IDS).atLeastOnce(); - EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).times(2); - EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).times(2); - EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).times(1); - EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).times(1); + EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).anyTimes(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( @@ -197,20 +202,59 @@ public void testSpecGetAllFull() EasyMock.expectLastCall().anyTimes(); replayAll(); - Response response = supervisorResource.specGetAll("", null, request); + Response response = supervisorResource.specGetAll("", null, null, request); verifyAll(); Assert.assertEquals(200, response.getStatus()); - List> specs = (List>) response.getEntity(); + List specs = (List) response.getEntity(); Assert.assertTrue( specs.stream() .allMatch(spec -> - ("id1".equals(spec.get("id")) && SPEC1.equals(spec.get("spec"))) || - ("id2".equals(spec.get("id")) && SPEC2.equals(spec.get("spec"))) + ("id1".equals(spec.getId()) && SPEC1.equals(spec.getSpec())) || + ("id2".equals(spec.getId()) && SPEC2.equals(spec.getSpec())) ) ); } + @Test + public void testSpecGetAllFullStatus() + { + SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING; + SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(SUPERVISOR_IDS).atLeastOnce(); + EasyMock.expect(supervisorManager.getSupervisorSpec("id1")).andReturn(Optional.of(SPEC1)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorSpec("id2")).andReturn(Optional.of(SPEC2)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorState("id1")).andReturn(Optional.of(state1)).anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorState("id2")).andReturn(Optional.of(state2)).anyTimes(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn( + new AuthenticationResult("druid", "druid", null, null) + ).atLeastOnce(); + request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expectLastCall().anyTimes(); + replayAll(); + + Response response = supervisorResource.specGetAll(null, null, "", request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + List specs = (List) response.getEntity(); + specs.sort(Comparator.comparing(SupervisorStatus::getId)); + Assert.assertEquals(2, specs.size()); + SupervisorStatus spec = specs.get(0); + Assert.assertEquals("id1", spec.getId()); + Assert.assertEquals("RUNNING", spec.getState()); + Assert.assertEquals("RUNNING", spec.getDetailedState()); + Assert.assertEquals(true, spec.isHealthy()); + Assert.assertEquals("{\"type\":\"SupervisorResourceTest$TestSupervisorSpec\"}", spec.getSpecString()); + Assert.assertEquals("test", spec.getType()); + Assert.assertEquals("dummy", spec.getSource()); + Assert.assertEquals(false, spec.isSuspended()); + } + @Test public void testSpecGetState() { @@ -232,23 +276,23 @@ public void testSpecGetState() EasyMock.expectLastCall().anyTimes(); replayAll(); - Response response = supervisorResource.specGetAll(null, true, request); + Response response = supervisorResource.specGetAll(null, true, null, request); verifyAll(); Assert.assertEquals(200, response.getStatus()); - List> states = (List>) response.getEntity(); + List states = (List) response.getEntity(); Assert.assertTrue( states.stream() .allMatch(state -> { - final String id = (String) state.get("id"); + final String id = (String) state.getId(); if ("id1".equals(id)) { - return state1.equals(state.get("state")) - && state1.equals(state.get("detailedState")) - && (Boolean) state.get("healthy") == state1.isHealthy(); + return state1.toString().equals(state.getState()) + && state1.toString().equals(state.getDetailedState()) + && (Boolean) state.isHealthy() == state1.isHealthy(); } else if ("id2".equals(id)) { - return state2.equals(state.get("state")) - && state2.equals(state.get("detailedState")) - && (Boolean) state.get("healthy") == state2.isHealthy(); + return state2.toString().equals(state.getState()) + && state2.toString().equals(state.getDetailedState()) + && (Boolean) state.isHealthy() == state2.isHealthy(); } return false; }) @@ -1137,6 +1181,18 @@ public boolean isSuspended() return suspended; } + @Override + public String getType() + { + return "test"; + } + + @Override + public String getSource() + { + return "dummy"; + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 1d1a61f58f79..a3c10889f4a2 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -48,25 +48,36 @@ public class NoopSupervisorSpec implements SupervisorSpec @JsonProperty("suspended") private boolean suspended; //ignored + @JsonProperty("type") + private String type; //ignored + + @JsonProperty("source") + private String source; //ignored + @VisibleForTesting public NoopSupervisorSpec( String id, List datasources ) { - this(id, datasources, null); + this(id, datasources, null, null, null); } @JsonCreator public NoopSupervisorSpec( @JsonProperty("id") @Nullable String id, @JsonProperty("dataSources") @Nullable List datasources, - @JsonProperty("suspended") @Nullable Boolean suspended + @JsonProperty("suspended") @Nullable Boolean suspended, + @JsonProperty("type") @Nullable String type, + @JsonProperty("source") @Nullable String source ) { this.id = id; this.datasources = datasources == null ? new ArrayList<>() : datasources; - this.suspended = false; // ignore + // these are ignored + this.suspended = false; + this.type = "noop"; + this.source = "noop"; } @Override @@ -92,6 +103,20 @@ public boolean isSuspended() return suspended; } + @Override + @JsonProperty("type") + public String getType() + { + return type; + } + + @Override + @JsonProperty("source") + public String getSource() + { + return source; + } + @Override public Supervisor createSupervisor() { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index e875dfc39e95..a2c216754f27 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -56,4 +56,20 @@ default boolean isSuspended() { return false; } + + /** + * This API is only used for informational purposes in + * {@link org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable} + * + * @return supervisor task type + */ + String getType(); + + /** + * This API is only used for informational purposes in + * {@link org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable} + * + * @return source like stream or topic name + */ + String getSource(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java new file mode 100644 index 000000000000..0942f3086d88 --- /dev/null +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; + +import java.util.Objects; + +/** + * This class contains the attributes of a supervisor task which are returned by the API's in {@link SupervisorResource} + * and used by {@link org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable} + */ +@JsonDeserialize(builder = SupervisorStatus.Builder.class) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class SupervisorStatus +{ + private final String id; + private final String state; + private final String detailedState; + private final boolean healthy; + private final SupervisorSpec spec; + /** + * This is a stringified version of spec object + */ + private final String specString; + private final String type; + private final String source; + private final boolean suspended; + + private SupervisorStatus( + Builder builder + ) + { + this.id = builder.id; + this.state = builder.state; + this.detailedState = builder.detailedState; + this.healthy = builder.healthy; + this.spec = builder.spec; + this.specString = builder.specString; + this.type = builder.type; + this.source = builder.source; + this.suspended = builder.suspended; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SupervisorStatus that = (SupervisorStatus) o; + return healthy == that.healthy && + Objects.equals(id, that.id) && + Objects.equals(state, that.state) && + Objects.equals(detailedState, that.detailedState) && + Objects.equals(spec, that.spec) && + Objects.equals(specString, that.specString) && + Objects.equals(type, that.type) && + Objects.equals(source, that.source) && + suspended == that.suspended; + } + + @Override + public int hashCode() + { + return Objects.hash(id, state, detailedState, healthy, spec, specString, type, source, suspended); + } + + @JsonProperty + public String getId() + { + return id; + } + + @JsonProperty + public String getState() + { + return state; + } + + @JsonProperty + public String getDetailedState() + { + return detailedState; + } + + @JsonProperty + public boolean isHealthy() + { + return healthy; + } + + @JsonProperty + public SupervisorSpec getSpec() + { + return spec; + } + + @JsonProperty + public String getSpecString() + { + return specString; + } + + @JsonProperty + public String getType() + { + return type; + } + + @JsonProperty + public String getSource() + { + return source; + } + + @JsonProperty + public boolean isSuspended() + { + return suspended; + } + + @JsonPOJOBuilder + public static class Builder + { + @JsonProperty("id") + private String id; + @JsonProperty("state") + private String state; + @JsonProperty("detailedState") + private String detailedState; + @JsonProperty("healthy") + private boolean healthy; + @JsonProperty("spec") + private SupervisorSpec spec; + @JsonProperty("specString") + private String specString; + @JsonProperty("type") + private String type; + @JsonProperty("source") + private String source; + @JsonProperty("suspended") + private boolean suspended; + + @JsonProperty + public Builder withId(String id) + { + this.id = id; + return this; + } + + @JsonProperty + public Builder withState(String state) + { + this.state = state; + return this; + } + + @JsonProperty + public Builder withDetailedState(String detailedState) + { + this.detailedState = detailedState; + return this; + } + + @JsonProperty + public Builder withHealthy(boolean healthy) + { + this.healthy = healthy; + return this; + } + + @JsonProperty + public Builder withSpec(SupervisorSpec spec) + { + this.spec = spec; + return this; + } + + @JsonProperty + public Builder withSpecString(String spec) + { + this.specString = spec; + return this; + } + + @JsonProperty + public Builder withType(String type) + { + this.type = type; + return this; + } + + @JsonProperty + public Builder withSource(String source) + { + this.source = source; + return this; + } + + @JsonProperty + public Builder withSuspended(boolean suspended) + { + this.suspended = suspended; + return this; + } + + public SupervisorStatus build() + { + return new SupervisorStatus(this); + } + } +} + diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java new file mode 100644 index 000000000000..4c370453a0bc --- /dev/null +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatusTest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class SupervisorStatusTest +{ + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = new DefaultObjectMapper(); + final SupervisorStatus.Builder builder = new SupervisorStatus.Builder(); + final SupervisorStatus supervisorStatus = builder.withId("wikipedia") + .withState("RUNNING") + .withDetailedState("RUNNING") + .withHealthy(true) + .withType("kafka") + .withSource("wikipedia") + .withSuspended(false) + .build(); + final String serialized = mapper.writeValueAsString(supervisorStatus); + final SupervisorStatus deserialized = mapper.readValue(serialized, SupervisorStatus.class); + Assert.assertEquals(supervisorStatus, deserialized); + } + + @Test + public void testJsonAttr() throws IOException + { + String json = "{" + + "\"id\":\"wikipedia\"," + + "\"state\":\"UNHEALTHY_SUPERVISOR\"," + + "\"detailedState\":\"UNHEALTHY_SUPERVISOR\"," + + "\"healthy\":false," + + "\"type\":\"kafka\"," + + "\"source\":\"wikipedia\"," + + "\"suspended\":false" + + "}"; + final ObjectMapper mapper = new ObjectMapper(); + final SupervisorStatus deserialized = mapper.readValue(json, SupervisorStatus.class); + Assert.assertNotNull(deserialized); + Assert.assertEquals("wikipedia", deserialized.getId()); + final String serialized = mapper.writeValueAsString(deserialized); + Assert.assertTrue(serialized.contains("\"source\"")); + Assert.assertEquals(json, serialized); + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java index 38e65de4fd20..93322209c8a2 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java @@ -190,5 +190,18 @@ public List getDataSources() { return Collections.singletonList(dataSource); } + + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } + } } diff --git a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java index 19802f172ddd..e935a413f045 100644 --- a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java +++ b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java @@ -58,6 +58,18 @@ public List getDataSources() return null; } + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } + @JsonProperty public Object getData() { diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 08f768da6de2..da8bbddd9a29 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -54,6 +54,7 @@ import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeType; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; @@ -99,6 +100,7 @@ public class SystemSchema extends AbstractSchema private static final String SERVERS_TABLE = "servers"; private static final String SERVER_SEGMENTS_TABLE = "server_segments"; private static final String TASKS_TABLE = "tasks"; + private static final String SUPERVISOR_TABLE = "supervisors"; private static final Function> SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR = segment -> @@ -180,6 +182,18 @@ public class SystemSchema extends AbstractSchema .add("error_msg", ValueType.STRING) .build(); + static final RowSignature SUPERVISOR_SIGNATURE = RowSignature + .builder() + .add("id", ValueType.STRING) + .add("state", ValueType.STRING) + .add("detailed_state", ValueType.STRING) + .add("healthy", ValueType.LONG) + .add("type", ValueType.STRING) + .add("source", ValueType.STRING) + .add("suspended", ValueType.LONG) + .add("spec", ValueType.STRING) + .build(); + private final Map tableMap; @Inject @@ -207,7 +221,8 @@ public SystemSchema( SEGMENTS_TABLE, segmentsTable, SERVERS_TABLE, new ServersTable(druidNodeDiscoveryProvider, serverInventoryView, authorizerMapper), SERVER_SEGMENTS_TABLE, new ServerSegmentsTable(serverView, authorizerMapper), - TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) + TASKS_TABLE, new TasksTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper), + SUPERVISOR_TABLE, new SupervisorsTable(overlordDruidLeaderClient, jsonMapper, responseHandler, authorizerMapper) ); } @@ -755,6 +770,169 @@ private static JsonParserIterator getTasks( ); } + /** + * This table contains a row per supervisor task. + */ + static class SupervisorsTable extends AbstractTable implements ScannableTable + { + private final DruidLeaderClient druidLeaderClient; + private final ObjectMapper jsonMapper; + private final BytesAccumulatingResponseHandler responseHandler; + private final AuthorizerMapper authorizerMapper; + + public SupervisorsTable( + DruidLeaderClient druidLeaderClient, + ObjectMapper jsonMapper, + BytesAccumulatingResponseHandler responseHandler, + AuthorizerMapper authorizerMapper + ) + { + this.druidLeaderClient = druidLeaderClient; + this.jsonMapper = jsonMapper; + this.responseHandler = responseHandler; + this.authorizerMapper = authorizerMapper; + } + + + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) + { + return SUPERVISOR_SIGNATURE.getRelDataType(typeFactory); + } + + @Override + public TableType getJdbcTableType() + { + return TableType.SYSTEM_TABLE; + } + + @Override + public Enumerable scan(DataContext root) + { + class SupervisorsEnumerable extends DefaultEnumerable + { + private final CloseableIterator it; + public SupervisorsEnumerable(JsonParserIterator tasks) + { + this.it = getAuthorizedSupervisors(tasks, root); + } + + @Override + public Iterator iterator() + { + throw new UnsupportedOperationException("Do not use iterator(), it cannot be closed."); + } + + @Override + public Enumerator enumerator() + { + return new Enumerator() + { + @Override + public Object[] current() + { + final SupervisorStatus supervisor = it.next(); + return new Object[]{ + supervisor.getId(), + supervisor.getState(), + supervisor.getDetailedState(), + supervisor.isHealthy() ? 1L : 0L, + supervisor.getType(), + supervisor.getSource(), + supervisor.isSuspended() ? 1L : 0L, + supervisor.getSpecString() + }; + } + + @Override + public boolean moveNext() + { + return it.hasNext(); + } + + @Override + public void reset() + { + + } + + @Override + public void close() + { + try { + it.close(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + }; + } + } + + return new SupervisorsEnumerable(getSupervisors(druidLeaderClient, jsonMapper, responseHandler)); + } + + private CloseableIterator getAuthorizedSupervisors( + JsonParserIterator it, + DataContext root + ) + { + final AuthenticationResult authenticationResult = + (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); + + Function> raGenerator = supervisor -> Collections.singletonList( + AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource())); + + final Iterable authorizedTasks = AuthorizationUtils.filterAuthorizedResources( + authenticationResult, + () -> it, + raGenerator, + authorizerMapper + ); + + return wrap(authorizedTasks.iterator(), it); + } + } + + // Note that overlord must be up to get supervisor tasks, otherwise queries to sys.supervisors table + // will fail with internal server error (HTTP 500) + private static JsonParserIterator getSupervisors( + DruidLeaderClient indexingServiceClient, + ObjectMapper jsonMapper, + BytesAccumulatingResponseHandler responseHandler + ) + { + Request request; + try { + request = indexingServiceClient.makeRequest( + HttpMethod.GET, + StringUtils.format("/druid/indexer/v1/supervisor/status?fullStatus"), + false + ); + } + catch (IOException e) { + throw new RuntimeException(e); + } + ListenableFuture future = indexingServiceClient.goAsync( + request, + responseHandler + ); + + final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() + { + }); + return new JsonParserIterator<>( + typeRef, + future, + request.getUrl().toString(), + null, + request.getUrl().getHost(), + jsonMapper, + responseHandler + ); + } + private static CloseableIterator wrap(Iterator iterator, JsonParserIterator it) { return new CloseableIterator() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 235c40b4b967..b4714ea921b7 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -453,10 +453,16 @@ DataNodeService.DISCOVERY_SERVICE_KEY, new DataNodeService("tier", 1000, ServerT @Test public void testGetTableMap() { - Assert.assertEquals(ImmutableSet.of("segments", "servers", "server_segments", "tasks"), schema.getTableNames()); + Assert.assertEquals( + ImmutableSet.of("segments", "servers", "server_segments", "tasks", "supervisors"), + schema.getTableNames() + ); final Map tableMap = schema.getTableMap(); - Assert.assertEquals(ImmutableSet.of("segments", "servers", "server_segments", "tasks"), tableMap.keySet()); + Assert.assertEquals( + ImmutableSet.of("segments", "servers", "server_segments", "tasks", "supervisors"), + tableMap.keySet() + ); final SystemSchema.SegmentsTable segmentsTable = (SystemSchema.SegmentsTable) schema.getTableMap().get("segments"); final RelDataType rowType = segmentsTable.getRowType(new JavaTypeFactoryImpl()); final List fields = rowType.getFieldList(); @@ -1113,6 +1119,90 @@ public Object get(String name) verifyTypes(rows, SystemSchema.TASKS_SIGNATURE); } + @Test + public void testSupervisorTable() throws Exception + { + + SystemSchema.SupervisorsTable supervisorTable = EasyMock.createMockBuilder(SystemSchema.SupervisorsTable.class) + .withConstructor(client, mapper, responseHandler, authMapper) + .createMock(); + EasyMock.replay(supervisorTable); + EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor/status?fullStatus", false)) + .andReturn(request) + .anyTimes(); + SettableFuture future = SettableFuture.create(); + EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once(); + final int ok = HttpServletResponse.SC_OK; + EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes(); + EasyMock.expect(request.getUrl()) + .andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?fullStatus")) + .anyTimes(); + + AppendableByteArrayInputStream in = new AppendableByteArrayInputStream(); + + String json = "[{\n" + + "\t\"id\": \"wikipedia\",\n" + + "\t\"state\": \"UNHEALTHY_SUPERVISOR\",\n" + + "\t\"detailedState\": \"UNABLE_TO_CONNECT_TO_STREAM\",\n" + + "\t\"healthy\": false,\n" + + "\t\"specString\": \"{\\\"type\\\":\\\"kafka\\\",\\\"dataSchema\\\":{\\\"dataSource\\\":\\\"wikipedia\\\"}" + + ",\\\"context\\\":null,\\\"suspended\\\":false}\",\n" + + "\t\"type\": \"kafka\",\n" + + "\t\"source\": \"wikipedia\",\n" + + "\t\"suspended\": false\n" + + "}]"; + + byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8); + in.add(bytesToWrite); + in.done(); + future.set(in); + + EasyMock.replay(client, request, responseHandler); + DataContext dataContext = new DataContext() + { + @Override + public SchemaPlus getRootSchema() + { + return null; + } + + @Override + public JavaTypeFactory getTypeFactory() + { + return null; + } + + @Override + public QueryProvider getQueryProvider() + { + return null; + } + + @Override + public Object get(String name) + { + return CalciteTests.SUPER_USER_AUTH_RESULT; + } + }; + final List rows = supervisorTable.scan(dataContext).toList(); + + Object[] row0 = rows.get(0); + Assert.assertEquals("wikipedia", row0[0].toString()); + Assert.assertEquals("UNHEALTHY_SUPERVISOR", row0[1].toString()); + Assert.assertEquals("UNABLE_TO_CONNECT_TO_STREAM", row0[2].toString()); + Assert.assertEquals(0L, row0[3]); + Assert.assertEquals("kafka", row0[4].toString()); + Assert.assertEquals("wikipedia", row0[5].toString()); + Assert.assertEquals(0L, row0[6]); + Assert.assertEquals( + "{\"type\":\"kafka\",\"dataSchema\":{\"dataSource\":\"wikipedia\"},\"context\":null,\"suspended\":false}", + row0[7].toString() + ); + + // Verify value types. + verifyTypes(rows, SystemSchema.SUPERVISOR_SIGNATURE); + } + private static void verifyTypes(final List rows, final RowSignature signature) { final RelDataType rowType = signature.getRelDataType(new JavaTypeFactoryImpl()); From 51a2c770e827043d4e2881c14113971afa15fd73 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 16 Sep 2019 15:30:11 -0700 Subject: [PATCH 02/11] Add docs --- docs/operations/api-reference.md | 16 ++++++++++++++ docs/querying/sql.md | 21 +++++++++++++++++++ .../overlord/supervisor/SupervisorStatus.java | 5 +++-- .../sql/calcite/schema/SystemSchema.java | 4 ++-- 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 9b06eeed0d3b..0a51a2730312 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -522,6 +522,22 @@ Returns a list of objects of the currently active supervisors and their current |`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| |`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| |`healthy`|Boolean|true or false indicator of overall supervisor health| +|`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| + +* `/druid/indexer/v1/supervisor?fullStatus` + +Returns a list of objects of the currently active supervisors. + +|Field|Type|Description| +|---|---|---| +|`id`|String|supervisor unique identifier| +|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| +|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| +|`healthy`|Boolean|true or false indicator of overall supervisor health| +|`specString`|String|a json string of supervisor spec| +|`type`|String|type of supervisor task, eg. `kafka` or `kinesis`| +|`source`|String|source of supervisor task, eg. kafka topic or kinesis stream| +|`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| * `/druid/indexer/v1/supervisor/` diff --git a/docs/querying/sql.md b/docs/querying/sql.md index 4fdd2575b50d..498f418f7bc6 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -771,6 +771,27 @@ For example, to retrieve tasks information filtered by status, use the query SELECT * FROM sys.tasks WHERE status='FAILED'; ``` +#### SUPERVISORS table + +The supervisors table provides information about supervisor tasks. + +|Column|Type|Notes| +|------|-----|-----| +|supervisor_id|STRING|supervisor task identifier| +|state|STRING|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| +|detailed_state|STRING|supervisor specific state. (See documentation of specific supervisor for details)| +|healthy|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates a healthy supervisor| +|type|STRING|type of supervisor task, eg. `kafka`, `kinesis` or `materialized_view`| +|source|STRING|source of the supervisor, eg kafka topic or kinesis stream| +|suspended|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates supervisor is in suspended state| +|spec|STRING|JSON-serialized supervisor spec| + +For example, to retrieve supervisor tasks information filtered by health status, use the query + +```sql +SELECT * FROM sys.supervisors WHERE healthy=0; +``` + Note that sys tables may not support all the Druid SQL Functions. ## Server configuration diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java index 0942f3086d88..876229b3a2b6 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; +import com.google.common.base.Preconditions; import java.util.Objects; @@ -51,7 +52,7 @@ private SupervisorStatus( Builder builder ) { - this.id = builder.id; + this.id = Preconditions.checkNotNull(builder.id, "id"); this.state = builder.state; this.detailedState = builder.detailedState; this.healthy = builder.healthy; @@ -168,7 +169,7 @@ public static class Builder @JsonProperty public Builder withId(String id) { - this.id = id; + this.id = Preconditions.checkNotNull(id, "id"); return this; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index da8bbddd9a29..4fe8b5943a40 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -184,7 +184,7 @@ public class SystemSchema extends AbstractSchema static final RowSignature SUPERVISOR_SIGNATURE = RowSignature .builder() - .add("id", ValueType.STRING) + .add("supervisor_id", ValueType.STRING) .add("state", ValueType.STRING) .add("detailed_state", ValueType.STRING) .add("healthy", ValueType.LONG) @@ -907,7 +907,7 @@ private static JsonParserIterator getSupervisors( try { request = indexingServiceClient.makeRequest( HttpMethod.GET, - StringUtils.format("/druid/indexer/v1/supervisor/status?fullStatus"), + StringUtils.format("/druid/indexer/v1/supervisor?fullStatus"), false ); } From a64fbd315fe02f8f4da68fdb8035e50491826f1c Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 16 Sep 2019 16:24:02 -0700 Subject: [PATCH 03/11] fix checkstyle --- .../druid/indexing/overlord/supervisor/SupervisorResource.java | 2 +- .../indexing/overlord/supervisor/SupervisorResourceTest.java | 1 - .../java/org/apache/druid/sql/calcite/schema/SystemSchema.java | 3 ++- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 4a8362d51b3b..ef62a7b74824 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -158,7 +158,7 @@ public Response specGetAll( theBuilder.withSpec(manager.getSupervisorSpec(x).get()); } } - if(includeFullStatus){ + if (includeFullStatus) { Optional theSpec = manager.getSupervisorSpec(x); if (theSpec.isPresent()) { try { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index ed628b62e809..bcddf9a815f4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -45,7 +45,6 @@ import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; -import java.io.IOException; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 4fe8b5943a40..176dd890a6e6 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -54,8 +54,8 @@ import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.discovery.DruidNodeDiscoveryProvider; import org.apache.druid.discovery.NodeType; -import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.indexer.TaskStatusPlus; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.parsers.CloseableIterator; @@ -812,6 +812,7 @@ public Enumerable scan(DataContext root) class SupervisorsEnumerable extends DefaultEnumerable { private final CloseableIterator it; + public SupervisorsEnumerable(JsonParserIterator tasks) { this.it = getAuthorizedSupervisors(tasks, root); From 7e3531d6f8dd20f61e73800f07415dc03d919fee Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 17 Sep 2019 17:09:10 -0700 Subject: [PATCH 04/11] fix test --- docs/operations/api-reference.md | 2 +- .../druid/indexing/overlord/supervisor/SupervisorStatus.java | 2 +- .../org/apache/druid/sql/calcite/schema/SystemSchemaTest.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 0a51a2730312..6b8378b7b022 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -534,7 +534,7 @@ Returns a list of objects of the currently active supervisors. |`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| |`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| |`healthy`|Boolean|true or false indicator of overall supervisor health| -|`specString`|String|a json string of supervisor spec| +|`specString`|String|a JSON string of supervisor spec| |`type`|String|type of supervisor task, eg. `kafka` or `kinesis`| |`source`|String|source of supervisor task, eg. kafka topic or kinesis stream| |`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java index 876229b3a2b6..589be3cbd2f0 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java @@ -41,7 +41,7 @@ public class SupervisorStatus private final boolean healthy; private final SupervisorSpec spec; /** - * This is a stringified version of spec object + * This is a JSON representation of spec object */ private final String specString; private final String type; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index b4714ea921b7..5acbc96dcdf9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -1127,7 +1127,7 @@ public void testSupervisorTable() throws Exception .withConstructor(client, mapper, responseHandler, authMapper) .createMock(); EasyMock.replay(supervisorTable); - EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor/status?fullStatus", false)) + EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?fullStatus", false)) .andReturn(request) .anyTimes(); SettableFuture future = SettableFuture.create(); From e0fe32ffa527a8f5131947bcca5446805cf092dd Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 18 Sep 2019 11:20:00 -0700 Subject: [PATCH 05/11] fix CI --- docs/operations/api-reference.md | 4 ++-- docs/querying/sql.md | 4 ++-- .../java/org/apache/druid/sql/calcite/CalciteQueryTest.java | 2 ++ website/.spelling | 2 ++ 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index dec303256b1b..93b79a15790b 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -535,8 +535,8 @@ Returns a list of objects of the currently active supervisors. |`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| |`healthy`|Boolean|true or false indicator of overall supervisor health| |`specString`|String|a JSON string of supervisor spec| -|`type`|String|type of supervisor task, eg. `kafka` or `kinesis`| -|`source`|String|source of supervisor task, eg. kafka topic or kinesis stream| +|`type`|String|type of supervisor task, e.g., `kafka` or `kinesis`| +|`source`|String|source of supervisor task, e.g., Kafka topic or Kinesis stream| |`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| * `/druid/indexer/v1/supervisor/` diff --git a/docs/querying/sql.md b/docs/querying/sql.md index 689e715102db..d2dd520dbe32 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -781,8 +781,8 @@ The supervisors table provides information about supervisor tasks. |state|STRING|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| |detailed_state|STRING|supervisor specific state. (See documentation of specific supervisor for details)| |healthy|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates a healthy supervisor| -|type|STRING|type of supervisor task, eg. `kafka`, `kinesis` or `materialized_view`| -|source|STRING|source of the supervisor, eg kafka topic or kinesis stream| +|type|STRING|type of supervisor task, e.g., `kafka`, `kinesis` or `materialized_view`| +|source|STRING|source of the supervisor, e.g., Kafka topic or Kinesis stream| |suspended|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates supervisor is in suspended state| |spec|STRING|JSON-serialized supervisor spec| diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 12448503a196..4b411da4a858 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -326,6 +326,7 @@ public void testInformationSchemaTables() throws Exception .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) .build() ); @@ -351,6 +352,7 @@ public void testInformationSchemaTables() throws Exception .add(new Object[]{"sys", "segments", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "server_segments", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "servers", "SYSTEM_TABLE"}) + .add(new Object[]{"sys", "supervisors", "SYSTEM_TABLE"}) .add(new Object[]{"sys", "tasks", "SYSTEM_TABLE"}) .build() ); diff --git a/website/.spelling b/website/.spelling index 5e4553a04d87..a31d15c9a47a 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1351,6 +1351,7 @@ avg_num_rows avg_size created_time current_size +detailed_state druid.server.maxSize druid.server.tier druid.sql.planner.maxSemiJoinRowsInMemory @@ -1377,6 +1378,7 @@ runner_status segment_id server_type sqlTimeZone +supervisor_id sys sys.segments task_id From cb9be533e5075ee969f187fe1339dd4b4c7d43ae Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 18 Sep 2019 18:17:14 -0700 Subject: [PATCH 06/11] Add comments --- .../indexing/overlord/supervisor/SupervisorResource.java | 5 +++++ .../druid/indexing/overlord/supervisor/SupervisorStatus.java | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index ef62a7b74824..6b9e18d64ea9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -162,6 +162,11 @@ public Response specGetAll( Optional theSpec = manager.getSupervisorSpec(x); if (theSpec.isPresent()) { try { + // serializing SupervisorSpec here, so that callers of `druid/indexer/v1/supervisor?fullStatus` + // which are outside the overlord process can deserialize the response and get a json + // payload of SupervisorSpec object when they don't have guice bindings for all the fields + // for example, broker does not have bindings for all fields of `KafkaSupervisorSpec` or + // `KinesisSupervisorSpec` theBuilder.withSpecString(objectMapper.writeValueAsString(manager.getSupervisorSpec(x).get())); } catch (JsonProcessingException e) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java index 589be3cbd2f0..1cad3bb5c52c 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java @@ -41,7 +41,9 @@ public class SupervisorStatus private final boolean healthy; private final SupervisorSpec spec; /** - * This is a JSON representation of spec object + * This is a JSON representation of {@code spec} + * The explicit serialization is present here so that users of {@code SupervisorStatus} which cannot + * deserialize {@link SupervisorSpec} can use this attribute instead */ private final String specString; private final String type; From 34b41304ae09a59f6a315da362912d7008e06313 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 24 Sep 2019 11:13:48 -0700 Subject: [PATCH 07/11] Fix javadoc teamcity error --- .../druid/indexing/overlord/supervisor/SupervisorSpec.java | 4 ++-- .../druid/indexing/overlord/supervisor/SupervisorStatus.java | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index a2c216754f27..a173637ed51a 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -59,7 +59,7 @@ default boolean isSuspended() /** * This API is only used for informational purposes in - * {@link org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable} + * org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable * * @return supervisor task type */ @@ -67,7 +67,7 @@ default boolean isSuspended() /** * This API is only used for informational purposes in - * {@link org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable} + * org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable * * @return source like stream or topic name */ diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java index 1cad3bb5c52c..f2cb1854c876 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java @@ -28,8 +28,9 @@ import java.util.Objects; /** - * This class contains the attributes of a supervisor task which are returned by the API's in {@link SupervisorResource} - * and used by {@link org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable} + * This class contains the attributes of a supervisor task which are returned by the API's in + * org.apache.druid.indexing.overlord.supervisor.SupervisorResource + * and used by org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable */ @JsonDeserialize(builder = SupervisorStatus.Builder.class) @JsonInclude(JsonInclude.Include.NON_NULL) From 9cea0c158385321077e03556a3d3f759f5002bb1 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 1 Oct 2019 08:12:33 -0700 Subject: [PATCH 08/11] comments --- docs/operations/api-reference.md | 12 ++++++------ docs/querying/sql.md | 12 ++++++------ .../MaterializedViewSupervisorSpec.java | 4 ++-- .../kinesis/supervisor/KinesisSupervisorSpec.java | 4 ++-- .../indexing/overlord/supervisor/SupervisorSpec.java | 2 +- .../overlord/supervisor/SupervisorStatus.java | 4 ++-- .../druid/sql/calcite/schema/SystemSchema.java | 4 ++-- 7 files changed, 21 insertions(+), 21 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 93b79a15790b..a7ad2483825f 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -519,8 +519,8 @@ Returns a list of objects of the currently active supervisors and their current |Field|Type|Description| |---|---|---| |`id`|String|supervisor unique identifier| -|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| -|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| +|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/kafka-ingestion.html#operations) for details.| +|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/kinesis-ingestion.html))| |`healthy`|Boolean|true or false indicator of overall supervisor health| |`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| @@ -531,12 +531,12 @@ Returns a list of objects of the currently active supervisors. |Field|Type|Description| |---|---|---| |`id`|String|supervisor unique identifier| -|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| -|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| +|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/kafka-ingestion.html#operations) for details.| +|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/kinesis-ingestion.html))| |`healthy`|Boolean|true or false indicator of overall supervisor health| |`specString`|String|a JSON string of supervisor spec| -|`type`|String|type of supervisor task, e.g., `kafka` or `kinesis`| -|`source`|String|source of supervisor task, e.g., Kafka topic or Kinesis stream| +|`type`|String|type of supervisor, e.g. `kafka` or `kinesis`| +|`source`|String|source of supervisor, e.g. Kafka topic or Kinesis stream| |`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| * `/druid/indexer/v1/supervisor/` diff --git a/docs/querying/sql.md b/docs/querying/sql.md index d2dd520dbe32..2c910fe7d7b4 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -773,16 +773,16 @@ SELECT * FROM sys.tasks WHERE status='FAILED'; #### SUPERVISORS table -The supervisors table provides information about supervisor tasks. +The supervisors table provides information about supervisors. |Column|Type|Notes| |------|-----|-----| -|supervisor_id|STRING|supervisor task identifier| -|state|STRING|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| -|detailed_state|STRING|supervisor specific state. (See documentation of specific supervisor for details)| +|supervisor_id|STRING|Supervisor task identifier| +|state|STRING|Basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/kafka-ingestion.html#operations) for details.|| +|detailed_state|STRING|Supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/kinesis-ingestion.html))| |healthy|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates a healthy supervisor| -|type|STRING|type of supervisor task, e.g., `kafka`, `kinesis` or `materialized_view`| -|source|STRING|source of the supervisor, e.g., Kafka topic or Kinesis stream| +|type|STRING|Type of supervisor, e.g. `kafka`, `kinesis` or `materialized_view`| +|source|STRING|Source of the supervisor, e.g. Kafka topic or Kinesis stream| |suspended|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates supervisor is in suspended state| |spec|STRING|JSON-serialized supervisor spec| diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index 031721b1af55..0e25841fe3c6 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -62,7 +62,7 @@ public class MaterializedViewSupervisorSpec implements SupervisorSpec { private static final String TASK_PREFIX = "index_materialized_view"; - private static final String TASK_TYPE = "materialized_view"; + private static final String SUPERVISOR_TYPE = "materialized_view"; private final String baseDataSource; private final DimensionsSpec dimensionsSpec; private final AggregatorFactory[] aggregators; @@ -330,7 +330,7 @@ public boolean isSuspended() @JsonProperty("type") public String getType() { - return TASK_TYPE; + return SUPERVISOR_TYPE; } @Override diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java index 2639c9ce6b02..833604f15aff 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorSpec.java @@ -42,7 +42,7 @@ public class KinesisSupervisorSpec extends SeekableStreamSupervisorSpec { - private static final String TASK_TYPE = "kinesis"; + private static final String SUPERVISOR_TYPE = "kinesis"; private final AWSCredentialsConfig awsCredentialsConfig; @JsonCreator @@ -135,7 +135,7 @@ public Supervisor createSupervisor() @Override public String getType() { - return TASK_TYPE; + return SUPERVISOR_TYPE; } @Override diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index a173637ed51a..041156ce4251 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -61,7 +61,7 @@ default boolean isSuspended() * This API is only used for informational purposes in * org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable * - * @return supervisor task type + * @return supervisor type */ String getType(); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java index f2cb1854c876..e87c8bbf31fe 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStatus.java @@ -28,7 +28,7 @@ import java.util.Objects; /** - * This class contains the attributes of a supervisor task which are returned by the API's in + * This class contains the attributes of a supervisor which are returned by the API's in * org.apache.druid.indexing.overlord.supervisor.SupervisorResource * and used by org.apache.druid.sql.calcite.schema.SystemSchema.SupervisorsTable */ @@ -43,7 +43,7 @@ public class SupervisorStatus private final SupervisorSpec spec; /** * This is a JSON representation of {@code spec} - * The explicit serialization is present here so that users of {@code SupervisorStatus} which cannot + * The explicit serialization is present here so that users of {@code SupervisorStatus} which cannot * deserialize {@link SupervisorSpec} can use this attribute instead */ private final String specString; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 176dd890a6e6..79718a4de176 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -744,7 +744,7 @@ private static JsonParserIterator getTasks( try { request = indexingServiceClient.makeRequest( HttpMethod.GET, - StringUtils.format("/druid/indexer/v1/tasks"), + "/druid/indexer/v1/tasks", false ); } @@ -908,7 +908,7 @@ private static JsonParserIterator getSupervisors( try { request = indexingServiceClient.makeRequest( HttpMethod.GET, - StringUtils.format("/druid/indexer/v1/supervisor?fullStatus"), + "/druid/indexer/v1/supervisor?fullStatus", false ); } From e177120add1377eaaf52377dac7bd948008afbb3 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 1 Oct 2019 12:01:03 -0700 Subject: [PATCH 09/11] fix links in docs --- docs/operations/api-reference.md | 8 ++++---- docs/querying/sql.md | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index a7ad2483825f..a1f402d4671f 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -519,8 +519,8 @@ Returns a list of objects of the currently active supervisors and their current |Field|Type|Description| |---|---|---| |`id`|String|supervisor unique identifier| -|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/kafka-ingestion.html#operations) for details.| -|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/kinesis-ingestion.html))| +|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/extensions-core/kafka-ingestion.html#operations) for details.| +|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/extensions-core/kinesis-ingestion.html))| |`healthy`|Boolean|true or false indicator of overall supervisor health| |`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| @@ -531,8 +531,8 @@ Returns a list of objects of the currently active supervisors. |Field|Type|Description| |---|---|---| |`id`|String|supervisor unique identifier| -|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/kafka-ingestion.html#operations) for details.| -|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/kinesis-ingestion.html))| +|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/extensions-core/kafka-ingestion.html#operations) for details.| +|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/extensions-core/kinesis-ingestion.html))| |`healthy`|Boolean|true or false indicator of overall supervisor health| |`specString`|String|a JSON string of supervisor spec| |`type`|String|type of supervisor, e.g. `kafka` or `kinesis`| diff --git a/docs/querying/sql.md b/docs/querying/sql.md index 2c910fe7d7b4..c5d0473605e9 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -778,8 +778,8 @@ The supervisors table provides information about supervisors. |Column|Type|Notes| |------|-----|-----| |supervisor_id|STRING|Supervisor task identifier| -|state|STRING|Basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/kafka-ingestion.html#operations) for details.|| -|detailed_state|STRING|Supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/kinesis-ingestion.html))| +|state|STRING|Basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/extensions-core/kafka-ingestion.html#operations) for details.| +|detailed_state|STRING|Supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/extensions-core/kinesis-ingestion.html))| |healthy|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates a healthy supervisor| |type|STRING|Type of supervisor, e.g. `kafka`, `kinesis` or `materialized_view`| |source|STRING|Source of the supervisor, e.g. Kafka topic or Kinesis stream| From aa1ac7993e3bab66411e94bca7eebfa3ee253f41 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 1 Oct 2019 13:23:07 -0700 Subject: [PATCH 10/11] fix links --- docs/operations/api-reference.md | 8 ++++---- docs/querying/sql.md | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index a1f402d4671f..58a7df4502e6 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -519,8 +519,8 @@ Returns a list of objects of the currently active supervisors and their current |Field|Type|Description| |---|---|---| |`id`|String|supervisor unique identifier| -|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/extensions-core/kafka-ingestion.html#operations) for details.| -|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/extensions-core/kinesis-ingestion.html))| +|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.| +|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))| |`healthy`|Boolean|true or false indicator of overall supervisor health| |`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| @@ -531,8 +531,8 @@ Returns a list of objects of the currently active supervisors. |Field|Type|Description| |---|---|---| |`id`|String|supervisor unique identifier| -|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/extensions-core/kafka-ingestion.html#operations) for details.| -|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/extensions-core/kinesis-ingestion.html))| +|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.| +|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))| |`healthy`|Boolean|true or false indicator of overall supervisor health| |`specString`|String|a JSON string of supervisor spec| |`type`|String|type of supervisor, e.g. `kafka` or `kinesis`| diff --git a/docs/querying/sql.md b/docs/querying/sql.md index c5d0473605e9..61a38b290c86 100644 --- a/docs/querying/sql.md +++ b/docs/querying/sql.md @@ -778,8 +778,8 @@ The supervisors table provides information about supervisors. |Column|Type|Notes| |------|-----|-----| |supervisor_id|STRING|Supervisor task identifier| -|state|STRING|Basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../../development/extensions-core/kafka-ingestion.html#operations) for details.| -|detailed_state|STRING|Supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../../development/kafka-ingestion.html) or [Kinesis](../../development/extensions-core/kinesis-ingestion.html))| +|state|STRING|Basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.| +|detailed_state|STRING|Supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))| |healthy|LONG|Boolean represented as long type where 1 = true, 0 = false. 1 indicates a healthy supervisor| |type|STRING|Type of supervisor, e.g. `kafka`, `kinesis` or `materialized_view`| |source|STRING|Source of the supervisor, e.g. Kafka topic or Kinesis stream| From 3debca7d0fdf4a6b71bb4012fab8d6d823f57fb2 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 18 Oct 2019 10:58:52 -0700 Subject: [PATCH 11/11] rename fullStatus query param to system and remove it from docs --- docs/operations/api-reference.md | 19 ++----------------- .../supervisor/SupervisorResource.java | 10 +++++----- .../supervisor/SupervisorResourceTest.java | 2 +- .../sql/calcite/schema/SystemSchema.java | 6 +++--- .../sql/calcite/schema/SystemSchemaTest.java | 4 ++-- 5 files changed, 13 insertions(+), 28 deletions(-) diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md index 58a7df4502e6..fe6813d7c49b 100644 --- a/docs/operations/api-reference.md +++ b/docs/operations/api-reference.md @@ -507,8 +507,8 @@ Returns a list of objects of the currently active supervisors. |Field|Type|Description| |---|---|---| |`id`|String|supervisor unique identifier| -|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`| -|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details)| +|`state`|String|basic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.| +|`detailedState`|String|supervisor specific state. (See documentation of specific supervisor for details), e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))| |`healthy`|Boolean|true or false indicator of overall supervisor health| |`spec`|SupervisorSpec|json specification of supervisor (See Supervisor Configuration for details)| @@ -524,21 +524,6 @@ Returns a list of objects of the currently active supervisors and their current |`healthy`|Boolean|true or false indicator of overall supervisor health| |`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| -* `/druid/indexer/v1/supervisor?fullStatus` - -Returns a list of objects of the currently active supervisors. - -|Field|Type|Description| -|---|---|---| -|`id`|String|supervisor unique identifier| -|`state`|String|basic state of the supervisor. Available states: `UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. Check [Kafka Docs](../development/extensions-core/kafka-ingestion.html#operations) for details.| -|`detailedState`|String|supervisor specific state. (See documentation of the specific supervisor for details, e.g. [Kafka](../development/extensions-core/kafka-ingestion.html) or [Kinesis](../development/extensions-core/kinesis-ingestion.html))| -|`healthy`|Boolean|true or false indicator of overall supervisor health| -|`specString`|String|a JSON string of supervisor spec| -|`type`|String|type of supervisor, e.g. `kafka` or `kinesis`| -|`source`|String|source of supervisor, e.g. Kafka topic or Kinesis stream| -|`suspended`|Boolean|true or false indicator of whether the supervisor is in suspended state| - * `/druid/indexer/v1/supervisor/` Returns the current spec for the supervisor with the provided ID. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 6b9e18d64ea9..74c5b6192cb8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -124,7 +124,7 @@ public Response specPost(final SupervisorSpec spec, @Context final HttpServletRe public Response specGetAll( @QueryParam("full") String full, @QueryParam("state") Boolean state, - @QueryParam("fullStatus") String fullStatus, + @QueryParam("system") String system, @Context final HttpServletRequest req ) { @@ -137,9 +137,9 @@ public Response specGetAll( ); final boolean includeFull = full != null; final boolean includeState = state != null && state; - final boolean includeFullStatus = fullStatus != null; + final boolean includeSystem = system != null; - if (includeFull || includeState || includeFullStatus) { + if (includeFull || includeState || includeSystem) { List allStates = authorizedSupervisorIds .stream() .map(x -> { @@ -158,11 +158,11 @@ public Response specGetAll( theBuilder.withSpec(manager.getSupervisorSpec(x).get()); } } - if (includeFullStatus) { + if (includeSystem) { Optional theSpec = manager.getSupervisorSpec(x); if (theSpec.isPresent()) { try { - // serializing SupervisorSpec here, so that callers of `druid/indexer/v1/supervisor?fullStatus` + // serializing SupervisorSpec here, so that callers of `druid/indexer/v1/supervisor?system` // which are outside the overlord process can deserialize the response and get a json // payload of SupervisorSpec object when they don't have guice bindings for all the fields // for example, broker does not have bindings for all fields of `KafkaSupervisorSpec` or diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index bcddf9a815f4..1390e7acc98c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -216,7 +216,7 @@ public void testSpecGetAllFull() } @Test - public void testSpecGetAllFullStatus() + public void testSpecGetAllSystem() { SupervisorStateManager.State state1 = SupervisorStateManager.BasicState.RUNNING; SupervisorStateManager.State state2 = SupervisorStateManager.BasicState.SUSPENDED; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 79718a4de176..8ee7b7e0a898 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -885,14 +885,14 @@ private CloseableIterator getAuthorizedSupervisors( Function> raGenerator = supervisor -> Collections.singletonList( AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(supervisor.getSource())); - final Iterable authorizedTasks = AuthorizationUtils.filterAuthorizedResources( + final Iterable authorizedSupervisors = AuthorizationUtils.filterAuthorizedResources( authenticationResult, () -> it, raGenerator, authorizerMapper ); - return wrap(authorizedTasks.iterator(), it); + return wrap(authorizedSupervisors.iterator(), it); } } @@ -908,7 +908,7 @@ private static JsonParserIterator getSupervisors( try { request = indexingServiceClient.makeRequest( HttpMethod.GET, - "/druid/indexer/v1/supervisor?fullStatus", + "/druid/indexer/v1/supervisor?system", false ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 5acbc96dcdf9..ab10c65842aa 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -1127,7 +1127,7 @@ public void testSupervisorTable() throws Exception .withConstructor(client, mapper, responseHandler, authMapper) .createMock(); EasyMock.replay(supervisorTable); - EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?fullStatus", false)) + EasyMock.expect(client.makeRequest(HttpMethod.GET, "/druid/indexer/v1/supervisor?system", false)) .andReturn(request) .anyTimes(); SettableFuture future = SettableFuture.create(); @@ -1135,7 +1135,7 @@ public void testSupervisorTable() throws Exception final int ok = HttpServletResponse.SC_OK; EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes(); EasyMock.expect(request.getUrl()) - .andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?fullStatus")) + .andReturn(new URL("http://test-host:1234/druid/indexer/v1/supervisor?system")) .anyTimes(); AppendableByteArrayInputStream in = new AppendableByteArrayInputStream();