Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/content/operations/api-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ Endpoint for submitting tasks and supervisor specs to the overlord. Returns the

Shuts down a task.

* `druid/indexer/v1/task/{dataSource}/shutdownAllTasks`
* `druid/indexer/v1/datasources/{dataSource}/shutdownAllTasks`

Shuts down all tasks for a dataSource.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.server.http.security.ConfigResourceFilter;
import org.apache.druid.server.http.security.DatasourceResourceFilter;
import org.apache.druid.server.http.security.StateResourceFilter;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
Expand Down Expand Up @@ -339,8 +340,9 @@ public Response apply(TaskQueue taskQueue)
}

@POST
@Path("/task/{dataSource}/shutdownAllTasks")
@Path("/datasources/{dataSource}/shutdownAllTasks")
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(DatasourceResourceFilter.class)
public Response shutdownTasksForDataSource(@PathParam("dataSource") final String dataSource)
{
return asLeaderWith(
Expand Down Expand Up @@ -1002,26 +1004,6 @@ public Response doGetReports(
}
}

@GET
@Path("/dataSources/{dataSource}")
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningTasksByDataSource(@PathParam("dataSource") String dataSource,
@Context HttpServletRequest request)
{
Optional<TaskRunner> ts = taskMaster.getTaskRunner();
if (!ts.isPresent()) {
return Response.status(Response.Status.NOT_FOUND).entity("No tasks are running").build();
}
Collection<? extends TaskRunnerWorkItem> runningTasks = ts.get().getRunningTasks();
if (runningTasks == null || runningTasks.isEmpty()) {
return Response.status(Response.Status.NOT_FOUND)
.entity("No running tasks found for the datasource : " + dataSource).build();
}
List<TaskRunnerWorkItem> taskRunnerWorkItemList = runningTasks.stream()
.filter(task -> dataSource.equals(task.getDataSource())).collect(Collectors.toList());
return Response.ok(taskRunnerWorkItemList).build();
}

private <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
{
if (x.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var killTask = function(taskId) {
if(confirm('Do you really want to kill: '+taskId)) {
$.ajax({
type:'POST',
url: '/druid/indexer/v1/task/'+ taskId +'/terminate',
url: '/druid/indexer/v1/task/'+ taskId +'/shutdown',
data: ''
}).done(function(data) {
setTimeout(function() { location.reload(true) }, 750);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ public Access authorize(AuthenticationResult authenticationResult, Resource reso
);
}

public void expectAuthorizationTokenCheck()
private void expectAuthorizationTokenCheck()
{
AuthenticationResult authenticationResult = new AuthenticationResult("druid", "druid", null, null);
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).anyTimes();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce();
EasyMock.expect(req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT))
.andReturn(authenticationResult)
.anyTimes();
Expand Down Expand Up @@ -833,7 +833,10 @@ public void testKillPendingSegments()
@Test
public void testGetTaskPayload() throws Exception
{
expectAuthorizationTokenCheck();
// This is disabled since OverlordResource.getTaskStatus() is annotated with TaskResourceFilter which is supposed to
// set authorization token properly, but isn't called in this test.
// This should be fixed in https://github.com/apache/incubator-druid/issues/6685.
// expectAuthorizationTokenCheck();
final NoopTask task = NoopTask.create("mydatasource");
EasyMock.expect(taskStorageQueryAdapter.getTask("mytask"))
.andReturn(Optional.of(task));
Expand Down Expand Up @@ -861,7 +864,10 @@ public void testGetTaskPayload() throws Exception
@Test
public void testGetTaskStatus() throws Exception
{
expectAuthorizationTokenCheck();
// This is disabled since OverlordResource.getTaskStatus() is annotated with TaskResourceFilter which is supposed to
// set authorization token properly, but isn't called in this test.
// This should be fixed in https://github.com/apache/incubator-druid/issues/6685.
// expectAuthorizationTokenCheck();
final Task task = NoopTask.create("mytask", 0);
final TaskStatus status = TaskStatus.running("mytask");

Expand Down Expand Up @@ -910,54 +916,6 @@ public void testGetTaskStatus() throws Exception
Assert.assertEquals(new TaskStatusResponse("othertask", null), taskStatusResponse2);
}

@Test
public void testGetRunningTasksByDataSource()
{

List<String> tasksIds = ImmutableList.of("id_1", "id_2");
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem(tasksIds.get(0), null),
new MockTaskRunnerWorkItem(tasksIds.get(1), null)));
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(0))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(0), "deny"))).once();
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(1))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow"))).once();

EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
List<TaskRunnerWorkItem> responseObjects = (List) overlordResource.getRunningTasksByDataSource("ds_test", req)
.getEntity();

Assert.assertEquals(2, responseObjects.size());
Assert.assertEquals(taskStorageQueryAdapter.getTask("id_1").get().getId(), responseObjects.get(0).getTaskId());
Assert.assertEquals(taskStorageQueryAdapter.getTask("id_2").get().getId(), responseObjects.get(1).getTaskId());
Assert.assertTrue("DataSource Check", "ds_test".equals(responseObjects.get(0).getDataSource()));
}

@Test
public void testGetRunningTasksByDataSourceNeg()
{
expectAuthorizationTokenCheck();

List<String> tasksIds = ImmutableList.of("id_1", "id_2");
EasyMock.<Collection<? extends TaskRunnerWorkItem>>expect(taskRunner.getRunningTasks()).andReturn(
ImmutableList.of(
new MockTaskRunnerWorkItem(tasksIds.get(0), null),
new MockTaskRunnerWorkItem(tasksIds.get(1), null)));
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(0))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(0), "deny"))).once();
EasyMock.expect(taskStorageQueryAdapter.getTask(tasksIds.get(1))).andReturn(
Optional.of(getTaskWithIdAndDatasource(tasksIds.get(1), "allow"))).once();

EasyMock.replay(taskRunner, taskMaster, taskStorageQueryAdapter, indexerMetadataStorageAdapter, req);
Assert.assertTrue(taskStorageQueryAdapter.getTask("id_1").isPresent());
Assert.assertTrue(taskStorageQueryAdapter.getTask("id_2").isPresent());
List<TaskRunnerWorkItem> responseObjects = (List) overlordResource.getRunningTasksByDataSource("ds_NA", req)
.getEntity();

Assert.assertEquals(0, responseObjects.size());
}

@After
public void tearDown()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public boolean isApplicable(String requestPath)
List<String> applicablePaths = ImmutableList.of(
"druid/coordinator/v1/datasources/",
"druid/coordinator/v1/metadata/datasources/",
"druid/v2/datasources/"
"druid/v2/datasources/",
"druid/indexer/v1/datasources"
);
for (String path : applicablePaths) {
if (requestPath.startsWith(path) && !requestPath.equals(path)) {
Expand Down