diff --git a/core/src/main/java/org/apache/druid/indexer/TaskInfoLite.java b/core/src/main/java/org/apache/druid/indexer/TaskInfoLite.java
new file mode 100644
index 000000000000..c41d5129d0d5
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/indexer/TaskInfoLite.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.base.Preconditions;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+
+/**
+ * Immutable holder for the task information that some status queries in the OverlordResource need.
+ *
+ * The task state is kept as the string passed to the constructor and is translated to a {@link TaskState} by
+ * {@link #getStatus()}.
+ */
+public class TaskInfoLite
+{
+  private final String id;
+  private final String groupId;
+  private final String type;
+  private final String dataSource;
+  private final TaskLocation location;
+  private final DateTime createdTime;
+  private final String status;
+  private final Long duration;
+  @Nullable
+  private final String errorMsg;
+
+  /**
+   * Creates a new instance. Every argument except {@code errorMsg} must be non-null.
+   *
+   * @throws NullPointerException if a required argument is null
+   */
+  public TaskInfoLite(
+      String id,
+      String groupId,
+      String type,
+      String dataSource,
+      TaskLocation location,
+      DateTime createdTime,
+      String status,
+      Long duration,
+      @Nullable String errorMsg
+  )
+  {
+    this.id = Preconditions.checkNotNull(id, "id");
+    this.groupId = Preconditions.checkNotNull(groupId, "groupId");
+    this.type = Preconditions.checkNotNull(type, "type");
+    this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
+    this.location = Preconditions.checkNotNull(location, "location");
+    this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime");
+    this.status = Preconditions.checkNotNull(status, "status");
+    this.duration = Preconditions.checkNotNull(duration, "duration");
+    this.errorMsg = errorMsg;
+  }
+
+  public String getId()
+  {
+    return id;
+  }
+
+  public String getGroupId()
+  {
+    return groupId;
+  }
+
+  public String getType()
+  {
+    return type;
+  }
+
+  public String getDataSource()
+  {
+    return dataSource;
+  }
+
+  public TaskLocation getLocation()
+  {
+    return location;
+  }
+
+  public DateTime getCreatedTime()
+  {
+    return createdTime;
+  }
+
+  /**
+   * Translates the stored status string into a {@link TaskState}.
+   *
+   * @return {@link TaskState#SUCCESS} for {@code "SUCCESS"}, {@link TaskState#FAILED} for {@code "FAILED"}, and
+   *         {@link TaskState#RUNNING} for {@code "RUNNING"} and for every other value
+   */
+  public TaskState getStatus()
+  {
+    switch (status) {
+      case "SUCCESS":
+        return TaskState.SUCCESS;
+      case "FAILED":
+        return TaskState.FAILED;
+      case "RUNNING":
+      default:
+        // Values other than the three known names also end up here.
+        return TaskState.RUNNING;
+    }
+  }
+
+  public Long getDuration()
+  {
+    return duration;
+  }
+
+  @Nullable
+  public String getErrorMsg()
+  {
+    return errorMsg;
+  }
+}
diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java
index d25829cd0bc0..abdfd16131d7 100644
--- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java
+++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java
@@ -21,10 +21,12 @@
 
 import com.google.common.base.Optional;
 import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskInfoLite;
 import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -81,6 +83,18 @@ void insert(
   @Nullable
   TaskInfo getTaskInfo(String entryId);
 
+  /**
+   * Return the {@link TaskInfoLite} of the given entry. This default implementation returns null.
+   *
+   * @param entryId entry ID
+   * @return lite task info, may be null
+   */
+  @Nullable
+  default TaskInfoLite getTaskInfoLite(String entryId)
+  {
+    return null;
+  }
+
   /**
    * Return up to {@code maxNumStatuses} {@link TaskInfo} objects for all inactive entries
    * created on or later than the given timestamp
@@ -96,6 +110,22 @@ List> getCompletedTaskInfo(
       @Nullable String datasource
   );
 
+  /**
+   * Lite counterpart of {@link #getCompletedTaskInfo}. This default implementation returns an empty list.
+   *
+   * @param timestamp      lower bound on the creation time
+   * @param maxNumStatuses maximum number of entries to return, may be null
+   * @param datasource     datasource, may be null
+   */
+  default List<TaskInfoLite> getCompletedTaskInfoLite(
+      DateTime timestamp,
+      @Nullable Integer maxNumStatuses,
+      @Nullable String datasource
+  )
+  {
+    return Collections.emptyList();
+  }
+
   /**
    * Return {@link TaskInfo} objects for all active entries
    *
@@ -103,6 +133,16 @@ List> getCompletedTaskInfo(
    */
   List> getActiveTaskInfo(@Nullable String dataSource);
 
+  /**
+   * Lite counterpart of {@link #getActiveTaskInfo}. This default implementation returns an empty list.
+   *
+   * @param dataSource datasource, may be null
+   */
+  default List<TaskInfoLite> getActiveTaskInfoLite(@Nullable String dataSource)
+  {
+    return Collections.emptyList();
+  }
+
   /**
    * Add a lock to the given entry
    *
diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java
index 95542cf34026..4655f2b9e3f9 100644
---
a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java
+++ b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java
@@ -45,6 +45,8 @@ public class PostgreSQLConnector extends SQLMetadataConnector
 {
   private static final Logger log = new Logger(PostgreSQLConnector.class);
   private static final String PAYLOAD_TYPE = "BYTEA";
+  // Column type returned by getJsonPayloadType().
+  private static final String JSON_PAYLOAD_TYPE = "JSONB";
   private static final String SERIAL_TYPE = "BIGSERIAL";
   private static final String QUOTE_STRING = "\\\"";
   private static final String PSQL_SERIALIZATION_FAILURE_MSG =
@@ -126,6 +128,12 @@ public String getPayloadType()
     return PAYLOAD_TYPE;
   }
 
+  @Override
+  public String getJsonPayloadType()
+  {
+    return JSON_PAYLOAD_TYPE;
+  }
+
   @Override
   public String getSerialType()
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
index 08c1343f80b3..64c97607798a 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java
@@ -27,6 +27,7 @@
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskInfoLite;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import
org.apache.druid.indexing.common.actions.TaskAction;
@@ -186,6 +187,13 @@ public TaskInfo getTaskInfo(String taskId)
     return handler.getTaskInfo(taskId);
   }
 
+  @Nullable
+  @Override
+  public TaskInfoLite getTaskInfoLite(String taskId)
+  {
+    return handler.getTaskInfoLite(taskId);
+  }
+
   @Override
   public List getActiveTasks()
   {
@@ -219,6 +227,14 @@ public List> getActiveTaskInfo(@Nullable String dataS
     );
   }
 
+  @Override
+  public List<TaskInfoLite> getActiveTaskInfoLite(@Nullable String dataSource)
+  {
+    return ImmutableList.copyOf(
+        handler.getActiveTaskInfoLite(dataSource)
+    );
+  }
+
   @Override
   public List> getRecentlyCreatedAlreadyFinishedTaskInfo(
       @Nullable Integer maxTaskStatuses,
@@ -236,6 +252,23 @@ public List> getRecentlyCreatedAlreadyFinishedTaskInf
     );
   }
 
+  @Override
+  public List<TaskInfoLite> getRecentlyCreatedAlreadyFinishedTaskInfoLite(
+      @Nullable Integer maxTaskStatuses,
+      @Nullable Duration durationBeforeNow,
+      @Nullable String datasource
+  )
+  {
+    return ImmutableList.copyOf(
+        handler.getCompletedTaskInfoLite(
+            DateTimes.nowUtc()
+                     .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
+            maxTaskStatuses,
+            datasource
+        )
+    );
+  }
+
   @Override
   public void addLock(final String taskid, final TaskLock taskLock)
   {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
index 16810aa2529c..354c9f1e69ca 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java
@@ -21,6 +21,7 @@
 
 import com.google.common.base.Optional;
 import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskInfoLite;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.actions.TaskAction;
@@ -29,6 +30,7 @@
 import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
 
 public interface TaskStorage
@@ -109,6 +111,18 @@ public interface TaskStorage
   @Nullable
   TaskInfo getTaskInfo(String taskId);
 
+  /**
+   * Returns the {@link TaskInfoLite} of the given task. This default implementation returns null.
+   *
+   * @param taskId task ID
+   * @return lite task info, may be null
+   */
+  @Nullable
+  default TaskInfoLite getTaskInfoLite(String taskId)
+  {
+    return null;
+  }
+
   /**
    * Add an action taken by a task to the audit log.
    *
@@ -155,6 +169,16 @@ public interface TaskStorage
    */
   List> getActiveTaskInfo(@Nullable String dataSource);
 
+  /**
+   * Lite counterpart of {@link #getActiveTaskInfo}. This default implementation returns an empty list.
+   *
+   * @param dataSource datasource, may be null
+   */
+  default List<TaskInfoLite> getActiveTaskInfoLite(@Nullable String dataSource)
+  {
+    return Collections.emptyList();
+  }
+
   /**
    * Returns up to {@code maxTaskStatuses} {@link TaskInfo} objects of recently finished tasks as stored in the storage
    * facility. No particular order is guaranteed, but implementations are encouraged to return tasks in descending order
@@ -173,6 +197,23 @@ List> getRecentlyCreatedAlreadyFinishedTaskInfo(
       @Nullable String datasource
   );
 
+  /**
+   * Lite counterpart of {@link #getRecentlyCreatedAlreadyFinishedTaskInfo}. This default implementation returns an
+   * empty list.
+   *
+   * @param maxTaskStatuses   maximum number of entries to return, may be null
+   * @param durationBeforeNow how far back from now to look, may be null
+   * @param datasource        datasource, may be null
+   */
+  default List<TaskInfoLite> getRecentlyCreatedAlreadyFinishedTaskInfoLite(
+      @Nullable Integer maxTaskStatuses,
+      @Nullable Duration durationBeforeNow,
+      @Nullable String datasource
+  )
+  {
+    return Collections.emptyList();
+  }
+
   /**
    * Returns a list of locks for a particular task.
    *
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
index 4ccd9251b43d..79e8078805af 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java
@@ -22,6 +22,7 @@
 import com.google.common.base.Optional;
 import com.google.inject.Inject;
 import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskInfoLite;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.actions.SegmentInsertAction;
 import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
@@ -78,6 +79,26 @@ public List> getActiveTaskInfo(@Nullable String dataS
     return storage.getActiveTaskInfo(dataSource);
   }
 
+  /**
+   * Returns the {@link TaskInfoLite} objects of active tasks, as reported by the task storage.
+   */
+  public List<TaskInfoLite> getActiveTaskInfoLite(@Nullable String dataSource)
+  {
+    return storage.getActiveTaskInfoLite(dataSource);
+  }
+
+  /**
+   * Returns the {@link TaskInfoLite} objects of recently finished tasks, as reported by the task storage.
+   */
+  public List<TaskInfoLite> getCompletedTaskInfoByCreatedTimeDurationLite(
+      @Nullable Integer maxTaskStatuses,
+      @Nullable Duration duration,
+      @Nullable String dataSource
+  )
+  {
+    return storage.getRecentlyCreatedAlreadyFinishedTaskInfoLite(maxTaskStatuses, duration, dataSource);
+  }
+
   public List> getCompletedTaskInfoByCreatedTimeDuration(
       @Nullable Integer maxTaskStatuses,
       @Nullable Duration duration,
@@ -103,6 +124,12 @@ public TaskInfo getTaskInfo(String
taskId)
     return storage.getTaskInfo(taskId);
   }
 
+  @Nullable
+  public TaskInfoLite getTaskInfoLite(String taskId)
+  {
+    return storage.getTaskInfoLite(taskId);
+  }
+
   /**
    * Returns all segments created by this task.
    *
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index 0076eb0335b8..34280a950636 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -38,6 +38,7 @@
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskInfoLite;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -263,7 +264,84 @@ public Response getTaskPayload(@PathParam("taskid") String taskid)
   @Path("/task/{taskid}/status")
   @Produces(MediaType.APPLICATION_JSON)
   @ResourceFilters(TaskResourceFilter.class)
-  public Response getTaskStatus(@PathParam("taskid") String taskid)
+  public Response getTaskStatus(
+      @PathParam("taskid") String taskid,
+      @QueryParam("lite") final Boolean isLite
+  )
+  {
+    // An absent "lite" query parameter arrives as null and selects the regular lookup.
+    return isLite != null && isLite
+           ? getTaskStatusLite(taskid)
+           : getTaskStatus(taskid);
+  }
+
+  /**
+   * Answers a task status request from the {@link TaskInfoLite} held by the task storage.
+   *
+   * @param taskid ID of the task to look up
+   * @return NOT_FOUND when the built response carries no status (no lite info exists for the task), OK otherwise
+   */
+  protected Response getTaskStatusLite(String taskid)
+  {
+    final TaskInfoLite taskInfo = taskStorageQueryAdapter.getTaskInfoLite(taskid);
+    final TaskStatusResponse response = taskInfo == null
+                                        ? new TaskStatusResponse(taskid, null)
+                                        : buildLiteStatusResponse(taskid, taskInfo);
+
+    final Response.Status status = response.getStatus() == null
+                                   ? Response.Status.NOT_FOUND
+                                   : Response.Status.OK;
+
+    return Response.status(status).entity(response).build();
+  }
+
+  /**
+   * Builds the status response for a task that has lite info. The runner task state and location are taken from the
+   * task runner when it knows the task; otherwise the state is reported as WAITING and the stored location is used.
+   */
+  private TaskStatusResponse buildLiteStatusResponse(String taskid, TaskInfoLite taskInfo)
+  {
+    // Defaults for when there is no task runner or the runner does not know the task.
+    String responseTaskId = taskid;
+    RunnerTaskState runnerTaskState = RunnerTaskState.WAITING;
+    TaskLocation location = taskInfo.getLocation();
+
+    if (taskMaster.getTaskRunner().isPresent()) {
+      final TaskRunner taskRunner = taskMaster.getTaskRunner().get();
+      final TaskRunnerWorkItem workItem = taskRunner
+          .getKnownTasks()
+          .stream()
+          .filter(item -> item.getTaskId().equals(taskid))
+          .findAny()
+          .orElse(null);
+      if (workItem != null) {
+        responseTaskId = workItem.getTaskId();
+        runnerTaskState = taskRunner.getRunnerTaskState(workItem.getTaskId());
+        location = workItem.getLocation();
+      }
+    }
+
+    return new TaskStatusResponse(
+        responseTaskId,
+        new TaskStatusPlus(
+            taskInfo.getId(),
+            taskInfo.getGroupId(),
+            taskInfo.getType(),
+            taskInfo.getCreatedTime(),
+            // Would be nice to include the real queue insertion time, but the
+            // TaskStorage API doesn't yet allow it.
+            DateTimes.EPOCH,
+            taskInfo.getStatus(),
+            runnerTaskState,
+            taskInfo.getDuration(),
+            location,
+            taskInfo.getDataSource(),
+            taskInfo.getErrorMsg()
+        )
+    );
+  }
+
+  protected Response getTaskStatus(String taskid)
   {
     final TaskInfo taskInfo = taskStorageQueryAdapter.getTaskInfo(taskid);
     TaskStatusResponse response = null;
@@ -521,7 +599,7 @@ public Response apply(TaskActionClient taskActionClient)
   @Produces(MediaType.APPLICATION_JSON)
   public Response getWaitingTasks(@Context final HttpServletRequest req)
   {
-    return getTasks("waiting", null, null, null, null, req);
+    return getTasks("waiting", null, null, null, null, false, req);
   }
 
   @GET
@@ -529,7 +607,7 @@ public Response getWaitingTasks(@Context final HttpServletRequest req)
   @Produces(MediaType.APPLICATION_JSON)
   public Response getPendingTasks(@Context final HttpServletRequest req)
   {
-    return getTasks("pending", null, null, null, null, req);
+    return getTasks("pending", null, null, null, null, false, req);
   }
 
   @GET
@@ -540,7 +618,7 @@ public Response getRunningTasks(
       @Context final HttpServletRequest req
   )
   {
-    return getTasks("running", null, null, null, taskType, req);
+    return getTasks("running", null, null, null, taskType, false, req);
   }
 
   @GET
@@ -551,7 +629,7 @@ public Response getCompleteTasks(
       @Context final HttpServletRequest req
   )
   {
-    return getTasks("complete", null, null, maxTaskStatuses, null, req);
+    return getTasks("complete", null, null, maxTaskStatuses, null, false, req);
   }
 
   @GET
@@ -563,6 +641,7 @@ public Response getTasks(
       @QueryParam("createdTimeInterval") final String createdTimeInterval,
       @QueryParam("max") final Integer maxCompletedTasks,
       @QueryParam("type") final String type,
+      @QueryParam("lite") final Boolean isLite,
       @Context final HttpServletRequest req
   )
   {
@@ -594,6 +673,23 @@ public Response getTasks(
         );
       }
     }
+
+    if (isLite != null && isLite) {
+      return getTasksLite(state, dataSource, createdTimeInterval, maxCompletedTasks, type, req);
+    } else {
+      return getTasks(state, dataSource, createdTimeInterval, maxCompletedTasks, type, req);
+    }
+  }
+
+  public Response getTasks(
+      final String state,
+      final String dataSource,
+      final String createdTimeInterval,
+      final Integer maxCompletedTasks,
+      final String type,
+      final HttpServletRequest req
+  )
+  {
     List finalTaskList = new ArrayList<>();
     Function activeTaskTransformFunc = workItem -> new TaskStatusPlus(
         workItem.getTaskId(),
@@ -690,6 +786,115 @@ public Response getTasks(
     return Response.ok(authorizedList).build();
   }
 
+  /**
+   * Variant of {@link #getTasks(String, String, String, Integer, String, HttpServletRequest)} that builds its
+   * result from {@link TaskInfoLite} objects.
+   */
+  public Response getTasksLite(
+      final String state,
+      final String dataSource,
+      final String createdTimeInterval,
+      final Integer maxCompletedTasks,
+      final String type,
+      final HttpServletRequest req
+  )
+  {
+    // Lower-cased once up front; null means that no state filter was requested.
+    final String requestedState = state == null ? null : StringUtils.toLowerCase(state);
+    final List<TaskStatusPlus> finalTaskList = new ArrayList<>();
+
+    //checking for complete tasks first to avoid querying active tasks if user only wants complete tasks
+    if (requestedState == null || "complete".equals(requestedState)) {
+      Duration createdTimeDuration = null;
+      if (createdTimeInterval != null) {
+        final Interval theInterval = Intervals.of(StringUtils.replace(createdTimeInterval, "_", "/"));
+        createdTimeDuration = theInterval.toDuration();
+      }
+      final List<TaskInfoLite> taskInfoList = taskStorageQueryAdapter.getCompletedTaskInfoByCreatedTimeDurationLite(
+          maxCompletedTasks,
+          createdTimeDuration,
+          dataSource
+      );
+      for (TaskInfoLite taskInfo : taskInfoList) {
+        finalTaskList.add(
+            new TaskStatusPlus(
+                taskInfo.getId(),
+                taskInfo.getGroupId(),
+                taskInfo.getType(),
+                taskInfo.getCreatedTime(),
+                // Would be nice to include the real queue insertion time, but the
+                // TaskStorage API doesn't yet allow it.
+                DateTimes.EPOCH,
+                taskInfo.getStatus(),
+                RunnerTaskState.NONE,
+                taskInfo.getDuration(),
+                taskInfo.getLocation() == null ? TaskLocation.unknown() : taskInfo.getLocation(),
+                taskInfo.getDataSource(),
+                taskInfo.getErrorMsg()
+            )
+        );
+      }
+    }
+
+    final List<AnyTask> allActiveTasks = new ArrayList<>();
+    if (requestedState == null || !"complete".equals(requestedState)) {
+      final List<TaskInfoLite> allActiveTaskInfo = taskStorageQueryAdapter.getActiveTaskInfoLite(dataSource);
+      for (TaskInfoLite task : allActiveTaskInfo) {
+        allActiveTasks.add(
+            new AnyTask(
+                task.getId(),
+                task.getGroupId(),
+                task.getType(),
+                SettableFuture.create(),
+                task.getDataSource(),
+                null,
+                null,
+                task.getCreatedTime(),
+                DateTimes.EPOCH,
+                TaskLocation.unknown()
+            )
+        );
+      }
+    }
+
+    // Active tasks are appended in a fixed order: waiting, then pending, then running.
+    final RunnerTaskState[] activeStates = {
+        RunnerTaskState.WAITING,
+        RunnerTaskState.PENDING,
+        RunnerTaskState.RUNNING
+    };
+    for (RunnerTaskState runnerState : activeStates) {
+      // The state names accepted by this endpoint are the lower-cased enum names.
+      if (requestedState == null || StringUtils.toLowerCase(runnerState.name()).equals(requestedState)) {
+        for (AnyTask workItem : filterActiveTasks(runnerState, allActiveTasks)) {
+          finalTaskList.add(
+              new TaskStatusPlus(
+                  workItem.getTaskId(),
+                  workItem.getTaskGroupId(),
+                  workItem.getTaskType(),
+                  workItem.getCreatedTime(),
+                  workItem.getQueueInsertionTime(),
+                  workItem.getTaskState(),
+                  workItem.getRunnerTaskState(),
+                  null,
+                  workItem.getLocation(),
+                  workItem.getDataSource(),
+                  null
+              )
+          );
+        }
+      }
+    }
+
+    final List<TaskStatusPlus> authorizedList = securedTaskStatusPlus(
+        finalTaskList,
+        dataSource,
+        type,
+        req
+    );
+    return Response.ok(authorizedList).build();
+  }
+
   @DELETE
   @Path("/pendingSegments/{dataSource}")
   @Produces(MediaType.APPLICATION_JSON)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index c4751669b272..8b92577c9605 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -447,7 +447,7 @@ public void testGetTasks()
         workerTaskRunnerQueryAdapter
     );
     List responseObjects = (List) overlordResource
-        .getTasks(null, null, null, null, null, req)
+        .getTasks(null, null, null, null, null, null, req)
         .getEntity();
     Assert.assertEquals(4, responseObjects.size());
   }
