From 45a4449ad98d01413a5b764ad96a36569a9d925a Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Thu, 30 Aug 2018 20:47:01 +0800 Subject: [PATCH 1/7] ability to showdown all supervisors --- .../supervisor/SupervisorResource.java | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 9be9c9fb3b57..2ffffad03c44 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -245,6 +245,39 @@ public Response apply(SupervisorManager manager) ); } + @POST + @Path("/shutdownAll") + @Produces(MediaType.APPLICATION_JSON) + public Response shutdownAll() + { + return asLeaderWithSupervisorManager( + new Function() + { + @Override + public Response apply(SupervisorManager manager) + { + Set supervisorIds = manager.getSupervisorIds(); + List successes = new ArrayList<>(); + List errors = new ArrayList<>(); + + for (String id : supervisorIds) { + if (manager.stopAndRemoveSupervisor(id)) { + successes.add(id); + } else { + errors.add(id); + } + } + return Response.ok(ImmutableMap.of( + "success", + String.join(",", successes), + "error", + StringUtils.format("[%s] do not exist", String.join(",", errors)) + )).build(); + } + } + ); + } + @GET @Path("/history") @Produces(MediaType.APPLICATION_JSON) From 6b0f9c2d7bb582a29b5896306293cea2b5c0a066 Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Fri, 31 Aug 2018 02:19:12 +0800 Subject: [PATCH 2/7] add doc --- .../development/extensions-core/kafka-ingestion.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index a7db8d871070..38bf7be7bd35 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -193,12 +193,18 @@ existing publishing tasks and will create new tasks starting at the offsets the Seamless schema migrations can thus be achieved by simply submitting the new schema using this endpoint. -#### Shutdown Supervisor +#### Shutdown Specified Supervisor ``` POST /druid/indexer/v1/supervisor//shutdown ``` Note that this will cause all indexing tasks managed by this supervisor to immediately stop and begin publishing their segments. +#### Shutdown All Supervisor +``` +POST /druid/indexer/v1/supervisor/shutdownAll +``` +Shutdown all supervisors at a time. + #### Get Supervisor IDs ``` GET /druid/indexer/v1/supervisor From 36db5b09a8c0b2c0c4e57d4145b94fe07055970a Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Thu, 20 Sep 2018 23:34:08 +0800 Subject: [PATCH 3/7] address comments --- .../extensions-core/kafka-ingestion.md | 28 ++++-- .../supervisor/SupervisorResource.java | 77 +++++++++++----- .../supervisor/SupervisorResourceTest.java | 88 +++++++++++++++++-- 3 files changed, 155 insertions(+), 38 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index c3395df8ce8c..c3cb9cccc08e 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -203,13 +203,27 @@ Suspend indexing tasks associated with a supervisor. Note that the supervisor it operating and emitting logs and metrics, it will just ensure that no indexing tasks are running until the supervisor is resumed. Responds with updated SupervisorSpec. -#### Resume Supervisor +#### Suspend All Supervisors + +``` +POST /druid/indexer/v1/supervisor/suspendAll +``` +Suspend all supervisors at a time. + +#### Resume Supervisor ``` POST /druid/indexer/v1/supervisor//resume ``` Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec. +#### Resume All Supervisors + +``` +POST /druid/indexer/v1/supervisor/resumeAll +``` +Resume all supervisors at a time. + #### Reset Supervisor ``` POST /druid/indexer/v1/supervisor//reset @@ -238,17 +252,17 @@ with the supervisor history api, but will not be listed in the 'get supervisors' or status report be retrieved. The only way this supervisor can start again is by submitting a functioning supervisor spec to the create api. -#### Shutdown Supervisor -_Deprecated: use the equivalent 'terminate' instead_ +#### Terminate All Supervisors ``` -POST /druid/indexer/v1/supervisor//shutdown +POST /druid/indexer/v1/supervisor/terminateAll ``` +Terminate all supervisors at a time. -#### Shutdown All Supervisor +#### Shutdown Supervisor +_Deprecated: use the equivalent 'terminate' instead_ ``` -POST /druid/indexer/v1/supervisor/shutdownAll +POST /druid/indexer/v1/supervisor//shutdown ``` -Shutdown all supervisors at a time. #### Get Supervisor IDs ``` diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index c463d6048171..c47696d3714f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -261,34 +261,35 @@ public Response terminate(@PathParam("id") final String id) } @POST - @Path("/shutdownAll") + @Path("/suspendAll") @Produces(MediaType.APPLICATION_JSON) - public Response shutdownAll() + public Response specSuspendAll() + { + return specSuspendAllOrResumeAll(true); + } + + @POST + @Path("/resumeAll") + @Produces(MediaType.APPLICATION_JSON) + public Response specResumeAll() + { + return specSuspendAllOrResumeAll(false); + } + + @POST + @Path("/terminateAll") + @Produces(MediaType.APPLICATION_JSON) + public Response terminateAll() { return asLeaderWithSupervisorManager( - new Function() + manager -> { - @Override - public Response apply(SupervisorManager manager) - { - Set supervisorIds = manager.getSupervisorIds(); - List successes = new ArrayList<>(); - List errors = new ArrayList<>(); - - for (String id : supervisorIds) { - if (manager.stopAndRemoveSupervisor(id)) { - successes.add(id); - } else { - errors.add(id); - } - } - return Response.ok(ImmutableMap.of( - "success", - String.join(",", successes), - "error", - StringUtils.format("[%s] do not exist", String.join(",", errors)) - )).build(); - } + Set supervisorIds = manager.getSupervisorIds(); + supervisorIds.forEach(manager::stopAndRemoveSupervisor); + return Response.ok(ImmutableMap.of( + "ids", + String.join(",", supervisorIds) + )).build(); } ); } @@ -431,4 +432,32 @@ private Response specSuspendOrResume(final String id, boolean suspend) } ); } + + private Response specSuspendAllOrResumeAll(boolean suspend) + { + return asLeaderWithSupervisorManager( + manager -> + { + Set supervisorIds = manager.getSupervisorIds(); + List successes = new ArrayList<>(); + List errors = new ArrayList<>(); + + supervisorIds.forEach(id -> { + Optional spec = manager.getSupervisorSpec(id); + if (spec.get().isSuspended() == suspend) { + errors.add(id); + } else { + manager.suspendOrResumeSupervisor(id, suspend); + successes.add(id); + } + }); + return Response.ok(ImmutableMap.of( + "success", + String.join(",", successes), + "error", + StringUtils.format("[%s] are already %s", String.join(",", errors), suspend ? "suspended" : "running") + )).build(); + } + ); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 9d6eab33e1e7..2a5cb74e5ef9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -336,8 +336,6 @@ public List getDataSources() Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already suspended"), response.getEntity()); } - - @Test public void testSpecResume() { @@ -385,19 +383,19 @@ public List getDataSources() } @Test - public void testShutdown() + public void testTerminate() { EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2); EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id")).andReturn(true); EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id-2")).andReturn(false); replayAll(); - Response response = supervisorResource.shutdown("my-id"); + Response response = supervisorResource.terminate("my-id"); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity()); - response = supervisorResource.shutdown("my-id-2"); + response = supervisorResource.terminate("my-id-2"); Assert.assertEquals(404, response.getStatus()); verifyAll(); @@ -407,12 +405,88 @@ public void testShutdown() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = supervisorResource.shutdown("my-id"); + response = supervisorResource.terminate("my-id"); verifyAll(); Assert.assertEquals(503, response.getStatus()); } + @Test + public void testSpecSuspendAll() + { + TestSupervisorSpec running = new TestSupervisorSpec("my-id1", null, null, false) { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + TestSupervisorSpec suspended = new TestSupervisorSpec("my-id2", null, null, true) { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource2"); + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id1", "my-id2")); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id1")).andReturn(Optional.of(running)); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id2")).andReturn(Optional.of(suspended)); + EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id1", true)).andReturn(true); + replayAll(); + + Response response = supervisorResource.specSuspendAll(); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("success", "my-id1", "error", "[my-id2] are already suspended"), response.getEntity()); + verifyAll(); + } + + @Test + public void testSpecResumeAll() + { + TestSupervisorSpec running = new TestSupervisorSpec("my-id1", null, null, false) { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + TestSupervisorSpec suspended = new TestSupervisorSpec("my-id2", null, null, true) { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource2"); + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id1", "my-id2")); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id1")).andReturn(Optional.of(running)); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id2")).andReturn(Optional.of(suspended)); + EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id2", false)).andReturn(true); + replayAll(); + + Response response = supervisorResource.specResumeAll(); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("success", "my-id2", "error", "[my-id1] are already running"), response.getEntity()); + verifyAll(); + } + + @Test + public void testTerminateAll() + { + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id1", "my-id2")); + EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id1")).andReturn(true); + EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id2")).andReturn(true); + replayAll(); + + Response response = supervisorResource.terminateAll(); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("ids", "my-id1,my-id2"), response.getEntity()); + } + @Test public void testSpecGetAllHistory() { @@ -872,7 +946,7 @@ public void testReset() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); replayAll(); - response = supervisorResource.shutdown("my-id"); + response = supervisorResource.terminate("my-id"); Assert.assertEquals(503, response.getStatus()); verifyAll(); From 19c45dbafc9d1a57d853c144eb15bbab7a2d74ed Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Fri, 21 Sep 2018 01:24:24 +0800 Subject: [PATCH 4/7] fix code style --- .../indexing/overlord/supervisor/SupervisorResource.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index c47696d3714f..8ca58a1f02a7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -282,8 +282,7 @@ public Response specResumeAll() public Response terminateAll() { return asLeaderWithSupervisorManager( - manager -> - { + manager -> { Set supervisorIds = manager.getSupervisorIds(); supervisorIds.forEach(manager::stopAndRemoveSupervisor); return Response.ok(ImmutableMap.of( @@ -436,8 +435,7 @@ private Response specSuspendOrResume(final String id, boolean suspend) private Response specSuspendAllOrResumeAll(boolean suspend) { return asLeaderWithSupervisorManager( - manager -> - { + manager -> { Set supervisorIds = manager.getSupervisorIds(); List successes = new ArrayList<>(); List errors = new ArrayList<>(); From bdd83a01fab385e2996d77969923ca569188905b Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Wed, 26 Sep 2018 22:54:52 +0800 Subject: [PATCH 5/7] address comments --- .../supervisor/SupervisorManager.java | 53 +++++++++-- .../supervisor/SupervisorResource.java | 65 +++++-------- .../supervisor/SupervisorResourceTest.java | 93 ++++--------------- 3 files changed, 87 insertions(+), 124 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 22fd82746a70..cfca0de4b602 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -94,13 +94,31 @@ public boolean stopAndRemoveSupervisor(String id) public boolean suspendOrResumeSupervisor(String id, boolean suspend) { Preconditions.checkState(started, "SupervisorManager not started"); - Pair pair = supervisors.get(id); - Preconditions.checkNotNull(pair.rhs, "spec"); + Preconditions.checkNotNull(id, "id"); + + synchronized (lock) { + Preconditions.checkState(started, "SupervisorManager not started"); + return possiblySuspendOrResumeSupervisorInternal(id, suspend); + } + } + + public void stopAndRemoveAllSupervisors() + { + Preconditions.checkState(started, "SupervisorManager not started"); + synchronized (lock) { Preconditions.checkState(started, "SupervisorManager not started"); - SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : pair.rhs.createRunningSpec(); - possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false); - return createAndStartSupervisorInternal(nextState, true); + supervisors.keySet().forEach(id -> possiblyStopAndRemoveSupervisorInternal(id, true)); + } + } + + public void suspendOrResumeAllSupervisors(boolean suspend) + { + Preconditions.checkState(started, "SupervisorManager not started"); + + synchronized (lock) { + Preconditions.checkState(started, "SupervisorManager not started"); + supervisors.keySet().forEach(id -> possiblySuspendOrResumeSupervisorInternal(id, suspend)); } } @@ -206,7 +224,7 @@ public boolean checkPointDataSourceMetadata( * Stops a supervisor with a given id and then removes it from the list. *

* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be - * starting and stopping supervisors. + * starting, stopping, suspending and resuming supervisors. * * @return true if a supervisor was stopped, false if there was no supervisor with this id */ @@ -226,11 +244,32 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write return true; } + /** + * Suspend or resume a supervisor with a given id. + *

+ * Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be + * starting, stopping, suspending and resuming supervisors. + * + * @return true if a supervisor was suspended or resumed, false if there was no supervisor with this id + * or suspend a suspended supervisor or resume a running supervisor + */ + private boolean possiblySuspendOrResumeSupervisorInternal(String id, boolean suspend) + { + Pair pair = supervisors.get(id); + if (pair == null || pair.rhs.isSuspended() == suspend) { + return false; + } + + SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : pair.rhs.createRunningSpec(); + possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false); + return createAndStartSupervisorInternal(nextState, true); + } + /** * Creates a supervisor from the provided spec and starts it if there is not already a supervisor with that id. *

* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be - * starting and stopping supervisors. + * starting, stopping, suspending and resuming supervisors. * * @return true if a new supervisor was created, false if there was already an existing supervisor with this id */ diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 8ca58a1f02a7..5f1c644da03b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -263,17 +263,17 @@ public Response terminate(@PathParam("id") final String id) @POST @Path("/suspendAll") @Produces(MediaType.APPLICATION_JSON) - public Response specSuspendAll() + public Response suspendAll() { - return specSuspendAllOrResumeAll(true); + return suspendOrResumeAll(true); } @POST @Path("/resumeAll") @Produces(MediaType.APPLICATION_JSON) - public Response specResumeAll() + public Response resumeAll() { - return specSuspendAllOrResumeAll(false); + return suspendOrResumeAll(false); } @POST @@ -283,12 +283,8 @@ public Response terminateAll() { return asLeaderWithSupervisorManager( manager -> { - Set supervisorIds = manager.getSupervisorIds(); - supervisorIds.forEach(manager::stopAndRemoveSupervisor); - return Response.ok(ImmutableMap.of( - "ids", - String.join(",", supervisorIds) - )).build(); + manager.stopAndRemoveAllSupervisors(); + return Response.ok(ImmutableMap.of("status", "success")).build(); } ); } @@ -411,50 +407,31 @@ private Response specSuspendOrResume(final String id, boolean suspend) { return asLeaderWithSupervisorManager( manager -> { - Optional spec = manager.getSupervisorSpec(id); - if (!spec.isPresent()) { - return Response.status(Response.Status.NOT_FOUND) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id))) - .build(); - } - - if (spec.get().isSuspended() == suspend) { - final String errMsg = - StringUtils.format("[%s] is already %s", id, suspend ? "suspended" : "running"); - return Response.status(Response.Status.BAD_REQUEST) + if (manager.suspendOrResumeSupervisor(id, suspend)) { + Optional spec = manager.getSupervisorSpec(id); + return Response.ok(spec.get()).build(); + } else { + Optional spec = manager.getSupervisorSpec(id); + final Response.Status status = spec.isPresent() ? Response.Status.BAD_REQUEST : Response.Status.NOT_FOUND; + final String errMsg = spec.isPresent() ? StringUtils.format( + "[%s] is already %s", + id, + suspend ? "suspended" : "running" + ) : StringUtils.format("[%s] does not exist", id); + return Response.status(status) .entity(ImmutableMap.of("error", errMsg)) .build(); } - manager.suspendOrResumeSupervisor(id, suspend); - spec = manager.getSupervisorSpec(id); - return Response.ok(spec.get()).build(); } ); } - private Response specSuspendAllOrResumeAll(boolean suspend) + private Response suspendOrResumeAll(boolean suspend) { return asLeaderWithSupervisorManager( manager -> { - Set supervisorIds = manager.getSupervisorIds(); - List successes = new ArrayList<>(); - List errors = new ArrayList<>(); - - supervisorIds.forEach(id -> { - Optional spec = manager.getSupervisorSpec(id); - if (spec.get().isSuspended() == suspend) { - errors.add(id); - } else { - manager.suspendOrResumeSupervisor(id, suspend); - successes.add(id); - } - }); - return Response.ok(ImmutableMap.of( - "success", - String.join(",", successes), - "error", - StringUtils.format("[%s] are already %s", String.join(",", errors), suspend ? "suspended" : "running") - )).build(); + manager.suspendOrResumeAllSupervisors(suspend); + return Response.ok(ImmutableMap.of("status", "success")).build(); } ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 2a5cb74e5ef9..d893898c5a17 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -292,14 +292,6 @@ public void testSpecGetStatus() @Test public void testSpecSuspend() { - - TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) { @Override public List getDataSources() @@ -309,11 +301,8 @@ public List getDataSources() }; EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) - .andReturn(Optional.of(running)).times(1) - .andReturn(Optional.of(suspended)).times(1); EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(true); - EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)); replayAll(); Response response = supervisorResource.specSuspend("my-id"); @@ -326,7 +315,8 @@ public List getDataSources() resetAll(); EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce(); + EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(false); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)); replayAll(); response = supervisorResource.specSuspend("my-id"); @@ -339,13 +329,6 @@ public List getDataSources() @Test public void testSpecResume() { - TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) { @Override public List getDataSources() @@ -355,11 +338,8 @@ public List getDataSources() }; EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) - .andReturn(Optional.of(suspended)).times(1) - .andReturn(Optional.of(running)).times(1); EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(true); - EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)); replayAll(); Response response = supervisorResource.specResume("my-id"); @@ -372,7 +352,8 @@ public List getDataSources() resetAll(); EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce(); + EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(false); + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)); replayAll(); response = supervisorResource.specResume("my-id"); @@ -412,64 +393,30 @@ public void testTerminate() } @Test - public void testSpecSuspendAll() + public void testSuspendAll() { - TestSupervisorSpec running = new TestSupervisorSpec("my-id1", null, null, false) { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; - TestSupervisorSpec suspended = new TestSupervisorSpec("my-id2", null, null, true) { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource2"); - } - }; - EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id1", "my-id2")); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id1")).andReturn(Optional.of(running)); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id2")).andReturn(Optional.of(suspended)); - EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id1", true)).andReturn(true); + supervisorManager.suspendOrResumeAllSupervisors(true); + EasyMock.expectLastCall(); replayAll(); - Response response = supervisorResource.specSuspendAll(); + Response response = supervisorResource.suspendAll(); Assert.assertEquals(200, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("success", "my-id1", "error", "[my-id2] are already suspended"), response.getEntity()); + Assert.assertEquals(ImmutableMap.of("status", "success"), response.getEntity()); verifyAll(); } @Test - public void testSpecResumeAll() + public void testResumeAll() { - TestSupervisorSpec running = new TestSupervisorSpec("my-id1", null, null, false) { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; - TestSupervisorSpec suspended = new TestSupervisorSpec("my-id2", null, null, true) { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource2"); - } - }; - EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id1", "my-id2")); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id1")).andReturn(Optional.of(running)); - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id2")).andReturn(Optional.of(suspended)); - EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id2", false)).andReturn(true); + supervisorManager.suspendOrResumeAllSupervisors(false); + EasyMock.expectLastCall(); replayAll(); - Response response = supervisorResource.specResumeAll(); + Response response = supervisorResource.resumeAll(); Assert.assertEquals(200, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("success", "my-id2", "error", "[my-id1] are already running"), response.getEntity()); + Assert.assertEquals(ImmutableMap.of("status", "success"), response.getEntity()); verifyAll(); } @@ -477,14 +424,14 @@ public List getDataSources() public void testTerminateAll() { EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - EasyMock.expect(supervisorManager.getSupervisorIds()).andReturn(ImmutableSet.of("my-id1", "my-id2")); - EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id1")).andReturn(true); - EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id2")).andReturn(true); + supervisorManager.stopAndRemoveAllSupervisors(); + EasyMock.expectLastCall(); replayAll(); Response response = supervisorResource.terminateAll(); Assert.assertEquals(200, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("ids", "my-id1,my-id2"), response.getEntity()); + Assert.assertEquals(ImmutableMap.of("status", "success"), response.getEntity()); + verifyAll(); } @Test From 1c8b26a3f07a7e2973e7a6749c474431b5810e85 Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Sat, 29 Sep 2018 15:39:51 +0800 Subject: [PATCH 6/7] change ternary assignment to if statement --- .../overlord/supervisor/SupervisorResource.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 5f1c644da03b..97e0580376e8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -412,12 +412,15 @@ private Response specSuspendOrResume(final String id, boolean suspend) return Response.ok(spec.get()).build(); } else { Optional spec = manager.getSupervisorSpec(id); - final Response.Status status = spec.isPresent() ? Response.Status.BAD_REQUEST : Response.Status.NOT_FOUND; - final String errMsg = spec.isPresent() ? StringUtils.format( - "[%s] is already %s", - id, - suspend ? "suspended" : "running" - ) : StringUtils.format("[%s] does not exist", id); + Response.Status status; + String errMsg; + if (spec.isPresent()) { + status = Response.Status.BAD_REQUEST; + errMsg = StringUtils.format("[%s] is already %s", id, suspend ? "suspended" : "running"); + } else { + status = Response.Status.NOT_FOUND; + errMsg = StringUtils.format("[%s] does not exist", id); + } return Response.status(status) .entity(ImmutableMap.of("error", errMsg)) .build(); From 5248adc294160aa1158c88a6b83210780bf3b99a Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Thu, 11 Oct 2018 10:31:04 +0800 Subject: [PATCH 7/7] better docs --- docs/content/development/extensions-core/kafka-ingestion.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index 7cd22ea61f7b..568fc94fe30d 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -211,7 +211,7 @@ is resumed. Responds with updated SupervisorSpec. ``` POST /druid/indexer/v1/supervisor/suspendAll ``` -Suspend all supervisors at a time. +Suspend all supervisors at once. #### Resume Supervisor @@ -225,7 +225,7 @@ Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec. ``` POST /druid/indexer/v1/supervisor/resumeAll ``` -Resume all supervisors at a time. +Resume all supervisors at once. #### Reset Supervisor ``` @@ -259,7 +259,7 @@ spec to the create api. ``` POST /druid/indexer/v1/supervisor/terminateAll ``` -Terminate all supervisors at a time. +Terminate all supervisors at once. #### Shutdown Supervisor _Deprecated: use the equivalent 'terminate' instead_