Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions docs/content/development/extensions-core/kafka-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,27 @@ Suspend indexing tasks associated with a supervisor. Note that the supervisor it
operating and emitting logs and metrics, it will just ensure that no indexing tasks are running until the supervisor
is resumed. Responds with updated SupervisorSpec.

#### Resume Supervisor
#### Suspend All Supervisors

```
POST /druid/indexer/v1/supervisor/suspendAll
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I still aesthetically prefer /druid/indexer/v1/supervisor/suspend, etc. over /druid/indexer/v1/supervisor/suspendAll, etc, but that's just a matter of personal preference, up to you whether or not to change it 😄

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with changing it to /druid/indexer/v1/supervisors/suspend, but, as you said before, that seems more painful than useful to change, so I am keeping the All suffix. Besides, PR #6185 also uses All in its endpoint paths.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I wasn't suggesting supervisors with the s since yeah that's a more dramatic change, just leaving out the All, but looking around at the rest of the APIs some more, and things already aren't terribly consistent with things like pluralization, etc., anyway so I think basically whatever is probably fine as long as there are docs and stuff does what is intended 😜

```
Suspend all supervisors at once.

#### Resume Supervisor

```
POST /druid/indexer/v1/supervisor/<supervisorId>/resume
```
Resume indexing tasks for a supervisor. Responds with updated SupervisorSpec.

#### Resume All Supervisors

```
POST /druid/indexer/v1/supervisor/resumeAll
```
Resume all supervisors at once.

#### Reset Supervisor
```
POST /druid/indexer/v1/supervisor/<supervisorId>/reset
Expand Down Expand Up @@ -241,7 +255,13 @@ with the supervisor history api, but will not be listed in the 'get supervisors'
or status report be retrieved. The only way this supervisor can start again is by submitting a functioning supervisor
spec to the create api.

#### Shutdown Supervisor
#### Terminate All Supervisors
```
POST /druid/indexer/v1/supervisor/terminateAll
```
Terminate all supervisors at once.

#### Shutdown Supervisor
_Deprecated: use the equivalent 'terminate' instead_
```
POST /druid/indexer/v1/supervisor/<supervisorId>/shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,31 @@ public boolean stopAndRemoveSupervisor(String id)
public boolean suspendOrResumeSupervisor(String id, boolean suspend)
{
Preconditions.checkState(started, "SupervisorManager not started");
Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
Preconditions.checkNotNull(pair.rhs, "spec");
Preconditions.checkNotNull(id, "id");

synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
return possiblySuspendOrResumeSupervisorInternal(id, suspend);
}
}

public void stopAndRemoveAllSupervisors()
{
Preconditions.checkState(started, "SupervisorManager not started");

synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
SupervisorSpec nextState = suspend ? pair.rhs.createSuspendedSpec() : pair.rhs.createRunningSpec();
possiblyStopAndRemoveSupervisorInternal(nextState.getId(), false);
return createAndStartSupervisorInternal(nextState, true);
supervisors.keySet().forEach(id -> possiblyStopAndRemoveSupervisorInternal(id, true));
}
}

/**
 * Suspends or resumes every supervisor currently registered with this manager.
 * <p/>
 * Each id is handed to {@code possiblySuspendOrResumeSupervisorInternal}; its boolean result is ignored, so
 * supervisors for which that helper reports {@code false} do not cause a failure here.
 *
 * @param suspend true to suspend all supervisors, false to resume them
 */
public void suspendOrResumeAllSupervisors(boolean suspend)
{
  Preconditions.checkState(started, "SupervisorManager not started");

  synchronized (lock) {
    // Re-check under the lock, matching the other public mutators of this manager.
    Preconditions.checkState(started, "SupervisorManager not started");

    // Walk a snapshot of the ids rather than the live key set: the helper stops/removes and then re-creates
    // each supervisor, i.e. it modifies 'supervisors' while we would otherwise still be iterating over it.
    for (String id : supervisors.keySet().toArray(new String[0])) {
      possiblySuspendOrResumeSupervisorInternal(id, suspend);
    }
  }
}

