From 2a9970c24d2364d6943c8ae4a81ceb7c7219287c Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 8 Feb 2019 15:15:05 -0800 Subject: [PATCH 1/8] document middle manager api --- docs/content/operations/api-reference.md | 39 +++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md index 7a0b43421fc9..44f90d9ab44d 100644 --- a/docs/content/operations/api-reference.md +++ b/docs/content/operations/api-reference.md @@ -498,7 +498,44 @@ This section documents the API endpoints for the processes that reside on Data s ### MiddleManager -The MiddleManager does not have any API endpoints beyond the [common endpoints](#common). +* `/druid/worker/v1/disable` + +'Disable' a MiddleManager, causing it to stop accepting new tasks but complete all existing tasks. Returns JSON object keyed by the `druid.host`: +```json +{"localhost":"disabled"} +``` + +* `/druid/worker/v1/enable` + +'Enable' a MiddleManager, allowing it to accept new tasks again if it was previously disabled. Returns JSON object keyed by the `druid.host`: + +```json +{"localhost":"enabled"} +``` + +* `/druid/worker/v1/enabled` + +Check whether a MiddleManager is in an enabled or disabled state. Returns JSON object keyed by the `druid.host` and the boolean state. + +```json +{"localhost":true} +``` + +* `/druid/worker/v1/tasks` + +Retrieve a list of active tasks being run on MiddleManager. Returns JSON list of taskid strings. + +* `/druid/worker/v1/task/{taskid}/shutdown` + +Shutdown a running task by `taskid`. Returns JSON: + +```json +{"task":"index_kafka_wikiticker-v2_f7011f8ffba384b_fpeclode"} +``` + +* `/druid/worker/v1/task/{taskid}/log` + +Retrieve task log output stream by task id. ### Peon From 0247d907972c0305fffe3e614d2c0ac2e5e7951d Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 8 Feb 2019 15:20:00 -0800 Subject: [PATCH 2/8] re-arrange --- docs/content/operations/api-reference.md | 35 ++++++++++++++---------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md index 44f90d9ab44d..cc2510ee1a87 100644 --- a/docs/content/operations/api-reference.md +++ b/docs/content/operations/api-reference.md @@ -498,6 +498,26 @@ This section documents the API endpoints for the processes that reside on Data s ### MiddleManager +##### GET + +* `/druid/worker/v1/enabled` + +Check whether a MiddleManager is in an enabled or disabled state. Returns JSON object keyed by the `druid.host` and the boolean state. + +```json +{"localhost":true} +``` + +* `/druid/worker/v1/tasks` + +Retrieve a list of active tasks being run on MiddleManager. Returns JSON list of taskid strings. + +* `/druid/worker/v1/task/{taskid}/log` + +Retrieve task log output stream by task id. + +##### POST + * `/druid/worker/v1/disable` 'Disable' a MiddleManager, causing it to stop accepting new tasks but complete all existing tasks. Returns JSON object keyed by the `druid.host`: @@ -513,18 +533,6 @@ This section documents the API endpoints for the processes that reside on Data s {"localhost":"enabled"} ``` -* `/druid/worker/v1/enabled` - -Check whether a MiddleManager is in an enabled or disabled state. Returns JSON object keyed by the `druid.host` and the boolean state. - -```json -{"localhost":true} -``` - -* `/druid/worker/v1/tasks` - -Retrieve a list of active tasks being run on MiddleManager. Returns JSON list of taskid strings. - * `/druid/worker/v1/task/{taskid}/shutdown` Shutdown a running task by `taskid`. Returns JSON: @@ -533,9 +541,6 @@ Shutdown a running task by `taskid`. Returns JSON: {"task":"index_kafka_wikiticker-v2_f7011f8ffba384b_fpeclode"} ``` -* `/druid/worker/v1/task/{taskid}/log` - -Retrieve task log output stream by task id. ### Peon From 314507bcca82dcb1c22ea3e9626c642880a837ca Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Sun, 10 Feb 2019 18:26:59 -0800 Subject: [PATCH 3/8] correction --- docs/content/operations/api-reference.md | 25 ++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md index cc2510ee1a87..a9395aab6aa8 100644 --- a/docs/content/operations/api-reference.md +++ b/docs/content/operations/api-reference.md @@ -494,7 +494,8 @@ Please use the equivalent 'terminate' instead. ## Data Server -This section documents the API endpoints for the processes that reside on Data servers (MiddleManagers/Peons and Historicals) in the suggested [three-server configuration](../design/processes.html#server-types). +This section documents the API endpoints for the processes that reside on Data servers (MiddleManagers/Peons and Historicals) +in the suggested [three-server configuration](../design/processes.html#server-types). ### MiddleManager @@ -502,16 +503,21 @@ This section documents the API endpoints for the processes that reside on Data s * `/druid/worker/v1/enabled` -Check whether a MiddleManager is in an enabled or disabled state. Returns JSON object keyed by the `druid.host` and the boolean state. +Check whether a MiddleManager is in an enabled or disabled state. Returns JSON object keyed by the combined `druid.host` +and `druid.port` with the boolean state as the value. ```json -{"localhost":true} +{"localhost:8091":true} ``` * `/druid/worker/v1/tasks` Retrieve a list of active tasks being run on MiddleManager. Returns JSON list of taskid strings. +```json +["index_wikiticker_2019-02-11T02:20:15.316Z"] +``` + * `/druid/worker/v1/task/{taskid}/log` Retrieve task log output stream by task id. @@ -520,17 +526,20 @@ Retrieve task log output stream by task id. * `/druid/worker/v1/disable` -'Disable' a MiddleManager, causing it to stop accepting new tasks but complete all existing tasks. Returns JSON object keyed by the `druid.host`: +'Disable' a MiddleManager, causing it to stop accepting new tasks but complete all existing tasks. Returns JSON object +keyed by the combined `druid.host` and `druid.port`: + ```json -{"localhost":"disabled"} +{"localhost:8091":"disabled"} ``` * `/druid/worker/v1/enable` -'Enable' a MiddleManager, allowing it to accept new tasks again if it was previously disabled. Returns JSON object keyed by the `druid.host`: +'Enable' a MiddleManager, allowing it to accept new tasks again if it was previously disabled. Returns JSON object +keyed by the combined `druid.host` and `druid.port`: ```json -{"localhost":"enabled"} +{"localhost:8091":"enabled"} ``` * `/druid/worker/v1/task/{taskid}/shutdown` @@ -538,7 +547,7 @@ Retrieve task log output stream by task id. Shutdown a running task by `taskid`. Returns JSON: ```json -{"task":"index_kafka_wikiticker-v2_f7011f8ffba384b_fpeclode"} +{"task":"index_kafka_wikiticker_f7011f8ffba384b_fpeclode"} ``` From a28f28040aaf233c9bfdb34cc614631bca8f82fa Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 12 Feb 2019 18:37:07 -0800 Subject: [PATCH 4/8] document more missing overlord api calls, minor re-arrange of some code i was referencing --- .../apache/druid/indexer/TaskStatusPlus.java | 3 - docs/content/operations/api-reference.md | 134 ++++-- .../overlord/http/OverlordResource.java | 399 +++++++++--------- 3 files changed, 300 insertions(+), 236 deletions(-) diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index 4912900fb88f..5571eb95d6c5 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -31,8 +31,6 @@ public class TaskStatusPlus { - private static final Logger log = new Logger(TaskStatusPlus.class); - private final String id; private final String type; private final DateTime createdTime; @@ -74,7 +72,6 @@ public TaskStatusPlus( ); } - @JsonCreator public TaskStatusPlus( @JsonProperty("id") String id, diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md index a9395aab6aa8..2d9db69cc7c7 100644 --- a/docs/content/operations/api-reference.md +++ b/docs/content/operations/api-reference.md @@ -145,14 +145,17 @@ Returns full segment metadata for a specific segment as stored in the metadata s * `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments` -Returns a list of all segments, overlapping with any of given intervals, for a datasource as stored in the metadata store. Request body is array of string intervals like [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"] +Returns a list of all segments, overlapping with any of given intervals, for a datasource as stored in the metadata store. Request body is array of string IS0 8601 intervals like [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"] * `/druid/coordinator/v1/metadata/datasources/{dataSourceName}/segments?full` -Returns a list of all segments, overlapping with any of given intervals, for a datasource with the full segment metadata as stored in the metadata store. Request body is array of string intervals like [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"] +Returns a list of all segments, overlapping with any of given intervals, for a datasource with the full segment metadata as stored in the metadata store. Request body is array of string ISO 8601 intervals like [interval1, interval2,...] for example ["2012-01-01T00:00:00.000/2012-01-03T00:00:00.000", "2012-01-05T00:00:00.000/2012-01-07T00:00:00.000"] #### Datasources +Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` +(e.g., 2016-06-27_2016-06-28). + ##### GET * `/druid/coordinator/v1/datasources` @@ -189,7 +192,7 @@ Returns a map of an interval to a map of segment metadata to a set of server nam * `/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}` -Returns a set of segment ids for an ISO8601 interval. Note that {interval} parameters are delimited by a `_` instead of a `/` (e.g., 2016-06-27_2016-06-28). +Returns a set of segment ids for an interval. * `/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}?simple` @@ -236,18 +239,19 @@ Enables a segment of a datasource. Disables a datasource. * `/druid/coordinator/v1/datasources/{dataSourceName}/intervals/{interval}` -* `@Deprecated. /druid/coordinator/v1/datasources/{dataSourceName}?kill=true&interval={myISO8601Interval}` +* `@Deprecated. /druid/coordinator/v1/datasources/{dataSourceName}?kill=true&interval={myInterval}` Runs a [Kill task](../ingestion/tasks.html) for a given interval and datasource. -Note that {interval} parameters are delimited by a `_` instead of a `/` (e.g., 2016-06-27_2016-06-28). - * `/druid/coordinator/v1/datasources/{dataSourceName}/segments/{segmentId}` Disables a segment. #### Retention Rules +Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` +(e.g., 2016-06-27_2016-06-28). + ##### GET * `/druid/coordinator/v1/rules` @@ -294,9 +298,10 @@ Optional Header Parameters for auditing the config change can also be specified. #### Intervals -##### GET +Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` +(e.g., 2016-06-27_2016-06-28). -Note that {interval} parameters are delimited by a `_` instead of a `/` (e.g., 2016-06-27_2016-06-28). +##### GET * `/druid/coordinator/v1/intervals` @@ -340,6 +345,7 @@ will be set for them. Creates or updates the compaction config for a dataSource. See [Compaction Configuration](../configuration/index.html#compaction-dynamic-configuration) for configuration details. + ##### DELETE * `/druid/coordinator/v1/config/compaction/{dataSource}` @@ -359,12 +365,12 @@ ports. * `/druid/coordinator/v1/servers?simple` Returns a list of server data objects in which each object has the following keys: -- `host`: host URL include (`{hostname}:{port}`) -- `type`: node type (`indexer-executor`, `historical`) -- `currSize`: storage size currently used -- `maxSize`: maximum storage size -- `priority` -- `tier` +* `host`: host URL include (`{hostname}:{port}`) +* `type`: node type (`indexer-executor`, `historical`) +* `currSize`: storage size currently used +* `maxSize`: maximum storage size +* `priority` +* `tier` ### Overlord @@ -384,8 +390,44 @@ only want the active leader to be considered in-service at the load balancer. #### Tasks +Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` +(e.g., 2016-06-27_2016-06-28). + ##### GET +* `/druid/indexer/v1/tasks` + +Retrieve list of tasks. Accepts query string parameters `state`, `datasource`, `createdTimeInterval`, `max`, and `type`. + +|Query Parameter |Description | +|---|---| +|`state`|filter list of tasks by task state, valid options are `running`, `complete`, `waiting`, and `pending`.| +| `datasource`| return tasks filtered by Druid datasource.| +| `createdTimeInterval`| return tasks created within the specified interval. | +| `max`| maximum number of `"complete"` tasks to return. Only applies when `state` is set to `"complete"`.| +| `type`| filter tasks by task type.| + + +* `/druid/indexer/v1/completeTasks` + +Retrieve list of complete tasks. Equivalent to `/druid/indexer/v1/tasks?state=complete`. + +* `/druid/indexer/v1/runningTasks` + +Retrieve list of complete tasks. Equivalent to `/druid/indexer/v1/tasks?state=running`. + +* `/druid/indexer/v1/waitingTasks` + +Retrieve list of complete tasks. Equivalent to `/druid/indexer/v1/tasks?state=waiting`. + +* `/druid/indexer/v1/pendingTasks` + +Retrieve list of complete tasks. Equivalent to `/druid/indexer/v1/tasks?state=pending`. + +* `/druid/indexer/v1/task/{taskId}` + +Retrieve the 'payload' of a task. + * `/druid/indexer/v1/task/{taskId}/status` Retrieve the status of a task. @@ -408,14 +450,25 @@ Retrieve a [task completion report](../ingestion/reports.html) for a task. Only Endpoint for submitting tasks and supervisor specs to the Overlord. Returns the taskId of the submitted task. -* `druid/indexer/v1/task/{taskId}/shutdown` +* `/druid/indexer/v1/task/{taskId}/shutdown` Shuts down a task. -* `druid/indexer/v1/datasources/{dataSource}/shutdownAllTasks` +* `/druid/indexer/v1/datasources/{dataSource}/shutdownAllTasks` Shuts down all tasks for a dataSource. +* `/druid/indexer/v1/taskStatus` + +Retrieve list of task status objects for list of task id strings in request body. + +##### DELETE + +* `/druid/indexer/v1/pendingSegments/{dataSource}` + +Cleanup pending segments table in metadata storage for `datasource`. Returns a JSON object response with `numDeleted` +and count of rows deleted from the pending segments table. + #### Supervisors ##### GET @@ -492,6 +545,33 @@ This API is deprecated and will be removed in future releases. Please use the equivalent 'terminate' instead. +#### Dynamic Configuration +See [Overlord Dynamic Configuration](../configuration/index.html#overlord-dynamic-configuration) for details. + +Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` +(e.g., 2016-06-27_2016-06-28). + +##### GET + +* `/druid/indexer/v1/worker` + +Retreives current overlord dynamic configuration. + +* `/druid/indexer/v1/worker/history?interval={interval}&counter={count}` + +Retrieves history of changes to overlord dynamic configuration. Accepts `interval` and `count` query string parameters +to filter by interval and limit the number of results respectively. + +* `/druid/indexer/v1/scaling` + +Retrieves overlord scaling events if auto-scaling runners are in use. + +##### POST + +* /druid/indexer/v1/worker + +Update overlord dynamic worker configuration. + ## Data Server This section documents the API endpoints for the processes that reside on Data servers (MiddleManagers/Peons and Historicals) @@ -512,15 +592,17 @@ and `druid.port` with the boolean state as the value. * `/druid/worker/v1/tasks` -Retrieve a list of active tasks being run on MiddleManager. Returns JSON list of taskid strings. +Retrieve a list of active tasks being run on MiddleManager. Returns JSON list of taskid strings. Normal usage should +prefer to use the `/druid/indexer/v1/tasks` [Overlord API](#overlord) or one of it's task state specific variants instead. ```json ["index_wikiticker_2019-02-11T02:20:15.316Z"] ``` -* `/druid/worker/v1/task/{taskid}/log` +* `/druid/worker/v1/task/{taskid}/log` -Retrieve task log output stream by task id. +Retrieve task log output stream by task id. Normal usage should prefer to use the `/druid/indexer/v1/task/{taskId}/log` +[Overlord API](#overlord) instead. ##### POST @@ -544,7 +626,8 @@ keyed by the combined `druid.host` and `druid.port`: * `/druid/worker/v1/task/{taskid}/shutdown` -Shutdown a running task by `taskid`. Returns JSON: +Shutdown a running task by `taskid`. Normal usage should prefer to use the `/druid/indexer/v1/task/{taskId}/shutdown` +[Overlord API](#overlord) instead. Returns JSON: ```json {"task":"index_kafka_wikiticker_f7011f8ffba384b_fpeclode"} @@ -589,6 +672,9 @@ This section documents the API endpoints for the processes that reside on Query #### Datasource Information +Note that all _interval_ URL parameters are ISO 8601 strings delimited by a `_` instead of a `/` +(e.g., 2016-06-27_2016-06-28). + ##### GET * `/druid/v2/datasources` @@ -599,7 +685,7 @@ Returns a list of queryable datasources. Returns the dimensions and metrics of the datasource. Optionally, you can provide request parameter "full" to get list of served intervals with dimensions and metrics being served for those intervals. You can also provide request param "interval" explicitly to refer to a particular interval. -If no interval is specified, a default interval spanning a configurable period before the current time will be used. The duration of this interval is specified in ISO8601 format via: +If no interval is specified, a default interval spanning a configurable period before the current time will be used. The default duration of this interval is specified in ISO 8601 duration format via: druid.query.segmentMetadata.defaultHistory @@ -608,7 +694,7 @@ druid.query.segmentMetadata.defaultHistory Returns the dimensions of the datasource.
-This API is deprecated and will be removed in future releases. Please use [SegmentMetadataQuery](../querying/segmentmetadataquery.html) instead +This API is deprecated and will be removed in future releases. Please use SegmentMetadataQuery instead which provides more comprehensive information and supports all dataSource types including streaming dataSources. It's also encouraged to use [INFORMATION_SCHEMA tables](../querying/sql.html#retrieving-metadata) if you're using SQL.
@@ -618,12 +704,12 @@ if you're using SQL. Returns the metrics of the datasource.
-This API is deprecated and will be removed in future releases. Please use [SegmentMetadataQuery](../querying/segmentmetadataquery.html) instead +This API is deprecated and will be removed in future releases. Please use SegmentMetadataQuery instead which provides more comprehensive information and supports all dataSource types including streaming dataSources. It's also encouraged to use [INFORMATION_SCHEMA tables](../querying/sql.html#retrieving-metadata) if you're using SQL.
-* `/druid/v2/datasources/{dataSourceName}/candidates?intervals={comma-separated-intervals-in-ISO8601-format}&numCandidates={numCandidates}` +* `/druid/v2/datasources/{dataSourceName}/candidates?intervals={comma-separated-intervals}&numCandidates={numCandidates}` Returns segment information lists including server locations for the given datasource and intervals. If "numCandidates" is not specified, it will return all servers for each interval. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 9b59202ee0ed..8e901db0a27d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -122,7 +122,6 @@ public class OverlordResource private AtomicReference workerConfigRef = null; private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete"); - @Inject public OverlordResource( TaskMaster taskMaster, @@ -503,100 +502,6 @@ public Response getWaitingTasks(@Context final HttpServletRequest req) return getTasks("waiting", null, null, null, null, req); } - private static class AnyTask extends TaskRunnerWorkItem - { - private final String taskType; - private final String dataSource; - private final TaskState taskState; - private final RunnerTaskState runnerTaskState; - private final DateTime createdTime; - private final DateTime queueInsertionTime; - private final TaskLocation taskLocation; - - AnyTask( - String taskId, - String taskType, - ListenableFuture result, - String dataSource, - TaskState state, - RunnerTaskState runnerState, - DateTime createdTime, - DateTime queueInsertionTime, - TaskLocation taskLocation - ) - { - super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH); - this.taskType = taskType; - this.dataSource = dataSource; - this.taskState = state; - this.runnerTaskState = runnerState; - this.createdTime = createdTime; - this.queueInsertionTime = queueInsertionTime; - this.taskLocation = taskLocation; - } - - @Override - public TaskLocation getLocation() - { - return taskLocation; - } - - @Override - public String getTaskType() - { - return taskType; - } - - @Override - public String getDataSource() - { - return dataSource; - } - - public TaskState getTaskState() - { - return taskState; - } - - public RunnerTaskState getRunnerTaskState() - { - return runnerTaskState; - } - - @Override - public DateTime getCreatedTime() - { - return createdTime; - } - - @Override - public DateTime getQueueInsertionTime() - { - return queueInsertionTime; - } - - public AnyTask withTaskState( - TaskState newTaskState, - RunnerTaskState runnerState, - DateTime createdTime, - DateTime queueInsertionTime, - TaskLocation taskLocation - ) - { - return new AnyTask( - getTaskId(), - getTaskType(), - getResult(), - getDataSource(), - newTaskState, - runnerState, - createdTime, - queueInsertionTime, - taskLocation - ); - } - } - @GET @Path("/pendingTasks") @Produces(MediaType.APPLICATION_JSON) @@ -760,120 +665,6 @@ public Response getTasks( return Response.ok(authorizedList).build(); } - private static BiFunction, RunnerTaskState, TaskStatusPlus> newTaskInfo2TaskStatusPlusFn() - { - return (taskInfo, runnerTaskState) -> new TaskStatusPlus( - taskInfo.getId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), - taskInfo.getCreatedTime(), - // Would be nice to include the real queue insertion time, but the - // TaskStorage API doesn't yet allow it. - DateTimes.EPOCH, - taskInfo.getStatus().getStatusCode(), - runnerTaskState, - taskInfo.getStatus().getDuration(), - TaskLocation.unknown(), - taskInfo.getDataSource(), - taskInfo.getStatus().getErrorMsg() - ); - } - - private List filterActiveTasks( - RunnerTaskState state, - List allTasks - ) - { - //divide active tasks into 3 lists : running, pending, waiting - Optional taskRunnerOpt = taskMaster.getTaskRunner(); - if (!taskRunnerOpt.isPresent()) { - throw new WebApplicationException( - Response.serverError().entity("No task runner found").build() - ); - } - TaskRunner runner = taskRunnerOpt.get(); - // the order of tasks below is waiting, pending, running to prevent - // skipping a task, it's the order in which tasks will change state - // if they do while this is code is executing, so a task might be - // counted twice but never skipped - if (RunnerTaskState.WAITING.equals(state)) { - Collection runnersKnownTasks = runner.getKnownTasks(); - Set runnerKnownTaskIds = runnersKnownTasks - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .collect(Collectors.toSet()); - final List waitingTasks = new ArrayList<>(); - for (TaskRunnerWorkItem task : allTasks) { - if (!runnerKnownTaskIds.contains(task.getTaskId())) { - waitingTasks.add(((AnyTask) task).withTaskState( - TaskState.RUNNING, - RunnerTaskState.WAITING, - task.getCreatedTime(), - task.getQueueInsertionTime(), - task.getLocation() - )); - } - } - return waitingTasks; - } - - if (RunnerTaskState.PENDING.equals(state)) { - Collection knownPendingTasks = runner.getPendingTasks(); - Set pendingTaskIds = knownPendingTasks - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .collect(Collectors.toSet()); - Map workItemIdMap = knownPendingTasks - .stream() - .collect(Collectors.toMap( - TaskRunnerWorkItem::getTaskId, - java.util.function.Function.identity(), - (previousWorkItem, newWorkItem) -> newWorkItem - )); - final List pendingTasks = new ArrayList<>(); - for (TaskRunnerWorkItem task : allTasks) { - if (pendingTaskIds.contains(task.getTaskId())) { - pendingTasks.add(((AnyTask) task).withTaskState( - TaskState.RUNNING, - RunnerTaskState.PENDING, - workItemIdMap.get(task.getTaskId()).getCreatedTime(), - workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(), - workItemIdMap.get(task.getTaskId()).getLocation() - )); - } - } - return pendingTasks; - } - - if (RunnerTaskState.RUNNING.equals(state)) { - Collection knownRunningTasks = runner.getRunningTasks(); - Set runningTaskIds = knownRunningTasks - .stream() - .map(TaskRunnerWorkItem::getTaskId) - .collect(Collectors.toSet()); - Map workItemIdMap = knownRunningTasks - .stream() - .collect(Collectors.toMap( - TaskRunnerWorkItem::getTaskId, - java.util.function.Function.identity(), - (previousWorkItem, newWorkItem) -> newWorkItem - )); - final List runningTasks = new ArrayList<>(); - for (TaskRunnerWorkItem task : allTasks) { - if (runningTaskIds.contains(task.getTaskId())) { - runningTasks.add(((AnyTask) task).withTaskState( - TaskState.RUNNING, - RunnerTaskState.RUNNING, - workItemIdMap.get(task.getTaskId()).getCreatedTime(), - workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(), - workItemIdMap.get(task.getTaskId()).getLocation() - )); - } - } - return runningTasks; - } - return allTasks; - } - @DELETE @Path("/pendingSegments/{dataSource}") @Produces(MediaType.APPLICATION_JSON) @@ -1016,6 +807,102 @@ private Response asLeaderWith(Optional x, Function f) } } + private List filterActiveTasks( + RunnerTaskState state, + List allTasks + ) + { + //divide active tasks into 3 lists : running, pending, waiting + Optional taskRunnerOpt = taskMaster.getTaskRunner(); + if (!taskRunnerOpt.isPresent()) { + throw new WebApplicationException( + Response.serverError().entity("No task runner found").build() + ); + } + TaskRunner runner = taskRunnerOpt.get(); + // the order of tasks below is waiting, pending, running to prevent + // skipping a task, it's the order in which tasks will change state + // if they do while this is code is executing, so a task might be + // counted twice but never skipped + if (RunnerTaskState.WAITING.equals(state)) { + Collection runnersKnownTasks = runner.getKnownTasks(); + Set runnerKnownTaskIds = runnersKnownTasks + .stream() + .map(TaskRunnerWorkItem::getTaskId) + .collect(Collectors.toSet()); + final List waitingTasks = new ArrayList<>(); + for (TaskRunnerWorkItem task : allTasks) { + if (!runnerKnownTaskIds.contains(task.getTaskId())) { + waitingTasks.add(((AnyTask) task).withTaskState( + TaskState.RUNNING, + RunnerTaskState.WAITING, + task.getCreatedTime(), + task.getQueueInsertionTime(), + task.getLocation() + )); + } + } + return waitingTasks; + } + + if (RunnerTaskState.PENDING.equals(state)) { + Collection knownPendingTasks = runner.getPendingTasks(); + Set pendingTaskIds = knownPendingTasks + .stream() + .map(TaskRunnerWorkItem::getTaskId) + .collect(Collectors.toSet()); + Map workItemIdMap = knownPendingTasks + .stream() + .collect(Collectors.toMap( + TaskRunnerWorkItem::getTaskId, + java.util.function.Function.identity(), + (previousWorkItem, newWorkItem) -> newWorkItem + )); + final List pendingTasks = new ArrayList<>(); + for (TaskRunnerWorkItem task : allTasks) { + if (pendingTaskIds.contains(task.getTaskId())) { + pendingTasks.add(((AnyTask) task).withTaskState( + TaskState.RUNNING, + RunnerTaskState.PENDING, + workItemIdMap.get(task.getTaskId()).getCreatedTime(), + workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(), + workItemIdMap.get(task.getTaskId()).getLocation() + )); + } + } + return pendingTasks; + } + + if (RunnerTaskState.RUNNING.equals(state)) { + Collection knownRunningTasks = runner.getRunningTasks(); + Set runningTaskIds = knownRunningTasks + .stream() + .map(TaskRunnerWorkItem::getTaskId) + .collect(Collectors.toSet()); + Map workItemIdMap = knownRunningTasks + .stream() + .collect(Collectors.toMap( + TaskRunnerWorkItem::getTaskId, + java.util.function.Function.identity(), + (previousWorkItem, newWorkItem) -> newWorkItem + )); + final List runningTasks = new ArrayList<>(); + for (TaskRunnerWorkItem task : allTasks) { + if (runningTaskIds.contains(task.getTaskId())) { + runningTasks.add(((AnyTask) task).withTaskState( + TaskState.RUNNING, + RunnerTaskState.RUNNING, + workItemIdMap.get(task.getTaskId()).getCreatedTime(), + workItemIdMap.get(task.getTaskId()).getQueueInsertionTime(), + workItemIdMap.get(task.getTaskId()).getLocation() + )); + } + } + return runningTasks; + } + return allTasks; + } + private List securedTaskStatusPlus( List collectionToFilter, @Nullable String dataSource, @@ -1057,4 +944,98 @@ private List securedTaskStatusPlus( ) ); } + + private static class AnyTask extends TaskRunnerWorkItem + { + private final String taskType; + private final String dataSource; + private final TaskState taskState; + private final RunnerTaskState runnerTaskState; + private final DateTime createdTime; + private final DateTime queueInsertionTime; + private final TaskLocation taskLocation; + + AnyTask( + String taskId, + String taskType, + ListenableFuture result, + String dataSource, + TaskState state, + RunnerTaskState runnerState, + DateTime createdTime, + DateTime queueInsertionTime, + TaskLocation taskLocation + ) + { + super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH); + this.taskType = taskType; + this.dataSource = dataSource; + this.taskState = state; + this.runnerTaskState = runnerState; + this.createdTime = createdTime; + this.queueInsertionTime = queueInsertionTime; + this.taskLocation = taskLocation; + } + + @Override + public TaskLocation getLocation() + { + return taskLocation; + } + + @Override + public String getTaskType() + { + return taskType; + } + + @Override + public String getDataSource() + { + return dataSource; + } + + public TaskState getTaskState() + { + return taskState; + } + + public RunnerTaskState getRunnerTaskState() + { + return runnerTaskState; + } + + @Override + public DateTime getCreatedTime() + { + return createdTime; + } + + @Override + public DateTime getQueueInsertionTime() + { + return queueInsertionTime; + } + + public AnyTask withTaskState( + TaskState newTaskState, + RunnerTaskState runnerState, + DateTime createdTime, + DateTime queueInsertionTime, + TaskLocation taskLocation + ) + { + return new AnyTask( + getTaskId(), + getTaskType(), + getResult(), + getDataSource(), + newTaskState, + runnerState, + createdTime, + queueInsertionTime, + taskLocation + ); + } + } } From 2f4f6144ffc624a612289b89ac110217f6348c8f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 12 Feb 2019 18:42:52 -0800 Subject: [PATCH 5/8] fix it --- core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index 5571eb95d6c5..34733af08bb8 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.java.util.common.RE; -import org.apache.druid.java.util.common.logger.Logger; import org.joda.time.DateTime; import javax.annotation.Nullable; From 11aa5aa3be86c84002ec4b26f000e0eeca404bf9 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 12 Feb 2019 19:06:46 -0800 Subject: [PATCH 6/8] this will fix it --- .../apache/druid/indexing/overlord/http/OverlordResource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 8e901db0a27d..e5abac4070e7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -101,7 +101,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiFunction; import java.util.stream.Collectors; /** From 07e008a0ea0d73a9da93b8d7279d63514b2fa9c8 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Wed, 13 Feb 2019 12:35:59 -0800 Subject: [PATCH 7/8] fixup --- docs/content/operations/api-reference.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md index 2d9db69cc7c7..583c9e842942 100644 --- a/docs/content/operations/api-reference.md +++ b/docs/content/operations/api-reference.md @@ -405,7 +405,7 @@ Retrieve list of tasks. Accepts query string parameters `state`, `datasource`, ` | `datasource`| return tasks filtered by Druid datasource.| | `createdTimeInterval`| return tasks created within the specified interval. | | `max`| maximum number of `"complete"` tasks to return. Only applies when `state` is set to `"complete"`.| -| `type`| filter tasks by task type.| +| `type`| filter tasks by task type. See [task documentation](../ingestion/tasks.html) for more details.| * `/druid/indexer/v1/completeTasks` @@ -414,15 +414,15 @@ Retrieve list of complete tasks. Equivalent to `/druid/indexer/v1/tasks?state=co * `/druid/indexer/v1/runningTasks` -Retrieve list of complete tasks. Equivalent to `/druid/indexer/v1/tasks?state=running`. +Retrieve list of running tasks. Equivalent to `/druid/indexer/v1/tasks?state=running`. * `/druid/indexer/v1/waitingTasks` -Retrieve list of complete tasks. Equivalent to `/druid/indexer/v1/tasks?state=waiting`. +Retrieve list of waiting tasks. Equivalent to `/druid/indexer/v1/tasks?state=waiting`. * `/druid/indexer/v1/pendingTasks` -Retrieve list of complete tasks. Equivalent to `/druid/indexer/v1/tasks?state=pending`. +Retrieve list of pending tasks. Equivalent to `/druid/indexer/v1/tasks?state=pending`. * `/druid/indexer/v1/task/{taskId}` From 3675a9e84593036693aae09f45df23f091225097 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 15 Feb 2019 16:12:32 -0800 Subject: [PATCH 8/8] link to other docs --- docs/content/operations/api-reference.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/content/operations/api-reference.md b/docs/content/operations/api-reference.md index 583c9e842942..434f7fa339ec 100644 --- a/docs/content/operations/api-reference.md +++ b/docs/content/operations/api-reference.md @@ -466,8 +466,10 @@ Retrieve list of task status objects for list of task id strings in request body * `/druid/indexer/v1/pendingSegments/{dataSource}` -Cleanup pending segments table in metadata storage for `datasource`. Returns a JSON object response with `numDeleted` -and count of rows deleted from the pending segments table. +Manually clean up pending segments table in metadata storage for `datasource`. Returns a JSON object response with +`numDeleted` and count of rows deleted from the pending segments table. This API is used by the +`druid.coordinator.kill.pendingSegments.on` [coordinator setting](../configuration/index.html#coordinator-operation) +which automates this operation to perform periodically. #### Supervisors