@@ -543,7 +543,7 @@ public void testGetTasksFilterDataSource()
     );
 
     List responseObjects = (List) overlordResource
-        .getTasks(null, "allow", null, null, null, req)
+        .getTasks(null, "allow", null, null, null, null, req)
         .getEntity();
     Assert.assertEquals(7, responseObjects.size());
     Assert.assertEquals("id_5", responseObjects.get(0).getId());
@@ -610,6 +610,7 @@ public void testGetTasksFilterWaitingState()
             null,
             null,
             null,
+            null,
             req
         ).getEntity();
     Assert.assertEquals(1, responseObjects.size());
@@ -672,7 +673,7 @@ public void testGetTasksFilterRunningState()
     );
 
     List
responseObjects = (List) overlordResource - .getTasks("running", "allow", null, null, null, req) + .getTasks("running", "allow", null, null, null, null, req) .getEntity(); Assert.assertEquals(2, responseObjects.size()); @@ -737,7 +738,7 @@ public void testGetTasksFilterPendingState() ); List responseObjects = (List) overlordResource - .getTasks("pending", null, null, null, null, req) + .getTasks("pending", null, null, null, null, null, req) .getEntity(); Assert.assertEquals(1, responseObjects.size()); @@ -784,7 +785,7 @@ public void testGetTasksFilterCompleteState() workerTaskRunnerQueryAdapter ); List responseObjects = (List) overlordResource - .getTasks("complete", null, null, null, null, req) + .getTasks("complete", null, null, null, null, null, req) .getEntity(); Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals("id_1", responseObjects.get(0).getId()); @@ -834,7 +835,7 @@ public void testGetTasksFilterCompleteStateWithInterval() ); String interval = "2010-01-01_P1D"; List responseObjects = (List) overlordResource - .getTasks("complete", null, interval, null, null, req) + .getTasks("complete", null, interval, null, null, null, req) .getEntity(); Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals("id_2", responseObjects.get(0).getId()); @@ -879,7 +880,7 @@ public void testGetNullCompleteTask() workerTaskRunnerQueryAdapter ); List responseObjects = (List) overlordResource - .getTasks("complete", null, null, null, null, req) + .getTasks("complete", null, null, null, null, null, req) .getEntity(); Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals("id_1", responseObjects.get(0).getId()); @@ -900,7 +901,7 @@ public void testGetTasksNegativeState() workerTaskRunnerQueryAdapter ); Object responseObject = overlordResource - .getTasks("blah", "ds_test", null, null, null, req) + .getTasks("blah", "ds_test", null, null, null, null, req) .getEntity(); Assert.assertEquals( "Invalid state : blah, valid values are: [pending, 
waiting, running, complete]",
diff --git a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
index e967fb8f09e3..415a1a8f4763 100644
--- a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java
@@ -20,17 +20,32 @@
 package org.apache.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskInfoLite;
