From d981e27530b14b82c3860e85a6a809236055c37e Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 25 Jun 2024 10:26:40 -0500 Subject: [PATCH 1/5] Introduce ability to change supervisor task count --- .../supervisor/SupervisorManager.java | 38 +++++++++++++++++++ .../supervisor/SupervisorResource.java | 19 ++++++++++ .../supervisor/SeekableStreamSupervisor.java | 3 +- .../overlord/supervisor/Supervisor.java | 6 +++ 4 files changed, 65 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 1267bb645a64..4ddbe209f98f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -348,6 +348,44 @@ public boolean registerUpgradedPendingSegmentOnSupervisor( return false; } + public boolean scaleSupervisor(String id, Integer taskCount) + { + Preconditions.checkState(started, "SupervisorManager not started"); + Preconditions.checkNotNull(id, "id"); + Preconditions.checkState( + taskCount != null && taskCount >= 0, + "taskCount should be non null positive value" + ); + + synchronized (lock) { + Preconditions.checkState(started, "SupervisorManager not started"); + return scaleSupervisorInternal(id, taskCount); + } + } + + /** + * Scale a supervisor with a given id. + *

+ * Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be + * starting, stopping, suspending and resuming supervisors. + * + * @return true if a supervisor was scaled, false if there was no supervisor with this id. + */ + private boolean scaleSupervisorInternal(String id, Integer taskCount) + { + Pair pair = supervisors.get(id); + if (pair == null) { + return false; + } + + try { + return pair.lhs.changeTaskCount(taskCount); + } + catch (Exception e) { + log.error(e, "Failed to scale supervisor: [%s]", pair.rhs.getId()); + throw new RuntimeException(e); + } + } /** * Stops a supervisor with a given id and then removes it from the list. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 0cf58d385122..bcabc88a51ab 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -560,6 +560,25 @@ public Response reset(@PathParam("id") final String id) return handleResetRequest(id, null); } + @POST + @Path("/{id}/setTaskCount") + @Produces(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response setTaskCount(@PathParam("id") final String id, @QueryParam("taskCount") final Integer taskCount) + { + return asLeaderWithSupervisorManager( + manager -> { + if (manager.scaleSupervisor(id, taskCount)) { + return Response.ok(ImmutableMap.of("id", id)).build(); + } else { + return Response.status(Response.Status.NOT_MODIFIED) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] is not scaled", id))) + .build(); + } + } + ); + } + @POST @Path("/{id}/resetOffsets") @Produces(MediaType.APPLICATION_JSON) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ec4de45cac71..ace2d265505d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -530,7 +530,8 @@ public String getType() * @throws InterruptedException * @throws ExecutionException */ - private boolean changeTaskCount(int desiredActiveTaskCount) + @Override + public boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException { int currentActiveTaskCount; diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index e92b194e3e3e..edd23b7677b0 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -29,6 +29,7 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; public interface Supervisor { @@ -103,4 +104,9 @@ default void handoffTaskGroupsEarly(List taskGroupIds) { throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented"); } + + default boolean changeTaskCount(int taskCount) throws InterruptedException, ExecutionException + { + throw new UnsupportedOperationException(); + } } From a6dd139ba70bf1031f4d532903942b27d148d7ea Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 25 Jun 2024 10:43:14 -0500 Subject: [PATCH 2/5] add-tests --- .../supervisor/SupervisorManager.java | 6 ++--- .../supervisor/SupervisorResource.java | 2 +- .../supervisor/SupervisorManagerTest.java | 21 +++++++++++++++ .../supervisor/SupervisorResourceTest.java | 27 +++++++++++++++++++ 4 files changed, 52 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 4ddbe209f98f..c0394ea255ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -348,7 +348,7 @@ public boolean registerUpgradedPendingSegmentOnSupervisor( return false; } - public boolean scaleSupervisor(String id, Integer taskCount) + public boolean changeTaskCountSupervisor(String id, Integer taskCount) { Preconditions.checkState(started, "SupervisorManager not started"); Preconditions.checkNotNull(id, "id"); @@ -359,7 +359,7 @@ public boolean scaleSupervisor(String id, Integer taskCount) synchronized (lock) { Preconditions.checkState(started, "SupervisorManager not started"); - return scaleSupervisorInternal(id, taskCount); + return changeTaskCountSupervisorInternal(id, taskCount); } } @@ -371,7 +371,7 @@ public boolean scaleSupervisor(String id, Integer taskCount) * * @return true if a supervisor was scaled, false if there was no supervisor with this id. */ - private boolean scaleSupervisorInternal(String id, Integer taskCount) + private boolean changeTaskCountSupervisorInternal(String id, Integer taskCount) { Pair pair = supervisors.get(id); if (pair == null) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index bcabc88a51ab..37f2affb0eb4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -568,7 +568,7 @@ public Response setTaskCount(@PathParam("id") final String id, @QueryParam("task { return asLeaderWithSupervisorManager( manager -> { - if (manager.scaleSupervisor(id, taskCount)) { + if (manager.changeTaskCountSupervisor(id, taskCount)) { return Response.ok(ImmutableMap.of("id", id)).build(); } else { return Response.status(Response.Status.NOT_MODIFIED) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index d3d1045ea714..c646340de675 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -52,6 +52,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; @RunWith(EasyMockRunner.class) public class SupervisorManagerTest extends EasyMockSupport @@ -463,6 +464,26 @@ public void testCreateSuspendResumeAndStopSupervisor() Assert.assertTrue(manager.getSupervisorIds().isEmpty()); } + @Test + public void testChangeTaskCountSupervisor() throws ExecutionException, InterruptedException + { + Map existingSpecs = ImmutableMap.of( + "id1", new TestSupervisorSpec("id1", supervisor1) + ); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); + supervisor1.start(); + EasyMock.expect(supervisor1.changeTaskCount(EasyMock.anyInt())).andReturn(true); + replayAll(); + + manager.start(); + + Assert.assertEquals(false, manager.changeTaskCountSupervisor("non-existent-id", 2)); + Assert.assertEquals(true, manager.changeTaskCountSupervisor("id1", 2)); + + verifyAll(); + } + @Test public void testGetActiveSupervisorIdForDatasourceWithAppendLock() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 1fd7af69e123..7292b2079938 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -1120,6 +1120,33 @@ public void testReset() verifyAll(); } + @Test + public void testSetTaskCount() + { + Capture id1 = Capture.newInstance(); + Capture id2 = Capture.newInstance(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2); + EasyMock.expect(supervisorManager.changeTaskCountSupervisor( + EasyMock.capture(id1), + EasyMock.anyObject(Integer.class) + )).andReturn(true); + EasyMock.expect(supervisorManager.changeTaskCountSupervisor( + EasyMock.capture(id2), + EasyMock.anyObject(Integer.class) + )).andReturn(false); + replayAll(); + + Response response = supervisorResource.setTaskCount("my-id", 2); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity()); + + response = supervisorResource.setTaskCount("my-id-2", 2); + Assert.assertEquals(304, response.getStatus()); + Assert.assertEquals("my-id", id1.getValue()); + Assert.assertEquals("my-id-2", id2.getValue()); + verifyAll(); + } + @Test public void testResetOffsets() { From d270ff2c2bf47346eb274ba0e16570baa797b20c Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Tue, 25 Jun 2024 11:36:24 -0500 Subject: [PATCH 3/5] add-coverage --- .../supervisor/SupervisorManager.java | 38 ----- .../supervisor/SupervisorResource.java | 133 +++++++++++------- .../supervisor/SeekableStreamSupervisor.java | 3 +- .../SeekableStreamSupervisorSpec.java | 6 + .../supervisor/SupervisorManagerTest.java | 21 --- .../supervisor/SupervisorResourceTest.java | 52 ++++--- .../SeekableStreamSupervisorSpecTest.java | 56 ++++++++ .../overlord/supervisor/Supervisor.java | 6 - .../overlord/supervisor/SupervisorSpec.java | 8 ++ .../indexing/NoopSupervisorSpecTest.java | 10 ++ 10 files changed, 196 insertions(+), 137 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index c0394ea255ce..1267bb645a64 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -348,44 +348,6 @@ public boolean registerUpgradedPendingSegmentOnSupervisor( return false; } - public boolean changeTaskCountSupervisor(String id, Integer taskCount) - { - Preconditions.checkState(started, "SupervisorManager not started"); - Preconditions.checkNotNull(id, "id"); - Preconditions.checkState( - taskCount != null && taskCount >= 0, - "taskCount should be non null positive value" - ); - - synchronized (lock) { - Preconditions.checkState(started, "SupervisorManager not started"); - return changeTaskCountSupervisorInternal(id, taskCount); - } - } - - /** - * Scale a supervisor with a given id. - *

- * Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be - * starting, stopping, suspending and resuming supervisors. - * - * @return true if a supervisor was scaled, false if there was no supervisor with this id. - */ - private boolean changeTaskCountSupervisorInternal(String id, Integer taskCount) - { - Pair pair = supervisors.get(id); - if (pair == null) { - return false; - } - - try { - return pair.lhs.changeTaskCount(taskCount); - } - catch (Exception e) { - log.error(e, "Failed to scale supervisor: [%s]", pair.rhs.getId()); - throw new RuntimeException(e); - } - } /** * Stops a supervisor with a given id and then removes it from the list. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 37f2affb0eb4..16bbbe81a2af 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -120,54 +120,7 @@ public SupervisorResource( @Produces(MediaType.APPLICATION_JSON) public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req) { - return asLeaderWithSupervisorManager( - manager -> { - Preconditions.checkArgument( - spec.getDataSources() != null && spec.getDataSources().size() > 0, - "No dataSources found to perform authorization checks" - ); - final Set resourceActions; - try { - resourceActions = getNeededResourceActionsForTask(spec); - } - catch (UOE e) { - return Response.status(Response.Status.BAD_REQUEST) - .entity( - ImmutableMap.of( - "error", - e.getMessage() - ) - ) - .build(); - } - - Access authResult = AuthorizationUtils.authorizeAllResourceActions( - req, - resourceActions, - authorizerMapper - ); - - if (!authResult.isAllowed()) { - throw new ForbiddenException(authResult.toString()); - } - - manager.createOrUpdateAndStartSupervisor(spec); - - final String auditPayload - = StringUtils.format("Update supervisor[%s] for datasource[%s]", spec.getId(), spec.getDataSources()); - auditManager.doAudit( - AuditEntry.builder() - .key(spec.getId()) - .type("supervisor") - .auditInfo(AuthorizationUtils.buildAuditInfo(req)) - .request(AuthorizationUtils.buildRequestInfo("overlord", req)) - .payload(auditPayload) - .build() - ); - - return Response.ok(ImmutableMap.of("id", spec.getId())).build(); - } - ); + return asLeaderWithSupervisorManager(manager -> postLogic(spec, manager, req)); } private Set getNeededResourceActionsForTask(final SupervisorSpec spec) @@ -564,17 +517,24 @@ public Response reset(@PathParam("id") final String id) @Path("/{id}/setTaskCount") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(SupervisorResourceFilter.class) - public Response setTaskCount(@PathParam("id") final String id, @QueryParam("taskCount") final Integer taskCount) + public Response setTaskCount( + @PathParam("id") final String supervisorId, + @Nonnull SetTaskCountRequest taskCount, + @Context final HttpServletRequest req + ) { return asLeaderWithSupervisorManager( manager -> { - if (manager.changeTaskCountSupervisor(id, taskCount)) { - return Response.ok(ImmutableMap.of("id", id)).build(); - } else { - return Response.status(Response.Status.NOT_MODIFIED) - .entity(ImmutableMap.of("error", StringUtils.format("[%s] is not scaled", id))) + Optional existingSpec = manager.getSupervisorSpec(supervisorId); + if (!existingSpec.isPresent()) { + return Response.status(Response.Status.NOT_FOUND) + .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", supervisorId))) .build(); } + + SupervisorSpec modifiedSpec = existingSpec.get(); + modifiedSpec.updateTaskCount(taskCount.getTaskCount()); + return postLogic(modifiedSpec, manager, req); } ); } @@ -610,6 +570,54 @@ private Response handleResetRequest( ); } + private Response postLogic(SupervisorSpec spec, SupervisorManager manager, HttpServletRequest req) + { + Preconditions.checkArgument( + spec.getDataSources() != null && spec.getDataSources().size() > 0, + "No dataSources found to perform authorization checks" + ); + final Set resourceActions; + try { + resourceActions = getNeededResourceActionsForTask(spec); + } + catch (UOE e) { + return Response.status(Response.Status.BAD_REQUEST) + .entity( + ImmutableMap.of( + "error", + e.getMessage() + ) + ) + .build(); + } + + Access authResult = AuthorizationUtils.authorizeAllResourceActions( + req, + resourceActions, + authorizerMapper + ); + + if (!authResult.isAllowed()) { + throw new ForbiddenException(authResult.toString()); + } + + manager.createOrUpdateAndStartSupervisor(spec); + + final String auditPayload + = StringUtils.format("Update supervisor[%s] for datasource[%s]", spec.getId(), spec.getDataSources()); + auditManager.doAudit( + AuditEntry.builder() + .key(spec.getId()) + .type("supervisor") + .auditInfo(AuthorizationUtils.buildAuditInfo(req)) + .request(AuthorizationUtils.buildRequestInfo("overlord", req)) + .payload(auditPayload) + .build() + ); + + return Response.ok(ImmutableMap.of("id", spec.getId())).build(); + } + private Response asLeaderWithSupervisorManager(Function f) { Optional supervisorManager = taskMaster.getSupervisorManager(); @@ -711,4 +719,21 @@ public List getTaskGroupIds() return taskGroupIds; } } + + public static class SetTaskCountRequest + { + private final int taskCount; + + @JsonCreator + public SetTaskCountRequest(@JsonProperty("taskCount") int taskCount) + { + this.taskCount = taskCount; + } + + @JsonProperty + public int getTaskCount() + { + return taskCount; + } + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ace2d265505d..ec4de45cac71 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -530,8 +530,7 @@ public String getType() * @throws InterruptedException * @throws ExecutionException */ - @Override - public boolean changeTaskCount(int desiredActiveTaskCount) + private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException { int currentActiveTaskCount; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 7b5f46195e7b..ee29413490ef 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -204,4 +204,10 @@ public boolean isSuspended() protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); + @Override + public void updateTaskCount(int taskCount) + { + SeekableStreamSupervisorIOConfig config = this.getIoConfig(); + config.setTaskCount(taskCount); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index c646340de675..d3d1045ea714 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -52,7 +52,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; @RunWith(EasyMockRunner.class) public class SupervisorManagerTest extends EasyMockSupport @@ -464,26 +463,6 @@ public void testCreateSuspendResumeAndStopSupervisor() Assert.assertTrue(manager.getSupervisorIds().isEmpty()); } - @Test - public void testChangeTaskCountSupervisor() throws ExecutionException, InterruptedException - { - Map existingSpecs = ImmutableMap.of( - "id1", new TestSupervisorSpec("id1", supervisor1) - ); - - EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); - supervisor1.start(); - EasyMock.expect(supervisor1.changeTaskCount(EasyMock.anyInt())).andReturn(true); - replayAll(); - - manager.start(); - - Assert.assertEquals(false, manager.changeTaskCountSupervisor("non-existent-id", 2)); - Assert.assertEquals(true, manager.changeTaskCountSupervisor("id1", 2)); - - verifyAll(); - } - @Test public void testGetActiveSupervisorIdForDatasourceWithAppendLock() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 7292b2079938..f5396e8ab681 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -1123,28 +1123,48 @@ public void testReset() @Test public void testSetTaskCount() { - Capture id1 = Capture.newInstance(); - Capture id2 = Capture.newInstance(); - EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2); - EasyMock.expect(supervisorManager.changeTaskCountSupervisor( - EasyMock.capture(id1), - EasyMock.anyObject(Integer.class) - )).andReturn(true); - EasyMock.expect(supervisorManager.changeTaskCountSupervisor( - EasyMock.capture(id2), - EasyMock.anyObject(Integer.class) - )).andReturn(false); + String id = "my-id"; + SupervisorSpec spec = new TestSupervisorSpec(id, null, null) + { + + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + + @Override + public void updateTaskCount(int taskCount) + { + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(spec)).andReturn(true); + EasyMock.expect(supervisorManager.getSupervisorSpec(id)).andReturn(Optional.of(spec)); + setupMockRequest(); + setupMockRequestForAudit(); + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); + auditManager.doAudit(EasyMock.anyObject()); + EasyMock.expectLastCall().once(); replayAll(); + Response response = supervisorResource.setTaskCount( + "my-id", + new SupervisorResource.SetTaskCountRequest(2), + request + ); + verifyAll(); - Response response = supervisorResource.setTaskCount("my-id", 2); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity()); + resetAll(); - response = supervisorResource.setTaskCount("my-id-2", 2); - Assert.assertEquals(304, response.getStatus()); - Assert.assertEquals("my-id", id1.getValue()); - Assert.assertEquals("my-id-2", id2.getValue()); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); + replayAll(); + + response = supervisorResource.specPost(spec, request); verifyAll(); + Assert.assertEquals(503, response.getStatus()); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 3e0e46d7a033..4ec41c202677 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -1246,6 +1246,62 @@ public void testGetContextVauleForKeyShouldReturnValue() Assert.assertEquals("value", spec.getContextValue("key")); } + @Test + public void testSetTaskCount() + { + int newCount = 5; + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.replay(ingestionSchema); + spec = new SeekableStreamSupervisorSpec( + ingestionSchema, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + supervisorStateManagerConfig + ) + { + @Override + public Supervisor createSupervisor() + { + return null; + } + + @Override + protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) + { + return null; + } + + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } + }; + + + seekableStreamSupervisorIOConfig.setTaskCount(newCount); + EasyMock.expectLastCall().once(); + EasyMock.replay(seekableStreamSupervisorIOConfig); + + spec.updateTaskCount(newCount); + EasyMock.verify(seekableStreamSupervisorIOConfig); + } + private void mockIngestionSchema() { EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index edd23b7677b0..e92b194e3e3e 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -29,7 +29,6 @@ import javax.annotation.Nullable; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; public interface Supervisor { @@ -104,9 +103,4 @@ default void handoffTaskGroupsEarly(List taskGroupIds) { throw new UnsupportedOperationException("Supervisor does not have the feature to handoff task groups early implemented"); } - - default boolean changeTaskCount(int taskCount) throws InterruptedException, ExecutionException - { - throw new UnsupportedOperationException(); - } } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index 70e5fbd534ed..5615427edd5c 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -100,4 +100,12 @@ default Set getInputSourceResources() throws UnsupportedOperatio * @return source like stream or topic name */ String getSource(); + + default void updateTaskCount(int taskCount) + { + throw new UOE(StringUtils.format( + "SuperviserSpec type [%s], does not support setTaskCount action", + getType() + )); + } } diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java index 8bd4de3b8898..822b327ac283 100644 --- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java +++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java @@ -100,4 +100,14 @@ public void testNoppSupervisorStopTaskEarlyDoNothing() () -> noOpSupervisor.handoffTaskGroupsEarly(ImmutableList.of()) ); } + + @Test + public void testNoopSupervisorChangeTaskCountThrows() + { + NoopSupervisorSpec spec = new NoopSupervisorSpec(null, null); + Assert.assertThrows( + UnsupportedOperationException.class, + () -> spec.updateTaskCount(2) + ); + } } From fb40c8d0121081f41a93008c15a20aaf26c6b085 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 1 Jul 2024 23:06:19 -0500 Subject: [PATCH 4/5] comments --- .../indexing/overlord/supervisor/SupervisorResource.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 16bbbe81a2af..563443a3e9ba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -120,7 +120,7 @@ public SupervisorResource( @Produces(MediaType.APPLICATION_JSON) public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req) { - return asLeaderWithSupervisorManager(manager -> postLogic(spec, manager, req)); + return asLeaderWithSupervisorManager(manager -> updateSupervisorSpec(spec, manager, req)); } private Set getNeededResourceActionsForTask(final SupervisorSpec spec) @@ -534,7 +534,7 @@ public Response setTaskCount( SupervisorSpec modifiedSpec = existingSpec.get(); modifiedSpec.updateTaskCount(taskCount.getTaskCount()); - return postLogic(modifiedSpec, manager, req); + return updateSupervisorSpec(modifiedSpec, manager, req); } ); } @@ -570,7 +570,7 @@ private Response handleResetRequest( ); } - private Response postLogic(SupervisorSpec spec, SupervisorManager manager, HttpServletRequest req) + private Response updateSupervisorSpec(SupervisorSpec spec, SupervisorManager manager, HttpServletRequest req) { Preconditions.checkArgument( spec.getDataSources() != null && spec.getDataSources().size() > 0, From de300dbf4f6bdfce8d3316bde7787d29326e9990 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 1 Jul 2024 23:16:39 -0500 Subject: [PATCH 5/5] minor nits --- docs/api-reference/supervisor-api.md | 67 +++++++++++++++++++ .../supervisor/SupervisorResource.java | 4 +- .../supervisor/SupervisorResourceTest.java | 2 +- 3 files changed, 70 insertions(+), 3 deletions(-) diff --git a/docs/api-reference/supervisor-api.md b/docs/api-reference/supervisor-api.md index 16afdd924f0b..41ed57b7f2a3 100644 --- a/docs/api-reference/supervisor-api.md +++ b/docs/api-reference/supervisor-api.md @@ -1359,6 +1359,73 @@ Host: http://ROUTER_IP:ROUTER_PORT ``` +### Update supervisor task count + +Updates the current task count for a single supervisor. + +#### URL + +`POST` `/druid/indexer/v1/supervisor/{supervisorId}/updateTaskCount` + +#### Responses + + + + + + +*Successfully updated supervisor task count* + + + + + +*Invalid supervisor ID* + + + + +--- + +#### Sample request + +The following example shows how to update task count of a supervisor with the name `social_media`. + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/updateTaskCount" +--header 'Content-Type: application/json' +--data-raw '{"taskCount": 3}' +``` + + + + + +```HTTP +GET /druid/indexer/v1/supervisor/social_media/updateTaskCount HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +

+ View the response + + ```json + { + "id": "social_media" + } + ``` +
+ ### Get supervisor ingestion stats Returns a snapshot of the current ingestion row counters for each task being managed by the supervisor, along with moving averages for the row counters. See [Row stats](../ingestion/tasks.md#row-stats) for more information. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index 563443a3e9ba..e7103fe24ba5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -514,10 +514,10 @@ public Response reset(@PathParam("id") final String id) } @POST - @Path("/{id}/setTaskCount") + @Path("/{id}/updateTaskCount") @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(SupervisorResourceFilter.class) - public Response setTaskCount( + public Response updateTaskCount( @PathParam("id") final String supervisorId, @Nonnull SetTaskCountRequest taskCount, @Context final HttpServletRequest req diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index f5396e8ab681..543581640aec 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -1148,7 +1148,7 @@ public void updateTaskCount(int taskCount) auditManager.doAudit(EasyMock.anyObject()); EasyMock.expectLastCall().once(); replayAll(); - Response response = supervisorResource.setTaskCount( + Response response = supervisorResource.updateTaskCount( "my-id", new SupervisorResource.SetTaskCountRequest(2), request