Expand Down Expand Up @@ -206,7 +224,7 @@ public boolean checkPointDataSourceMetadata(
* Stops a supervisor with a given id and then removes it from the list.
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting and stopping supervisors.
* starting, stopping, suspending and resuming supervisors.
*
* @return true if a supervisor was stopped, false if there was no supervisor with this id
*/
Expand All @@ -226,11 +244,32 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write
return true;
}

/**
 * Switches the supervisor with the given id to the requested state (suspended or running) by stopping the
 * current instance and starting a new one from the corresponding spec.
 * <p/>
 * Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
 * starting, stopping, suspending and resuming supervisors.
 *
 * @return false if there is no supervisor with this id or if it is already in the requested state (suspending a
 * suspended supervisor, or resuming a running one); otherwise the result of creating and starting the
 * supervisor from the new spec
 */
private boolean possiblySuspendOrResumeSupervisorInternal(String id, boolean suspend)
{
  final Pair<Supervisor, SupervisorSpec> existing = supervisors.get(id);
  if (existing == null) {
    return false;
  }

  final SupervisorSpec currentSpec = existing.rhs;
  if (currentSpec.isSuspended() == suspend) {
    // Already in the requested state; nothing to do.
    return false;
  }

  final SupervisorSpec targetSpec;
  if (suspend) {
    targetSpec = currentSpec.createSuspendedSpec();
  } else {
    targetSpec = currentSpec.createRunningSpec();
  }

  // Stop the old instance first, then bring up its replacement from the new spec.
  possiblyStopAndRemoveSupervisorInternal(targetSpec.getId(), false);
  return createAndStartSupervisorInternal(targetSpec, true);
}

/**
* Creates a supervisor from the provided spec and starts it if there is not already a supervisor with that id.
* <p/>
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting and stopping supervisors.
* starting, stopping, suspending and resuming supervisors.
*
* @return true if a new supervisor was created, false if there was already an existing supervisor with this id
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,35 @@ public Response terminate(@PathParam("id") final String id)
);
}

@POST
@Path("/suspendAll")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel like the All suffix is necessary for these methods, it seems implied since the url path doesn't specify a supervisor id, and doesn't appear to be a common convention in other API paths that do things to multiple things. That said, the base path for the supervisor api probably should have been /druid/indexer/v1/supervisors instead of /druid/indexer/v1/supervisor so it would have read a bit better, but that seems more painful than useful to change

@Produces(MediaType.APPLICATION_JSON)
public Response suspendAll()
{
// Bulk endpoint: delegates to the shared suspend/resume-all helper with suspend = true.
return suspendOrResumeAll(true);
}

/**
 * Resumes all supervisors at once by delegating to the shared suspend/resume-all helper.
 *
 * @return the response produced by {@code suspendOrResumeAll(false)}
 */
@POST
@Path("/resumeAll")
@Produces(MediaType.APPLICATION_JSON)
public Response resumeAll()
{
  final boolean suspend = false;
  return suspendOrResumeAll(suspend);
}

/**
 * Terminates all supervisors at once.
 * <p/>
 * The work runs through {@code asLeaderWithSupervisorManager}; once the manager has been asked to stop and
 * remove every supervisor, a 200 response with the body {@code {"status": "success"}} is returned.
 */
@POST
@Path("/terminateAll")
@Produces(MediaType.APPLICATION_JSON)
public Response terminateAll()
{
  return asLeaderWithSupervisorManager(
      supervisorManager -> {
        supervisorManager.stopAndRemoveAllSupervisors();
        final ImmutableMap<String, String> body = ImmutableMap.of("status", "success");
        return Response.ok(body).build();
      }
  );
}