+import org.apache.druid.indexer.TaskLocation;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.joda.time.DateTime;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.Query;
+import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.tweak.ResultSetMapper;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
 import java.util.Map;
 
 public class PostgreSQLMetadataStorageActionHandler
     extends SQLMetadataStorageActionHandler
 {
+  private static final EmittingLogger log = new EmittingLogger(PostgreSQLMetadataStorageActionHandler.class);
+
+  private final TaskInfoLiteMapper taskInfoMapper;
+
   public PostgreSQLMetadataStorageActionHandler(
       SQLMetadataConnector connector,
       ObjectMapper jsonMapper,
@@ -42,6 +57,7 @@ public PostgreSQLMetadataStorageActionHandler(
   )
   {
     super(connector, jsonMapper, types, entryTypeName, entryTable, logTable, lockTable);
+    this.taskInfoMapper = new TaskInfoLiteMapper(jsonMapper);
   }
 
   @Override
@@ -52,20 +68,43 @@ protected Query> createCompletedTaskInfoQuery(
       @Nullable String dataSource
   )
   {
-    String sql = StringUtils.format(
-        "SELECT "
-        + "  id, "
-        + "  status_payload, "
-        + "  created_date, "
-        + "  datasource, "
-        + "  payload "
-        + "FROM "
-        + "  %s "
-        + "WHERE "
-        + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
-        + "ORDER BY created_date DESC",
-        getEntryTable()
-    );
+    return createCompletedTaskInfoQuery(handle, timestamp, maxNumStatuses, dataSource, false);
+  }
+
+  /**
+   * Builds the query for inactive entries created at or after {@code timestamp}. With {@code isLite} set, the
+   * columns of {@link #createTaskInfoLiteQuery} are selected instead of the full payload columns.
+   */
+  private Query<Map<String, Object>> createCompletedTaskInfoQuery(
+      Handle handle,
+      DateTime timestamp,
+      @Nullable Integer maxNumStatuses,
+      @Nullable String dataSource,
+      boolean isLite
+  )
+  {
+    String sql;
+    if (isLite) {
+      sql = createTaskInfoLiteQuery(
+          getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+          + "ORDER BY created_date DESC"
+      );
+    } else {
+      sql = StringUtils.format(
+          "SELECT "
+          + "  id, "
+          + "  status_payload, "
+          + "  created_date, "
+          + "  datasource, "
+          + "  payload "
+          + "FROM "
+          + "  %s "
+          + "WHERE "
+          + getWhereClauseForInactiveStatusesSinceQuery(dataSource)
+          + "ORDER BY created_date DESC",
+          getEntryTable()
+      );
+    }
 
     if (maxNumStatuses != null) {
       sql += " LIMIT :n";
@@ -81,6 +120,7 @@ protected Query> createCompletedTaskInfoQuery(
     }
     return query;
   }
+
   private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String datasource)
   {
     String sql = StringUtils.format("active = FALSE AND created_date >= :start ");
@@ -90,6 +130,79 @@ private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String data
     return sql;
   }
 
+  @Override
+  @Nullable
+  public TaskInfoLite getTaskInfoLite(String entryId)
+  {
+    return getConnector().retryWithHandle(
+        handle -> handle.createQuery(createTaskInfoLiteQuery("id = :id"))
+                        .bind("id", entryId)
+                        .map(taskInfoMapper)
+                        .first()
+    );
+  }
+
+  @Override
+  public List<TaskInfoLite> getActiveTaskInfoLite(@Nullable String dataSource)
+  {
+    return getConnector().retryWithHandle(
+        handle -> {
+          Query<Map<String, Object>> query = handle.createQuery(
+              createTaskInfoLiteQuery(getWhereClauseForActiveStatusesQuery(dataSource))
+          );
+          if (dataSource != null) {
+            query = query.bind("ds", dataSource);
+          }
+          return query.map(taskInfoMapper).list();
+        }
+    );
+  }
+
+  @Override
+  public List<TaskInfoLite> getCompletedTaskInfoLite(
+      DateTime timestamp,
+      @Nullable Integer maxNumStatuses,
+      @Nullable String dataSource
+  )
+  {
+    return getConnector().retryWithHandle(
+        handle -> createCompletedTaskInfoQuery(handle, timestamp, maxNumStatuses, dataSource, true)
+            .map(taskInfoMapper)
+            .list()
+    );
+  }
+
+  /**
+   * Builds the SELECT statement that extracts the {@link TaskInfoLite} fields from the JSON columns of the entry
+   * table.
+   *
+   * NOTE(review): the insert statement of this change is the only visible place that fills
+   * {@code status_payload_json}; confirm that status updates refresh it too, otherwise the values read here go stale.
+   *
+   * @param whereClause condition appended after {@code WHERE}, or null to select without a condition
+   */
+  private String createTaskInfoLiteQuery(@Nullable String whereClause)
+  {
+    final String query = StringUtils.format(
+        "SELECT "
+        + "  id, "
+        + "  payload_json->>'groupId' AS group_id, "
+        + "  payload_json->>'type' AS type, datasource, "
+        + "  status_payload_json->>'location' AS location, "
+        + "  created_date, "
+        + "  status_payload_json->>'status' AS status, "
+        + "  status_payload_json->>'duration' AS duration, "
+        + "  status_payload_json->>'errorMsg' AS error_msg "
+        + "FROM "
+        + "  %s "
+        + (whereClause != null ? "WHERE " + whereClause : ""),
+        getEntryTable()
+    );
+    // Logged at debug level: this runs for every lite query.
+    log.debug("Executing: '%s'", query);
+    return query;
+  }
+
   @Deprecated
   @Override
   public String getSqlRemoveLogsOlderThan()
@@ -99,4 +212,56 @@ public String getSqlRemoveLogsOlderThan()
         getLogTable(), getEntryTable(), getEntryTypeName(), getEntryTable());
   }
 
