diff --git a/docs/api-reference/supervisor-api.md b/docs/api-reference/supervisor-api.md
index 16afdd924f0b..41ed57b7f2a3 100644
--- a/docs/api-reference/supervisor-api.md
+++ b/docs/api-reference/supervisor-api.md
@@ -1359,6 +1359,73 @@ Host: http://ROUTER_IP:ROUTER_PORT
```
+### Update supervisor task count
+
+Updates the current task count for a single supervisor.
+
+#### URL
+
+`POST` `/druid/indexer/v1/supervisor/{supervisorId}/updateTaskCount`
+
+#### Responses
+
+
+
+
+
+
+*Successfully updated supervisor task count*
+
+
+
+
+
+*Invalid supervisor ID*
+
+
+
+
+---
+
+#### Sample request
+
+The following example shows how to update task count of a supervisor with the name `social_media`.
+
+
+
+
+
+
+```shell
+curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/updateTaskCount"
+--header 'Content-Type: application/json'
+--data-raw '{"taskCount": 3}'
+```
+
+
+
+
+
+```HTTP
+GET /druid/indexer/v1/supervisor/social_media/updateTaskCount HTTP/1.1
+Host: http://ROUTER_IP:ROUTER_PORT
+```
+
+
+
+
+#### Sample response
+
+
+ View the response
+
+ ```json
+ {
+ "id": "social_media"
+ }
+ ```
+
+
### Get supervisor ingestion stats
Returns a snapshot of the current ingestion row counters for each task being managed by the supervisor, along with moving averages for the row counters. See [Row stats](../ingestion/tasks.md#row-stats) for more information.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
index 0cf58d385122..e7103fe24ba5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java
@@ -120,54 +120,7 @@ public SupervisorResource(
@Produces(MediaType.APPLICATION_JSON)
public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req)
{
- return asLeaderWithSupervisorManager(
- manager -> {
- Preconditions.checkArgument(
- spec.getDataSources() != null && spec.getDataSources().size() > 0,
- "No dataSources found to perform authorization checks"
- );
- final Set resourceActions;
- try {
- resourceActions = getNeededResourceActionsForTask(spec);
- }
- catch (UOE e) {
- return Response.status(Response.Status.BAD_REQUEST)
- .entity(
- ImmutableMap.of(
- "error",
- e.getMessage()
- )
- )
- .build();
- }
-
- Access authResult = AuthorizationUtils.authorizeAllResourceActions(
- req,
- resourceActions,
- authorizerMapper
- );
-
- if (!authResult.isAllowed()) {
- throw new ForbiddenException(authResult.toString());
- }
-
- manager.createOrUpdateAndStartSupervisor(spec);
-
- final String auditPayload
- = StringUtils.format("Update supervisor[%s] for datasource[%s]", spec.getId(), spec.getDataSources());
- auditManager.doAudit(
- AuditEntry.builder()
- .key(spec.getId())
- .type("supervisor")
- .auditInfo(AuthorizationUtils.buildAuditInfo(req))
- .request(AuthorizationUtils.buildRequestInfo("overlord", req))
- .payload(auditPayload)
- .build()
- );
-
- return Response.ok(ImmutableMap.of("id", spec.getId())).build();
- }
- );
+ return asLeaderWithSupervisorManager(manager -> updateSupervisorSpec(spec, manager, req));
}
private Set getNeededResourceActionsForTask(final SupervisorSpec spec)
@@ -560,6 +513,32 @@ public Response reset(@PathParam("id") final String id)
return handleResetRequest(id, null);
}
+ @POST
+ @Path("/{id}/updateTaskCount")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ResourceFilters(SupervisorResourceFilter.class)
+ public Response updateTaskCount(
+ @PathParam("id") final String supervisorId,
+ @Nonnull SetTaskCountRequest taskCount,
+ @Context final HttpServletRequest req
+ )
+ {
+ return asLeaderWithSupervisorManager(
+ manager -> {
+ Optional existingSpec = manager.getSupervisorSpec(supervisorId);
+ if (!existingSpec.isPresent()) {
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", supervisorId)))
+ .build();
+ }
+
+ SupervisorSpec modifiedSpec = existingSpec.get();
+ modifiedSpec.updateTaskCount(taskCount.getTaskCount());
+ return updateSupervisorSpec(modifiedSpec, manager, req);
+ }
+ );
+ }
+
@POST
@Path("/{id}/resetOffsets")
@Produces(MediaType.APPLICATION_JSON)
@@ -591,6 +570,54 @@ private Response handleResetRequest(
);
}
+ private Response updateSupervisorSpec(SupervisorSpec spec, SupervisorManager manager, HttpServletRequest req)
+ {
+ Preconditions.checkArgument(
+ spec.getDataSources() != null && spec.getDataSources().size() > 0,
+ "No dataSources found to perform authorization checks"
+ );
+ final Set resourceActions;
+ try {
+ resourceActions = getNeededResourceActionsForTask(spec);
+ }
+ catch (UOE e) {
+ return Response.status(Response.Status.BAD_REQUEST)
+ .entity(
+ ImmutableMap.of(
+ "error",
+ e.getMessage()
+ )
+ )
+ .build();
+ }
+
+ Access authResult = AuthorizationUtils.authorizeAllResourceActions(
+ req,
+ resourceActions,
+ authorizerMapper
+ );
+
+ if (!authResult.isAllowed()) {
+ throw new ForbiddenException(authResult.toString());
+ }
+
+ manager.createOrUpdateAndStartSupervisor(spec);
+
+ final String auditPayload
+ = StringUtils.format("Update supervisor[%s] for datasource[%s]", spec.getId(), spec.getDataSources());
+ auditManager.doAudit(
+ AuditEntry.builder()
+ .key(spec.getId())
+ .type("supervisor")
+ .auditInfo(AuthorizationUtils.buildAuditInfo(req))
+ .request(AuthorizationUtils.buildRequestInfo("overlord", req))
+ .payload(auditPayload)
+ .build()
+ );
+
+ return Response.ok(ImmutableMap.of("id", spec.getId())).build();
+ }
+
private Response asLeaderWithSupervisorManager(Function f)
{
Optional supervisorManager = taskMaster.getSupervisorManager();
@@ -692,4 +719,21 @@ public List getTaskGroupIds()
return taskGroupIds;
}
}
+
+ public static class SetTaskCountRequest
+ {
+ private final int taskCount;
+
+ @JsonCreator
+ public SetTaskCountRequest(@JsonProperty("taskCount") int taskCount)
+ {
+ this.taskCount = taskCount;
+ }
+
+ @JsonProperty
+ public int getTaskCount()
+ {
+ return taskCount;
+ }
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
index 7b5f46195e7b..ee29413490ef 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
@@ -204,4 +204,10 @@ public boolean isSuspended()
protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend);
+ @Override
+ public void updateTaskCount(int taskCount)
+ {
+ SeekableStreamSupervisorIOConfig config = this.getIoConfig();
+ config.setTaskCount(taskCount);
+ }
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
index 1fd7af69e123..543581640aec 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java
@@ -1120,6 +1120,53 @@ public void testReset()
verifyAll();
}
+ @Test
+ public void testSetTaskCount()
+ {
+ String id = "my-id";
+ SupervisorSpec spec = new TestSupervisorSpec(id, null, null)
+ {
+
+ @Override
+ public List getDataSources()
+ {
+ return Collections.singletonList("datasource1");
+ }
+
+ @Override
+ public void updateTaskCount(int taskCount)
+ {
+ }
+ };
+
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
+ EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(spec)).andReturn(true);
+ EasyMock.expect(supervisorManager.getSupervisorSpec(id)).andReturn(Optional.of(spec));
+ setupMockRequest();
+ setupMockRequestForAudit();
+ EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
+ auditManager.doAudit(EasyMock.anyObject());
+ EasyMock.expectLastCall().once();
+ replayAll();
+ Response response = supervisorResource.updateTaskCount(
+ "my-id",
+ new SupervisorResource.SetTaskCountRequest(2),
+ request
+ );
+ verifyAll();
+
+ Assert.assertEquals(200, response.getStatus());
+ Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
+ resetAll();
+
+ EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
+ replayAll();
+
+ response = supervisorResource.specPost(spec, request);
+ verifyAll();
+ Assert.assertEquals(503, response.getStatus());
+ }
+
@Test
public void testResetOffsets()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
index 3e0e46d7a033..4ec41c202677 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java
@@ -1246,6 +1246,62 @@ public void testGetContextVauleForKeyShouldReturnValue()
Assert.assertEquals("value", spec.getContextValue("key"));
}
+ @Test
+ public void testSetTaskCount()
+ {
+ int newCount = 5;
+ EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
+ EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
+ EasyMock.replay(ingestionSchema);
+ spec = new SeekableStreamSupervisorSpec(
+ ingestionSchema,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ supervisorStateManagerConfig
+ )
+ {
+ @Override
+ public Supervisor createSupervisor()
+ {
+ return null;
+ }
+
+ @Override
+ protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
+ {
+ return null;
+ }
+
+ @Override
+ public String getType()
+ {
+ return null;
+ }
+
+ @Override
+ public String getSource()
+ {
+ return null;
+ }
+ };
+
+
+ seekableStreamSupervisorIOConfig.setTaskCount(newCount);
+ EasyMock.expectLastCall().once();
+ EasyMock.replay(seekableStreamSupervisorIOConfig);
+
+ spec.updateTaskCount(newCount);
+ EasyMock.verify(seekableStreamSupervisorIOConfig);
+ }
+
private void mockIngestionSchema()
{
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 70e5fbd534ed..5615427edd5c 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -100,4 +100,12 @@ default Set getInputSourceResources() throws UnsupportedOperatio
* @return source like stream or topic name
*/
String getSource();
+
+ default void updateTaskCount(int taskCount)
+ {
+ throw new UOE(StringUtils.format(
+ "SuperviserSpec type [%s], does not support setTaskCount action",
+ getType()
+ ));
+ }
}
diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
index 8bd4de3b8898..822b327ac283 100644
--- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
+++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java
@@ -100,4 +100,14 @@ public void testNoppSupervisorStopTaskEarlyDoNothing()
() -> noOpSupervisor.handoffTaskGroupsEarly(ImmutableList.of())
);
}
+
+ @Test
+ public void testNoopSupervisorChangeTaskCountThrows()
+ {
+ NoopSupervisorSpec spec = new NoopSupervisorSpec(null, null);
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> spec.updateTaskCount(2)
+ );
+ }
}