@GET
@Path("/history")
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -378,23 +407,34 @@ private Response specSuspendOrResume(final String id, boolean suspend)
{
return asLeaderWithSupervisorManager(
manager -> {
Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
if (!spec.isPresent()) {
return Response.status(Response.Status.NOT_FOUND)
.entity(ImmutableMap.of("error", StringUtils.format("[%s] does not exist", id)))
.build();
}

if (spec.get().isSuspended() == suspend) {
final String errMsg =
StringUtils.format("[%s] is already %s", id, suspend ? "suspended" : "running");
return Response.status(Response.Status.BAD_REQUEST)
if (manager.suspendOrResumeSupervisor(id, suspend)) {
Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
return Response.ok(spec.get()).build();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calls Optional.get() without a check

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no need to do so: manager.suspendOrResumeSupervisor(id, suspend) returning true means the supervisorSpec is not absent.

} else {
Optional<SupervisorSpec> spec = manager.getSupervisorSpec(id);
Response.Status status;
String errMsg;
if (spec.isPresent()) {
status = Response.Status.BAD_REQUEST;
errMsg = StringUtils.format("[%s] is already %s", id, suspend ? "suspended" : "running");
} else {
status = Response.Status.NOT_FOUND;
errMsg = StringUtils.format("[%s] does not exist", id);
}
return Response.status(status)
.entity(ImmutableMap.of("error", errMsg))
.build();
}
manager.suspendOrResumeSupervisor(id, suspend);
spec = manager.getSupervisorSpec(id);
return Response.ok(spec.get()).build();
}
);
}

private Response suspendOrResumeAll(boolean suspend)
{
return asLeaderWithSupervisorManager(
manager -> {
manager.suspendOrResumeAllSupervisors(suspend);
return Response.ok(ImmutableMap.of("status", "success")).build();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it could be useful to return the list of spec ids which successfully had a state change for this method and terminate all, but I don't feel strongly about it

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have stated the reason why I did not return these ids above. For your convenience, I paste here:

I think there is no need to return these ids, since all of these endpoints are idempotent operations; just returning OK would be fine. I also think users do not care about which supervisors had a state change; they just need to know that all supervisors have been changed to the suspended or running state. Besides, returning these ids would make the code a little complex and ugly ):

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,6 @@ public void testSpecGetStatus()
@Test
public void testSpecSuspend()
{

TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) {
@Override
public List<String> getDataSources()
{
return Collections.singletonList("datasource1");
}
};
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be removed. Previously this test first ensured that a running spec would correctly be suspended, and then that an already suspended spec would result in an error; now it only tests the latter behavior.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason I removed this is that I moved the condition spec.get().isSuspended() == suspend into SupervisorManager#possiblySuspendOrResumeSupervisorInternal and modified the implementation of SupervisorResource#specSuspendOrResume to keep it consistent with the implementation of SupervisorResource#terminate. After doing this, I had to modify the test code, and then I found there was no need to keep this. If you check the whole of SupervisorResourceTest#testSpecSuspend, you'll find that I have tested both behaviors.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see now 👍

TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) {
@Override
public List<String> getDataSources()
Expand All @@ -309,11 +301,8 @@ public List<String> getDataSources()
};

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
.andReturn(Optional.of(running)).times(1)
.andReturn(Optional.of(suspended)).times(1);
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(true);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended));
replayAll();

Response response = supervisorResource.specSuspend("my-id");
Expand All @@ -326,7 +315,8 @@ public List<String> getDataSources()
resetAll();

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended)).atLeastOnce();
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", true)).andReturn(false);
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(suspended));
replayAll();

response = supervisorResource.specSuspend("my-id");
Expand All @@ -336,18 +326,9 @@ public List<String> getDataSources()
Assert.assertEquals(ImmutableMap.of("error", "[my-id] is already suspended"), response.getEntity());
}



