From 9a655727c5d3b9a0186c9a8badab08344f64ade8 Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Fri, 17 Aug 2018 01:43:23 +0800 Subject: [PATCH 1/3] 'shutdownAllTasks' API for a dataSource Change-Id: I30d14390457d39e0427d23a48f4f224223dc5777 --- docs/content/operations/api-reference.md | 3 ++ .../overlord/http/OverlordResource.java | 36 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md index d933c4f40528..b0ff77c40bed 100644 --- a/docs/content/operations/api-reference.md +++ b/docs/content/operations/api-reference.md @@ -378,6 +378,9 @@ Endpoint for submitting tasks and supervisor specs to the overlord. Returns the Shuts down a task. +* `druid/indexer/v1/task/{dataSource}/shutdown` + +Shuts down all tasks for a dataSource. ## MiddleManager diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 6eb428c46490..1aa2c77ddf39 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -337,6 +337,42 @@ public Response apply(TaskQueue taskQueue) ); } + @POST + @Path("/task/{dataSource}/shutdown") + @Produces(MediaType.APPLICATION_JSON) + public Response shutdownTasksForDataSource(@PathParam("dataSource") final String dataSource) + { + return asLeaderWith( + taskMaster.getTaskQueue(), + new Function() + { + @Override + public Response apply(TaskQueue taskQueue) + { + final List tasks = taskStorageQueryAdapter.getActiveTasks(); + boolean ownTask = false; + for (final Task task : tasks) { + if (task.getDataSource().equals(dataSource)) { + taskQueue.shutdown(task.getId()); + ownTask = true; + } + } + + if (!ownTask) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(StringUtils.format( + "Cannot find any active task for this dataSource: [%s]", + dataSource + )) + .build(); + } else { + return Response.ok(ImmutableMap.of("dataSource", dataSource)).build(); + } + } + } + ); + } + @POST @Path("/taskStatus") @Produces(MediaType.APPLICATION_JSON) From 6fbbe400eb19d2c41ba0636db230d7e9c3fd0ee2 Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Fri, 17 Aug 2018 02:34:13 +0800 Subject: [PATCH 2/3] fix api path and return Change-Id: Ib463f31ee2c4cb168cf2697f149be845b57c42e5 --- docs/content/operations/api-reference.md | 2 +- .../indexing/overlord/http/OverlordResource.java | 16 ++-------------- 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md index b0ff77c40bed..a38be3b0696b 100644 --- a/docs/content/operations/api-reference.md +++ b/docs/content/operations/api-reference.md @@ -378,7 +378,7 @@ Endpoint for submitting tasks and supervisor specs to the overlord. Returns the Shuts down a task. -* `druid/indexer/v1/task/{dataSource}/shutdown` +* `druid/indexer/v1/task/{dataSource}/shutdownAllTasks` Shuts down all tasks for a dataSource. diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 1aa2c77ddf39..76700e3c1c3b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -338,7 +338,7 @@ public Response apply(TaskQueue taskQueue) } @POST - @Path("/task/{dataSource}/shutdown") + @Path("/task/{dataSource}/shutdownAllTasks") @Produces(MediaType.APPLICATION_JSON) public Response shutdownTasksForDataSource(@PathParam("dataSource") final String dataSource) { @@ -350,24 +350,12 @@ public Response shutdownTasksForDataSource(@PathParam("dataSource") final String public Response apply(TaskQueue taskQueue) { final List tasks = taskStorageQueryAdapter.getActiveTasks(); - boolean ownTask = false; for (final Task task : tasks) { if (task.getDataSource().equals(dataSource)) { taskQueue.shutdown(task.getId()); - ownTask = true; } } - - if (!ownTask) { - return Response.status(Response.Status.BAD_REQUEST) - .entity(StringUtils.format( - "Cannot find any active task for this dataSource: [%s]", - dataSource - )) - .build(); - } else { - return Response.ok(ImmutableMap.of("dataSource", dataSource)).build(); - } + return Response.ok(ImmutableMap.of("dataSource", dataSource)).build(); } } ); From 8f2f831be2124de6d37b40eccb7dc2a731526e45 Mon Sep 17 00:00:00 2001 From: "qiumingming.2018" Date: Fri, 17 Aug 2018 10:58:22 +0800 Subject: [PATCH 3/3] optimize implementation Change-Id: I50a8dcd44dd9d36c9ecbfa78e103eb9bff32eab9 --- .../io/druid/indexing/overlord/http/OverlordResource.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 76700e3c1c3b..3425a87b35a1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -349,11 +349,9 @@ public Response shutdownTasksForDataSource(@PathParam("dataSource") final String @Override public Response apply(TaskQueue taskQueue) { - final List tasks = taskStorageQueryAdapter.getActiveTasks(); - for (final Task task : tasks) { - if (task.getDataSource().equals(dataSource)) { - taskQueue.shutdown(task.getId()); - } + final List> tasks = taskStorageQueryAdapter.getActiveTaskInfo(dataSource); + for (final TaskInfo task : tasks) { + taskQueue.shutdown(task.getId()); } return Response.ok(ImmutableMap.of("dataSource", dataSource)).build(); }