Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions docs/api-reference/supervisor-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1359,6 +1359,73 @@ Host: http://ROUTER_IP:ROUTER_PORT
```
</details>

### Update supervisor task count

Updates the current task count for a single supervisor.

#### URL

`POST` `/druid/indexer/v1/supervisor/{supervisorId}/updateTaskCount`

#### Responses

<Tabs>

<TabItem value="14" label="200 SUCCESS">


*Successfully updated supervisor task count*

</TabItem>
<TabItem value="15" label="404 NOT FOUND">


*Invalid supervisor ID*

</TabItem>
</Tabs>

---

#### Sample request

The following example shows how to update task count of a supervisor with the name `social_media`.

<Tabs>

<TabItem value="16" label="cURL">


```shell
curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/updateTaskCount"
--header 'Content-Type: application/json'
--data-raw '{"taskCount": 3}'
```

</TabItem>
<TabItem value="17" label="HTTP">


```HTTP
GET /druid/indexer/v1/supervisor/social_media/updateTaskCount HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
```

</TabItem>
</Tabs>

#### Sample response

<details>
<summary>View the response</summary>

```json
{
"id": "social_media"
}
```
</details>

### Get supervisor ingestion stats

Returns a snapshot of the current ingestion row counters for each task being managed by the supervisor, along with moving averages for the row counters. See [Row stats](../ingestion/tasks.md#row-stats) for more information.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,54 +120,7 @@ public SupervisorResource(
@Produces(MediaType.APPLICATION_JSON)
public Response specPost(final SupervisorSpec spec, @Context final HttpServletRequest req)
{
return asLeaderWithSupervisorManager(
manager -> {
Preconditions.checkArgument(
spec.getDataSources() != null && spec.getDataSources().size() > 0,
"No dataSources found to perform authorization checks"
);
final Set<ResourceAction> resourceActions;
try {
resourceActions = getNeededResourceActionsForTask(spec);
}
catch (UOE e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(
ImmutableMap.of(
"error",
e.getMessage()
)
)
.build();
}

Access authResult = AuthorizationUtils.authorizeAllResourceActions(
req,
resourceActions,
authorizerMapper
);

if (!authResult.isAllowed()) {
throw new ForbiddenException(authResult.toString());
}

manager.createOrUpdateAndStartSupervisor(spec);

final String auditPayload
= StringUtils.format("Update supervisor[%s] for datasource[%s]", spec.getId(), spec.getDataSources());
auditManager.doAudit(
AuditEntry.builder()
.key(spec.getId())
.type("supervisor")
.auditInfo(AuthorizationUtils.buildAuditInfo(req))
.request(AuthorizationUtils.buildRequestInfo("overlord", req))
.payload(auditPayload)
.build()
);

return Response.ok(ImmutableMap.of("id", spec.getId())).build();
}
);
return asLeaderWithSupervisorManager(manager -> updateSupervisorSpec(spec, manager, req));
}

private Set<ResourceAction> getNeededResourceActionsForTask(final SupervisorSpec spec)
Expand Down Expand Up @@ -560,6 +513,32 @@ public Response reset(@PathParam("id") final String id)
return handleResetRequest(id, null);
}

@POST
@Path("/{id}/updateTaskCount")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response updateTaskCount(
@PathParam("id") final String supervisorId,
@Nonnull SetTaskCountRequest taskCount,
@Context final HttpServletRequest req
)
{
return asLeaderWithSupervisorManager(
manager -> {
Optional<SupervisorSpec> existingSpec = manager.getSupervisorSpec(supervisorId);
if (!existingSpec.isPresent()) {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", supervisorId)))
.build();
}

SupervisorSpec modifiedSpec = existingSpec.get();
modifiedSpec.updateTaskCount(taskCount.getTaskCount());
return updateSupervisorSpec(modifiedSpec, manager, req);
}
);
}

@POST
@Path("/{id}/resetOffsets")
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -591,6 +570,54 @@ private Response handleResetRequest(
);
}

private Response updateSupervisorSpec(SupervisorSpec spec, SupervisorManager manager, HttpServletRequest req)
{
Preconditions.checkArgument(
spec.getDataSources() != null && spec.getDataSources().size() > 0,
"No dataSources found to perform authorization checks"
);
final Set<ResourceAction> resourceActions;
try {
resourceActions = getNeededResourceActionsForTask(spec);
}
catch (UOE e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(
ImmutableMap.of(
"error",
e.getMessage()
)
)
.build();
}

Access authResult = AuthorizationUtils.authorizeAllResourceActions(
req,
resourceActions,
authorizerMapper
);

if (!authResult.isAllowed()) {
throw new ForbiddenException(authResult.toString());
}

manager.createOrUpdateAndStartSupervisor(spec);

final String auditPayload
= StringUtils.format("Update supervisor[%s] for datasource[%s]", spec.getId(), spec.getDataSources());
auditManager.doAudit(
AuditEntry.builder()
.key(spec.getId())
.type("supervisor")
.auditInfo(AuthorizationUtils.buildAuditInfo(req))
.request(AuthorizationUtils.buildRequestInfo("overlord", req))
.payload(auditPayload)
.build()
);

return Response.ok(ImmutableMap.of("id", spec.getId())).build();
}

private Response asLeaderWithSupervisorManager(Function<SupervisorManager, Response> f)
{
Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();
Expand Down Expand Up @@ -692,4 +719,21 @@ public List<Integer> getTaskGroupIds()
return taskGroupIds;
}
}

public static class SetTaskCountRequest
{
private final int taskCount;

@JsonCreator
public SetTaskCountRequest(@JsonProperty("taskCount") int taskCount)
{
this.taskCount = taskCount;
}

@JsonProperty
public int getTaskCount()
{
return taskCount;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,10 @@ public boolean isSuspended()

protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend);

@Override
public void updateTaskCount(int taskCount)
{
SeekableStreamSupervisorIOConfig config = this.getIoConfig();
config.setTaskCount(taskCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,53 @@ public void testReset()
verifyAll();
}

@Test
public void testSetTaskCount()
{
String id = "my-id";
SupervisorSpec spec = new TestSupervisorSpec(id, null, null)
{

@Override
public List<String> getDataSources()
{
return Collections.singletonList("datasource1");
}

@Override
public void updateTaskCount(int taskCount)
{
}
};

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(spec)).andReturn(true);
EasyMock.expect(supervisorManager.getSupervisorSpec(id)).andReturn(Optional.of(spec));
setupMockRequest();
setupMockRequestForAudit();
EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true);
auditManager.doAudit(EasyMock.anyObject());
EasyMock.expectLastCall().once();
replayAll();
Response response = supervisorResource.updateTaskCount(
"my-id",
new SupervisorResource.SetTaskCountRequest(2),
request
);
verifyAll();

Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());
resetAll();

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();

response = supervisorResource.specPost(spec, request);
verifyAll();
Assert.assertEquals(503, response.getStatus());
}

@Test
public void testResetOffsets()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,62 @@ public void testGetContextVauleForKeyShouldReturnValue()
Assert.assertEquals("value", spec.getContextValue("key"));
}

@Test
public void testSetTaskCount()
{
int newCount = 5;
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.replay(ingestionSchema);
spec = new SeekableStreamSupervisorSpec(
ingestionSchema,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
supervisorStateManagerConfig
)
{
@Override
public Supervisor createSupervisor()
{
return null;
}

@Override
protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
{
return null;
}

@Override
public String getType()
{
return null;
}

@Override
public String getSource()
{
return null;
}
};


seekableStreamSupervisorIOConfig.setTaskCount(newCount);
EasyMock.expectLastCall().once();
EasyMock.replay(seekableStreamSupervisorIOConfig);

spec.updateTaskCount(newCount);
EasyMock.verify(seekableStreamSupervisorIOConfig);
}

private void mockIngestionSchema()
{
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,12 @@ default Set<ResourceAction> getInputSourceResources() throws UnsupportedOperatio
* @return source like stream or topic name
*/
String getSource();

default void updateTaskCount(int taskCount)
Comment thread
ac9817 marked this conversation as resolved.
{
throw new UOE(StringUtils.format(
"SuperviserSpec type [%s], does not support setTaskCount action",
getType()
));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,14 @@ public void testNoppSupervisorStopTaskEarlyDoNothing()
() -> noOpSupervisor.handoffTaskGroupsEarly(ImmutableList.of())
);
}

@Test
public void testNoopSupervisorChangeTaskCountThrows()
{
NoopSupervisorSpec spec = new NoopSupervisorSpec(null, null);
Assert.assertThrows(
UnsupportedOperationException.class,
() -> spec.updateTaskCount(2)
);
}
}