@Test
public void testSpecResume()
{
TestSupervisorSpec suspended = new TestSupervisorSpec("my-id", null, null, true) {
@Override
public List<String> getDataSources()
{
return Collections.singletonList("datasource1");
}
};
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same

TestSupervisorSpec running = new TestSupervisorSpec("my-id", null, null, false) {
@Override
public List<String> getDataSources()
Expand All @@ -357,11 +338,8 @@ public List<String> getDataSources()
};

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id"))
.andReturn(Optional.of(suspended)).times(1)
.andReturn(Optional.of(running)).times(1);
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(true);
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running));
replayAll();

Response response = supervisorResource.specResume("my-id");
Expand All @@ -374,7 +352,8 @@ public List<String> getDataSources()
resetAll();

EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running)).atLeastOnce();
EasyMock.expect(supervisorManager.suspendOrResumeSupervisor("my-id", false)).andReturn(false);
EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")).andReturn(Optional.of(running));
replayAll();

response = supervisorResource.specResume("my-id");
Expand All @@ -385,19 +364,19 @@ public List<String> getDataSources()
}

@Test
public void testShutdown()
public void testTerminate()
{
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2);
EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id")).andReturn(true);
EasyMock.expect(supervisorManager.stopAndRemoveSupervisor("my-id-2")).andReturn(false);
replayAll();

Response response = supervisorResource.shutdown("my-id");
Response response = supervisorResource.terminate("my-id");

Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity());

response = supervisorResource.shutdown("my-id-2");
response = supervisorResource.terminate("my-id-2");

Assert.assertEquals(404, response.getStatus());
verifyAll();
Expand All @@ -407,12 +386,54 @@ public void testShutdown()
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();

response = supervisorResource.shutdown("my-id");
response = supervisorResource.terminate("my-id");
verifyAll();

Assert.assertEquals(503, response.getStatus());
}

/**
 * suspendAll() should ask the manager to suspend every supervisor and answer 200 with a "success" status body.
 */
@Test
public void testSuspendAll()
{
  // Record phase: a supervisor manager is available and receives exactly the bulk-suspend call.
  EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
  supervisorManager.suspendOrResumeAllSupervisors(true);
  EasyMock.expectLastCall();
  replayAll();

  final Response result = supervisorResource.suspendAll();
  final ImmutableMap<String, String> expectedBody = ImmutableMap.of("status", "success");

  Assert.assertEquals(200, result.getStatus());
  Assert.assertEquals(expectedBody, result.getEntity());
  verifyAll();
}

/**
 * resumeAll() should ask the manager to resume every supervisor and answer 200 with a "success" status body.
 */
@Test
public void testResumeAll()
{
  // Record phase: a supervisor manager is available and receives exactly the bulk-resume call.
  EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
  supervisorManager.suspendOrResumeAllSupervisors(false);
  EasyMock.expectLastCall();
  replayAll();

  final Response result = supervisorResource.resumeAll();
  final ImmutableMap<String, String> expectedBody = ImmutableMap.of("status", "success");

  Assert.assertEquals(200, result.getStatus());
  Assert.assertEquals(expectedBody, result.getEntity());
  verifyAll();
}

/**
 * terminateAll() should ask the manager to stop and remove every supervisor and answer 200 with a "success"
 * status body.
 */
@Test
public void testTerminateAll()
{
  // Record phase: a supervisor manager is available and receives exactly the bulk-terminate call.
  EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager));
  supervisorManager.stopAndRemoveAllSupervisors();
  EasyMock.expectLastCall();
  replayAll();

  final Response result = supervisorResource.terminateAll();
  final ImmutableMap<String, String> expectedBody = ImmutableMap.of("status", "success");

  Assert.assertEquals(200, result.getStatus());
  Assert.assertEquals(expectedBody, result.getEntity());
  verifyAll();
}

@Test
public void testSpecGetAllHistory()
{
Expand Down Expand Up @@ -872,7 +893,7 @@ public void testReset()
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent());
replayAll();

response = supervisorResource.shutdown("my-id");
response = supervisorResource.terminate("my-id");

Assert.assertEquals(503, response.getStatus());
verifyAll();
Expand Down