+  /**
+   * Maps a row produced by {@link #createTaskInfoLiteQuery} to a {@link TaskInfoLite}.
+   */
+  static class TaskInfoLiteMapper implements ResultSetMapper<TaskInfoLite>
+  {
+    private final ObjectMapper objectMapper;
+
+    TaskInfoLiteMapper(ObjectMapper objectMapper)
+    {
+      this.objectMapper = objectMapper;
+    }
+
+    @Override
+    public TaskInfoLite map(int index, ResultSet resultSet, StatementContext context)
+        throws SQLException
+    {
+      return new TaskInfoLite(
+          resultSet.getString("id"),
+          resultSet.getString("group_id"),
+          resultSet.getString("type"),
+          resultSet.getString("datasource"),
+          readLocation(resultSet.getString("location")),
+          DateTimes.of(resultSet.getString("created_date")),
+          resultSet.getString("status"),
+          resultSet.getLong("duration"),
+          resultSet.getString("error_msg")
+      );
+    }
+
+    /**
+     * Deserializes the location column, falling back to {@link TaskLocation#unknown()} when the column is NULL or
+     * cannot be parsed, so that a single bad row does not fail the whole query.
+     */
+    private TaskLocation readLocation(@Nullable String locationJson)
+    {
+      if (locationJson == null) {
+        // Jackson does not report a null source as an IOException, so it must be handled before parsing.
+        return TaskLocation.unknown();
+      }
+      try {
+        return objectMapper.readValue(locationJson, TaskLocation.class);
+      }
+      catch (IOException e) {
+        log.warn(
+            e,
+            "Encountered exception[%s] while deserializing location from task status, setting location to unknown",
+            e.getMessage()
+        );
+        return TaskLocation.unknown();
+      }
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index c41ca075fa3f..0e4d5c608d4e 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -56,6 +56,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
 {
   private static final Logger log = new Logger(SQLMetadataConnector.class);
   private static final String PAYLOAD_TYPE = "BLOB";
+  private static final String JSON_PAYLOAD_TYPE = "CLOB";
   private static final String COLLATION = "";
 
   static final int DEFAULT_MAX_TRIES = 10;
@@ -88,6 +89,15 @@ public String getPayloadType()
     return PAYLOAD_TYPE;
   }
 
+  /**
+   * Column type used for the {@code payload_json} and {@code status_payload_json} columns of the entry table.
+   * Connectors for stores with a dedicated JSON type override this.
+   */
+  public String getJsonPayloadType()
+  {
+    return JSON_PAYLOAD_TYPE;
+  }
+
   /**
    * The character set and collation for case-sensitive nonbinary string comparison
    *
@@ -334,13 +344,15 @@ public void createEntryTable(final String tableName)
                 "CREATE TABLE %1$s (\n"
                 + "  id VARCHAR(255) NOT NULL,\n"
                 + "  created_date VARCHAR(255) NOT NULL,\n"
-                + "  datasource VARCHAR(255) %3$s NOT NULL,\n"
+                + "  datasource VARCHAR(255) %4$s NOT NULL,\n"
                 + "  payload %2$s NOT NULL,\n"
+                + "  payload_json %3$s NOT NULL,\n"
                 + "  status_payload %2$s NOT NULL,\n"
+                + "  status_payload_json %3$s NOT NULL,\n"
                 + "  active BOOLEAN NOT NULL DEFAULT FALSE,\n"
                 + "  PRIMARY KEY (id)\n"
                 + ")",
-                tableName, getPayloadType(), getCollation()
+                tableName, getPayloadType(), getJsonPayloadType(), getCollation()
             ),
             StringUtils.format("CREATE INDEX idx_%1$s_active_created_date ON %1$s(active, created_date)", tableName)
         )
diff --git
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
index 7f30d0405bbd..b0c8f5fca967 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -146,8 +146,13 @@ public void insert(
       getConnector().retryWithHandle(
           (HandleCallback) handle -> {
             final String sql = StringUtils.format(
-                "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) "
-                + "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
+                // The cast target comes from the connector so that it matches the column type that
+                // createEntryTable() uses; jsonb is a PostgreSQL type and this handler serves every SQL store.
+                "INSERT INTO %1$s (id, created_date, datasource, payload, payload_json, active, status_payload, "
+                + "status_payload_json) "
+                + "VALUES (:id, :created_date, :datasource, :payload, CAST(:payload_json AS %2$s), :active, "
+                + ":status_payload, CAST(:status_payload_json AS %2$s))",
-                getEntryTable()
+                getEntryTable(),
+                getConnector().getJsonPayloadType()
             );
             handle.createStatement(sql)
@@ -155,8 +160,10 @@ public void insert(
                   .bind("created_date", timestamp.toString())
                   .bind("datasource", dataSource)
                   .bind("payload", jsonMapper.writeValueAsBytes(entry))
+                  .bind("payload_json", jsonMapper.writeValueAsString(entry))
                   .bind("active", active)
                   .bind("status_payload", jsonMapper.writeValueAsBytes(status))
+                  .bind("status_payload_json", jsonMapper.writeValueAsString(status))
                   .execute();
             return null;
           },
@@ -326,7 +333,7 @@ private Query> createActiveTaskInfoQuery(Handle handle, @Nul
     return query;
   }
 
-  private String getWhereClauseForActiveStatusesQuery(String dataSource)
+  protected String getWhereClauseForActiveStatusesQuery(String dataSource)
   {
     String sql = StringUtils.format("active = TRUE ");
     if (dataSource != null) {