diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdentifier.java b/core/src/main/java/org/apache/druid/indexer/TaskIdentifier.java new file mode 100644 index 000000000000..753de56328bf --- /dev/null +++ b/core/src/main/java/org/apache/druid/indexer/TaskIdentifier.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexer; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import javax.annotation.Nullable; +import java.util.Objects; + +/** + * Model class containing the id, type and groupId of a task + * These fields are extracted from the task payload for the new schema and this model can be used for migration as well. + */ +public class TaskIdentifier +{ + + private final String id; + + @Nullable + private final String type; + + @Nullable + private final String groupId; + + @JsonCreator + public TaskIdentifier( + @JsonProperty("id") String id, + @JsonProperty("groupId") @Nullable String groupId, + @JsonProperty("type") @Nullable String type // nullable for backward compatibility + ) + { + this.id = Preconditions.checkNotNull(id, "id"); + this.groupId = groupId; + this.type = type; + } + + @JsonProperty + public String getId() + { + return id; + } + + @Nullable + @JsonProperty + public String getGroupId() + { + return groupId; + } + + @Nullable + @JsonProperty + public String getType() + { + return type; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskIdentifier that = (TaskIdentifier) o; + return Objects.equals(getId(), that.getId()) && + Objects.equals(getGroupId(), that.getGroupId()) && + Objects.equals(getType(), that.getType()); + } + + @Override + public int hashCode() + { + return Objects.hash( + getId(), + getGroupId(), + getType() + ); + } + + @Override + public String toString() + { + return "TaskIdentifier{" + + "id='" + id + '\'' + + ", groupId='" + groupId + '\'' + + ", type='" + type + '\'' + + '}'; + } +} diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java index 75bbefa0eaf3..af04db211138 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.RE; import org.joda.time.DateTime; @@ -31,6 +32,7 @@ public class TaskStatusPlus { private final String id; + @Nullable private final String type; private final DateTime createdTime; private final DateTime queueInsertionTime; @@ -252,4 +254,29 @@ public String toString() ", errorMsg='" + errorMsg + '\'' + '}'; } + + /** + * Convert a TaskInfo class of TaskIdentifier and TaskStatus to a TaskStatusPlus + * Applicable only for completed or waiting tasks since a TaskInfo doesn't have the exhaustive info for running tasks + * + * @param taskIdentifierInfo TaskInfo pair + * @return corresponding TaskStatusPlus + */ + public static TaskStatusPlus fromTaskIdentifierInfo(TaskInfo taskIdentifierInfo) + { + TaskStatus status = taskIdentifierInfo.getStatus(); + return new TaskStatusPlus( + taskIdentifierInfo.getId(), + taskIdentifierInfo.getTask().getGroupId(), + taskIdentifierInfo.getTask().getType(), + taskIdentifierInfo.getCreatedTime(), + DateTimes.EPOCH, + status.getStatusCode(), + status.getStatusCode().isComplete() ? RunnerTaskState.NONE : RunnerTaskState.WAITING, + status.getDuration(), + status.getLocation(), + taskIdentifierInfo.getDataSource(), + status.getErrorMsg() + ); + } } diff --git a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java index b4d50aaa3d2e..ca6ede6d32f0 100644 --- a/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java +++ b/core/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java @@ -20,6 +20,7 @@ package org.apache.druid.metadata; import com.google.common.base.Optional; +import org.apache.druid.indexer.TaskIdentifier; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.DateTime; @@ -41,6 +42,8 @@ public interface MetadataStorageActionHandler> getTaskInfos( @Nullable String datasource ); + /** + * Returns the statuses of the specified tasks. + * + * If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store. + * If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata + * store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied. + * All lookups should be processed atomically if more than one lookup is given. + * + * @param taskLookups task lookup type and filters. + * @param datasource datasource filter + */ + List> getTaskStatusList( + Map taskLookups, + @Nullable String datasource + ); + default List> getTaskInfos( TaskLookup taskLookup, @Nullable String datasource @@ -173,4 +193,10 @@ default List> getTaskInfos( */ @Nullable Long getLockId(String entryId, LockType lock); + + /** + * Utility to migrate existing tasks to the new schema by populating type and groupId asynchronously + */ + void populateTaskTypeAndGroupIdAsync(); + } diff --git a/extensions-contrib/sqlserver-metadata-storage/src/main/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnector.java b/extensions-contrib/sqlserver-metadata-storage/src/main/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnector.java index d44e3489ae82..523214502e33 100644 --- a/extensions-contrib/sqlserver-metadata-storage/src/main/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnector.java +++ b/extensions-contrib/sqlserver-metadata-storage/src/main/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnector.java @@ -40,6 +40,7 @@ import java.sql.SQLException; import java.util.Arrays; import java.util.HashSet; +import java.util.Locale; import java.util.Set; import java.util.regex.Pattern; @@ -212,6 +213,12 @@ public boolean tableExists(final Handle handle, final String tableName) .isEmpty(); } + @Override + public String limitClause(int limit) + { + return String.format(Locale.ENGLISH, "FETCH NEXT %d ROWS ONLY", limit); + } + /** * * {@inheritDoc} diff --git a/extensions-contrib/sqlserver-metadata-storage/src/test/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java b/extensions-contrib/sqlserver-metadata-storage/src/test/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java index d061b99701de..a31159e88134 100644 --- a/extensions-contrib/sqlserver-metadata-storage/src/test/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java +++ b/extensions-contrib/sqlserver-metadata-storage/src/test/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java @@ -64,4 +64,15 @@ public void testIsTransientException() Assert.assertFalse(connector.isTransientException(new Throwable("Throwable with reason only"))); } + @Test + public void testLimitClause() + { + SQLServerConnector connector = new SQLServerConnector( + Suppliers.ofInstance(new MetadataStorageConnectorConfig()), + Suppliers.ofInstance( + new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null) + ) + ); + Assert.assertEquals("FETCH NEXT 100 ROWS ONLY", connector.limitClause(100)); + } } diff --git a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java index d4eb57bb8189..46e103312451 100644 --- a/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java +++ b/extensions-core/mysql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/mysql/MySQLConnector.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import java.io.File; import java.sql.SQLException; +import java.util.Locale; public class MySQLConnector extends SQLMetadataConnector { @@ -177,6 +178,12 @@ public int getStreamingFetchSize() return Integer.MIN_VALUE; } + @Override + public String limitClause(int limit) + { + return String.format(Locale.ENGLISH, "LIMIT %d", limit); + } + @Override public boolean tableExists(Handle handle, String tableName) { diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java index fceefe6e4931..23ce46282232 100644 --- a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java +++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java @@ -91,4 +91,16 @@ public void testIsExceptionTransientNoMySqlClazz() connector.connectorIsTransientException(new SQLTransientConnectionException("transient")) ); } + + @Test + public void testLimitClause() + { + MySQLConnector connector = new MySQLConnector( + CONNECTOR_CONFIG_SUPPLIER, + TABLES_CONFIG_SUPPLIER, + new MySQLConnectorSslConfig(), + MYSQL_DRIVER_CONFIG + ); + Assert.assertEquals("LIMIT 100", connector.limitClause(100)); + } } diff --git a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java index 95542cf34026..bdbf71bddc5a 100644 --- a/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java +++ b/extensions-core/postgresql-metadata-storage/src/main/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnector.java @@ -40,6 +40,7 @@ import java.sql.DatabaseMetaData; import java.sql.SQLException; import java.util.List; +import java.util.Locale; public class PostgreSQLConnector extends SQLMetadataConnector { @@ -144,6 +145,12 @@ public int getStreamingFetchSize() return DEFAULT_STREAMING_RESULT_SIZE; } + @Override + public String limitClause(int limit) + { + return String.format(Locale.ENGLISH, "LIMIT %d", limit); + } + protected boolean canUpsert(Handle handle) throws SQLException { if (canUpsert == null) { diff --git a/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java b/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java index bb885d71b7ff..08f3c333a1fb 100644 --- a/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java +++ b/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java @@ -62,4 +62,18 @@ public void testIsTransientException() Assert.assertFalse(connector.isTransientException(new Exception("I'm not happy"))); Assert.assertFalse(connector.isTransientException(new Throwable("I give up"))); } + + @Test + public void testLimitClause() + { + PostgreSQLConnector connector = new PostgreSQLConnector( + Suppliers.ofInstance(new MetadataStorageConnectorConfig()), + Suppliers.ofInstance( + new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null) + ), + new PostgreSQLConnectorConfig(), + new PostgreSQLTablesConfig() + ); + Assert.assertEquals("LIMIT 100", connector.limitClause(100)); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index 5529aebc6423..65f75ee513b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.indexer.TaskIdentifier; +import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -241,4 +243,20 @@ default ContextValueType getContextValue(String key, ContextV final ContextValueType value = getContextValue(key); return value == null ? defaultValue : value; } + + default TaskIdentifier getMetadata() + { + return new TaskIdentifier(this.getId(), this.getGroupId(), this.getType()); + } + + static TaskInfo toTaskIdentifierInfo(TaskInfo taskInfo) + { + return new TaskInfo<>( + taskInfo.getId(), + taskInfo.getCreatedTime(), + taskInfo.getStatus(), + taskInfo.getDataSource(), + taskInfo.getTask().getMetadata() + ); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index de9ebb728125..8ef5970982ad 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -30,6 +30,7 @@ import com.google.inject.Inject; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.config.TaskStorageConfig; @@ -233,6 +234,18 @@ public List> getTaskInfos( return tasks; } + @Override + public List getTaskStatusPlusList( + Map taskLookups, + @Nullable String datasource + ) + { + return getTaskInfos(taskLookups, datasource).stream() + .map(Task::toTaskIdentifierInfo) + .map(TaskStatusPlus::fromTaskIdentifierInfo) + .collect(Collectors.toList()); + } + private List> getRecentlyCreatedAlreadyFinishedTaskInfoSince( DateTime start, @Nullable Integer n, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 86597d5cfd25..7c7fbd3f53c8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -29,6 +29,7 @@ import com.google.inject.Inject; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.config.TaskStorageConfig; @@ -58,6 +59,7 @@ public class MetadataTaskStorage implements TaskStorage { + private static final MetadataStorageActionHandlerTypes TASK_TYPES = new MetadataStorageActionHandlerTypes() { @Override @@ -115,6 +117,8 @@ public MetadataTaskStorage( public void start() { metadataStorageConnector.createTaskTables(); + // begins migration of existing tasks to new schema + handler.populateTaskTypeAndGroupIdAsync(); } @LifecycleStop @@ -144,7 +148,9 @@ public void insert(final Task task, final TaskStatus status) throws EntryExistsE task.getDataSource(), task, status.isRunnable(), - status + status, + task.getType(), + task.getGroupId() ); } catch (Exception e) { @@ -226,21 +232,44 @@ public List> getTaskInfos( @Nullable String datasource ) { - Map theTaskLookups = Maps.newHashMapWithExpectedSize(taskLookups.size()); + Map theTaskLookups = processTaskLookups(taskLookups); + return Collections.unmodifiableList(handler.getTaskInfos(theTaskLookups, datasource)); + } + + @Override + public List getTaskStatusPlusList( + Map taskLookups, + @Nullable String datasource + ) + { + Map processedTaskLookups = processTaskLookups(taskLookups); + return Collections.unmodifiableList( + handler.getTaskStatusList(processedTaskLookups, datasource) + .stream() + .map(TaskStatusPlus::fromTaskIdentifierInfo) + .collect(Collectors.toList()) + ); + } + + private Map processTaskLookups( + Map taskLookups + ) + { + Map processedTaskLookups = Maps.newHashMapWithExpectedSize(taskLookups.size()); for (Entry entry : taskLookups.entrySet()) { if (entry.getKey() == TaskLookupType.COMPLETE) { CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue(); - theTaskLookups.put( + processedTaskLookups.put( entry.getKey(), completeTaskLookup.hasTaskCreatedTimeFilter() ? completeTaskLookup : completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold()) ); } else { - theTaskLookups.put(entry.getKey(), entry.getValue()); + processedTaskLookups.put(entry.getKey(), entry.getValue()); } } - return Collections.unmodifiableList(handler.getTaskInfos(theTaskLookups, datasource)); + return processedTaskLookups; } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index cf858ebcc300..e9b0af24057f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -22,6 +22,7 @@ import com.google.common.base.Optional; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; @@ -150,6 +151,21 @@ public interface TaskStorage */ List getActiveTasksByDatasource(String datasource); + /** + * Returns the status of tasks in metadata storage as TaskStatusPlus + * No particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. + * + * The returned list can contain active tasks and complete tasks depending on the {@code taskLookups} parameter. + * See {@link TaskLookup} for more details of active and complete tasks. + * + * @param taskLookups lookup types and filters + * @param datasource datasource filter + */ + List getTaskStatusPlusList( + Map taskLookups, + @Nullable String datasource + ); + /** * Returns a list of tasks stored in the storage facility as {@link TaskInfo}. No * particular order is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index f4338c909674..3fa570ccb32d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -23,16 +23,15 @@ import com.google.inject.Inject; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.actions.SegmentInsertAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; -import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.apache.druid.timeline.DataSegment; -import org.joda.time.Duration; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -85,21 +84,12 @@ public List> getActiveTaskInfo(@Nullable String dataS ); } - public List> getCompletedTaskInfoByCreatedTimeDuration( - @Nullable Integer maxTaskStatuses, - @Nullable Duration duration, - @Nullable String dataSource - ) - { - return storage.getTaskInfos(CompleteTaskLookup.of(maxTaskStatuses, duration), dataSource); - } - - public List> getTaskInfos( + public List getTaskStatusPlusList( Map taskLookups, @Nullable String dataSource ) { - return storage.getTaskInfos(taskLookups, dataSource); + return storage.getTaskStatusPlusList(taskLookups, dataSource); } public Optional getTask(final String taskid) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 638fbf2df4e9..a111ebe787ad 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -690,7 +690,7 @@ public Response getTasks( taskMaster.getTaskRunner(), taskRunner -> { final List authorizedList = securedTaskStatusPlus( - getTasks( + getTaskStatusPlusList( taskRunner, TaskStateLookup.fromString(state), dataSource, @@ -706,7 +706,7 @@ public Response getTasks( ); } - private List getTasks( + private List getTaskStatusPlusList( TaskRunner taskRunner, TaskStateLookup state, @Nullable String dataSource, @@ -729,7 +729,7 @@ private List getTasks( // This way, we can use the snapshot from taskStorage as the source of truth for the set of tasks to process // and use the snapshot from taskRunner as a reference for potential task state updates happened // after the first snapshotting. - Stream> taskInfoStreamFromTaskStorage = getTaskInfoStreamFromTaskStorage( + Stream taskStatusPlusStream = getTaskStatusPlusList( state, dataSource, createdTimeDuration, @@ -745,87 +745,57 @@ private List getTasks( if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING) { // We are interested in only those tasks which are in taskRunner. - taskInfoStreamFromTaskStorage = taskInfoStreamFromTaskStorage - .filter(info -> runnerWorkItems.containsKey(info.getId())); + taskStatusPlusStream = taskStatusPlusStream + .filter(statusPlus -> runnerWorkItems.containsKey(statusPlus.getId())); } - final List> taskInfoFromTaskStorage = taskInfoStreamFromTaskStorage - .collect(Collectors.toList()); + final List taskStatusPlusList = taskStatusPlusStream.collect(Collectors.toList()); // Separate complete and active tasks from taskStorage. // Note that taskStorage can return only either complete tasks or active tasks per TaskLookupType. - final List> completeTaskInfoFromTaskStorage = new ArrayList<>(); - final List> activeTaskInfoFromTaskStorage = new ArrayList<>(); - for (TaskInfo info : taskInfoFromTaskStorage) { - if (info.getStatus().isComplete()) { - completeTaskInfoFromTaskStorage.add(info); + final List completeTaskStatusPlusList = new ArrayList<>(); + final List activeTaskStatusPlusList = new ArrayList<>(); + for (TaskStatusPlus statusPlus : taskStatusPlusList) { + if (statusPlus.getStatusCode().isComplete()) { + completeTaskStatusPlusList.add(statusPlus); } else { - activeTaskInfoFromTaskStorage.add(info); + activeTaskStatusPlusList.add(statusPlus); } } - final List statuses = new ArrayList<>(); - completeTaskInfoFromTaskStorage.forEach(taskInfo -> statuses.add( - new TaskStatusPlus( - taskInfo.getId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), - taskInfo.getCreatedTime(), - DateTimes.EPOCH, - taskInfo.getStatus().getStatusCode(), - RunnerTaskState.NONE, - taskInfo.getStatus().getDuration(), - taskInfo.getStatus().getLocation(), - taskInfo.getDataSource(), - taskInfo.getStatus().getErrorMsg() - ) - )); + final List taskStatuses = new ArrayList<>(completeTaskStatusPlusList); - activeTaskInfoFromTaskStorage.forEach(taskInfo -> { - final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(taskInfo.getId()); + activeTaskStatusPlusList.forEach(statusPlus -> { + final TaskRunnerWorkItem runnerWorkItem = runnerWorkItems.get(statusPlus.getId()); if (runnerWorkItem == null) { // a task is assumed to be a waiting task if it exists in taskStorage but not in taskRunner. if (state == TaskStateLookup.WAITING || state == TaskStateLookup.ALL) { - statuses.add( - new TaskStatusPlus( - taskInfo.getId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), - taskInfo.getCreatedTime(), - DateTimes.EPOCH, - taskInfo.getStatus().getStatusCode(), - RunnerTaskState.WAITING, - taskInfo.getStatus().getDuration(), - taskInfo.getStatus().getLocation(), - taskInfo.getDataSource(), - taskInfo.getStatus().getErrorMsg() - ) - ); + taskStatuses.add(statusPlus); } } else { if (state == TaskStateLookup.PENDING || state == TaskStateLookup.RUNNING || state == TaskStateLookup.ALL) { - statuses.add( + taskStatuses.add( new TaskStatusPlus( - taskInfo.getId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getGroupId(), - taskInfo.getTask() == null ? null : taskInfo.getTask().getType(), + statusPlus.getId(), + statusPlus.getGroupId(), + statusPlus.getType(), runnerWorkItem.getCreatedTime(), runnerWorkItem.getQueueInsertionTime(), - taskInfo.getStatus().getStatusCode(), - taskRunner.getRunnerTaskState(taskInfo.getId()), // this is racy for remoteTaskRunner - taskInfo.getStatus().getDuration(), + statusPlus.getStatusCode(), + taskRunner.getRunnerTaskState(statusPlus.getId()), // this is racy for remoteTaskRunner + statusPlus.getDuration(), runnerWorkItem.getLocation(), // location in taskInfo is only updated after the task is done. - taskInfo.getDataSource(), - taskInfo.getStatus().getErrorMsg() + statusPlus.getDataSource(), + statusPlus.getErrorMsg() ) ); } } }); - return statuses; + return taskStatuses; } - private Stream> getTaskInfoStreamFromTaskStorage( + private Stream getTaskStatusPlusList( TaskStateLookup state, @Nullable String dataSource, Duration createdTimeDuration, @@ -861,16 +831,16 @@ private Stream> getTaskInfoStreamFromTaskStorage( throw new IAE("Unknown state: [%s]", state); } - final Stream> taskInfoStreamFromTaskStorage = taskStorageQueryAdapter.getTaskInfos( + final Stream taskStatusPlusStream = taskStorageQueryAdapter.getTaskStatusPlusList( taskLookups, dataSource ).stream(); if (type != null) { - return taskInfoStreamFromTaskStorage.filter( - info -> type.equals(info.getTask() == null ? null : info.getTask().getType()) + return taskStatusPlusStream.filter( + statusPlus -> type.equals(statusPlus == null ? null : statusPlus.getType()) ); } else { - return taskInfoStreamFromTaskStorage; + return taskStatusPlusStream; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 15a8619eb28b..f01668fd37a2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -32,10 +32,6 @@ import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; -import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.TaskActionClient; -import org.apache.druid.indexing.common.config.TaskConfig; -import org.apache.druid.indexing.common.task.AbstractTask; import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; @@ -231,40 +227,16 @@ public void testSecuredGetWaitingTask() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_3"), - "deny", - getTaskWithIdAndDatasource("id_3", "deny") - ), - new TaskInfo<>( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_4"), - "deny", - getTaskWithIdAndDatasource("id_4", "deny") - ) + createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_3", TaskState.RUNNING, "deny"), + createTaskStatusPlus("id_4", TaskState.RUNNING, "deny") ) ); @@ -297,31 +269,13 @@ public void testSecuredGetCompleteTasks() List tasksIds = ImmutableList.of("id_1", "id_2", "id_3"); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "deny", - getTaskWithIdAndDatasource("id_1", "deny") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ) + createTaskStatusPlus("id_1", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_2", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); EasyMock.replay( @@ -352,26 +306,14 @@ public void testSecuredGetRunningTasks() ) ); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "deny", - getTaskWithIdAndDatasource("id_1", "deny") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ) + createTaskStatusPlus("id_1", TaskState.RUNNING, "deny"), + createTaskStatusPlus("id_2", TaskState.RUNNING, "allow") ) ); EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andStubReturn(RunnerTaskState.RUNNING); @@ -398,7 +340,7 @@ public void testGetTasks() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance(), @@ -409,48 +351,12 @@ public void testGetTasks() ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_5", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_5"), - "deny", - getTaskWithIdAndDatasource("id_5", "deny") - ), - new TaskInfo<>( - "id_6", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_6"), - "allow", - getTaskWithIdAndDatasource("id_6", "allow") - ), - new TaskInfo<>( - "id_7", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_7"), - "allow", - getTaskWithIdAndDatasource("id_7", "allow") - ), - new TaskInfo<>( - "id_5", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_5"), - "deny", - getTaskWithIdAndDatasource("id_5", "deny") - ), - new TaskInfo<>( - "id_6", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_6"), - "allow", - getTaskWithIdAndDatasource("id_6", "allow") - ), - new TaskInfo<>( - "id_7", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_7"), - "allow", - getTaskWithIdAndDatasource("id_7", "allow") - ) + createTaskStatusPlus("id_5", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_6", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_7", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_5", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_6", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_7", TaskState.SUCCESS, "allow") ) ); @@ -481,7 +387,7 @@ public void testGetTasksFilterDataSource() expectAuthorizationTokenCheck(); //completed tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -492,55 +398,13 @@ public void testGetTasksFilterDataSource() ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_5", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_5"), - "allow", - getTaskWithIdAndDatasource("id_5", "allow") - ), - new TaskInfo<>( - "id_6", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_6"), - "allow", - getTaskWithIdAndDatasource("id_6", "allow") - ), - new TaskInfo<>( - "id_7", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_7"), - "allow", - getTaskWithIdAndDatasource("id_7", "allow") - ), - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ), - new TaskInfo<>( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_4"), - "allow", - getTaskWithIdAndDatasource("id_4", "allow") - ) + createTaskStatusPlus("id_5", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_6", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_7", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_2", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_4", TaskState.SUCCESS, "allow") ) ); EasyMock.>expect(taskRunner.getKnownTasks()).andReturn( @@ -572,7 +436,7 @@ public void testGetTasksFilterWaitingState() expectAuthorizationTokenCheck(); //active tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance() @@ -581,34 +445,10 @@ public void testGetTasksFilterWaitingState() ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_3"), - "deny", - getTaskWithIdAndDatasource("id_3", "deny") - ), - new TaskInfo<>( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_4"), - "deny", - getTaskWithIdAndDatasource("id_4", "deny") - ) + createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_3", TaskState.RUNNING, "deny"), + createTaskStatusPlus("id_4", TaskState.RUNNING, "deny") ) ); @@ -645,7 +485,7 @@ public void testGetTasksFilterRunningState() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance() @@ -654,34 +494,10 @@ public void testGetTasksFilterRunningState() ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ), - new TaskInfo<>( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_4"), - "deny", - getTaskWithIdAndDatasource("id_4", "deny") - ) + createTaskStatusPlus("id_1", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_3", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_4", TaskState.RUNNING, "deny") ) ); @@ -726,40 +542,16 @@ public void testGetTasksFilterPendingState() ) ); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()), null ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_1"), - "deny", - getTaskWithIdAndDatasource("id_1", "deny") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ), - new TaskInfo<>( - "id_4", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.running("id_4"), - "deny", - getTaskWithIdAndDatasource("id_4", "deny") - ) + createTaskStatusPlus("id_1", TaskState.RUNNING, "deny"), + createTaskStatusPlus("id_2", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_3", TaskState.RUNNING, "allow"), + createTaskStatusPlus("id_4", TaskState.RUNNING, "deny") ) ); @@ -791,33 +583,15 @@ public void testGetTasksFilterCompleteState() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of(TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, (Duration) null)), null ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "allow", - getTaskWithIdAndDatasource("id_1", "allow") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "deny", - getTaskWithIdAndDatasource("id_2", "deny") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ) + createTaskStatusPlus("id_1", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_2", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); EasyMock.replay( @@ -842,33 +616,15 @@ public void testGetTasksFilterCompleteStateWithInterval() expectAuthorizationTokenCheck(); Duration duration = new Period("PT86400S").toStandardDuration(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( EasyMock.anyObject(), EasyMock.anyObject() ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "deny", - getTaskWithIdAndDatasource("id_1", "deny") - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "allow", - getTaskWithIdAndDatasource("id_2", "allow") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ) + createTaskStatusPlus("id_1", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_2", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); @@ -886,7 +642,7 @@ public void testGetTasksFilterCompleteStateWithInterval() .getEntity(); Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals("id_2", responseObjects.get(0).getId()); - Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource())); + Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource()); } @Test @@ -898,7 +654,7 @@ public void testGetTasksRequiresDatasourceRead() // Setup mocks to return completed, active, known, pending and running tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -909,10 +665,10 @@ public void testGetTasksRequiresDatasourceRead() ) ).andStubReturn( ImmutableList.of( - createTaskInfo("id_5", Datasources.WIKIPEDIA), - createTaskInfo("id_6", Datasources.BUZZFEED), - createTaskInfo("id_1", Datasources.WIKIPEDIA, TaskState.RUNNING, "test"), - createTaskInfo("id_4", Datasources.BUZZFEED, TaskState.RUNNING, "test") + createTaskStatusPlus("id_5", TaskState.SUCCESS, Datasources.WIKIPEDIA), + createTaskStatusPlus("id_6", TaskState.SUCCESS, Datasources.BUZZFEED), + createTaskStatusPlus("id_1", TaskState.RUNNING, Datasources.WIKIPEDIA), + createTaskStatusPlus("id_4", TaskState.RUNNING, Datasources.BUZZFEED) ) ); @@ -955,7 +711,7 @@ public void testGetTasksFilterByTaskTypeRequiresDatasourceRead() // Setup mocks to return completed, active, known, pending and running tasks EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null), @@ -966,10 +722,10 @@ public void testGetTasksFilterByTaskTypeRequiresDatasourceRead() ) ).andStubReturn( ImmutableList.of( - createTaskInfo("id_5", Datasources.WIKIPEDIA), - createTaskInfo("id_6", Datasources.BUZZFEED), - createTaskInfo("id_1", Datasources.WIKIPEDIA, TaskState.RUNNING, "to-return"), - createTaskInfo("id_4", Datasources.WIKIPEDIA, TaskState.RUNNING, "test") + createTaskStatusPlus("id_5", TaskState.SUCCESS, Datasources.WIKIPEDIA), + createTaskStatusPlus("id_6", TaskState.SUCCESS, Datasources.BUZZFEED), + createTaskStatusPlus("id_1", TaskState.RUNNING, Datasources.WIKIPEDIA, "to-return"), + createTaskStatusPlus("id_4", TaskState.RUNNING, Datasources.BUZZFEED) ) ); @@ -1025,11 +781,11 @@ public void testGetTasksFilterByDatasourceRequiresReadAccess() } @Test - public void testGetNullCompleteTask() + public void testGetCompleteTasksOfAllDatasources() { expectAuthorizationTokenCheck(); EasyMock.expect( - taskStorageQueryAdapter.getTaskInfos( + taskStorageQueryAdapter.getTaskStatusPlusList( ImmutableMap.of( TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, null) @@ -1038,27 +794,9 @@ public void testGetNullCompleteTask() ) ).andStubReturn( ImmutableList.of( - new TaskInfo<>( - "id_1", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_1"), - "allow", - null - ), - new TaskInfo<>( - "id_2", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_2"), - "deny", - getTaskWithIdAndDatasource("id_2", "deny") - ), - new TaskInfo<>( - "id_3", - DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.success("id_3"), - "allow", - getTaskWithIdAndDatasource("id_3", "allow") - ) + createTaskStatusPlus("id_1", TaskState.SUCCESS, "allow"), + createTaskStatusPlus("id_2", TaskState.SUCCESS, "deny"), + createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); EasyMock.replay( @@ -1074,9 +812,7 @@ public void testGetNullCompleteTask() .getEntity(); Assert.assertEquals(2, responseObjects.size()); Assert.assertEquals("id_1", responseObjects.get(0).getId()); - TaskStatusPlus tsp = responseObjects.get(0); - Assert.assertEquals(null, tsp.getType()); - Assert.assertTrue("DataSource Check", "allow".equals(responseObjects.get(0).getDataSource())); + Assert.assertEquals("DataSource Check", "allow", responseObjects.get(0).getDataSource()); } @Test @@ -1379,19 +1115,19 @@ public void testShutdownAllTasks() Optional.of(mockQueue) ).anyTimes(); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo("datasource")).andStubReturn(ImmutableList.of( - new TaskInfo( + new TaskInfo<>( "id_1", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_1"), "datasource", - getTaskWithIdAndDatasource("id_1", "datasource") + NoopTask.create("id_1", 1) ), - new TaskInfo( + new TaskInfo<>( "id_2", DateTime.now(ISOChronology.getInstanceUTC()), TaskStatus.success("id_2"), "datasource", - getTaskWithIdAndDatasource("id_2", "datasource") + NoopTask.create("id_2", 1) ) )); mockQueue.shutdown("id_1", "Shutdown request from user"); @@ -1679,7 +1415,7 @@ public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStra Optional.of(workerTaskRunner) ).anyTimes(); EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes(); - AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class); + AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class); EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers); DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler); @@ -1724,61 +1460,25 @@ private void expectAuthorizationTokenCheck(String username) EasyMock.expectLastCall().anyTimes(); } - private Task getTaskWithIdAndDatasource(String id, String datasource) + private TaskStatusPlus createTaskStatusPlus(String taskId, TaskState taskState, String datasource) { - return getTaskWithIdAndDatasource(id, datasource, "test"); + return createTaskStatusPlus(taskId, taskState, datasource, "test"); } - private Task getTaskWithIdAndDatasource(String id, String datasource, String taskType) + private TaskStatusPlus createTaskStatusPlus(String taskId, TaskState taskState, String datasource, String taskType) { - return new AbstractTask(id, datasource, null) - { - @Override - public String getType() - { - return taskType; - } - - @Override - public boolean isReady(TaskActionClient taskActionClient) - { - return false; - } - - @Override - public void stopGracefully(TaskConfig taskConfig) - { - } - - @Override - public TaskStatus run(TaskToolbox toolbox) - { - return null; - } - }; - } - - private TaskInfo createTaskInfo( - String taskId, - String datasource - ) - { - return createTaskInfo(taskId, datasource, TaskState.SUCCESS, "test"); - } - - private TaskInfo createTaskInfo( - String taskId, - String datasource, - TaskState state, - String taskType - ) - { - return new TaskInfo<>( + return new TaskStatusPlus( taskId, + null, + taskType, DateTime.now(ISOChronology.getInstanceUTC()), - TaskStatus.fromCode(taskId, state), + DateTimes.EPOCH, + taskState, + taskState.isComplete() ? RunnerTaskState.NONE : RunnerTaskState.WAITING, + 100L, + TaskLocation.unknown(), datasource, - getTaskWithIdAndDatasource(taskId, datasource, taskType) + null ); } diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json index 3f0e48fca71e..fe09abad5c8d 100644 --- a/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json +++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_tasks.json @@ -1,8 +1,8 @@ [ { "task_id": "index_auth_test_2030-04-30T01:13:31.893Z", - "type": null, - "group_id": null, + "group_id": "", + "type": "", "datasource": "auth_test", "created_time": "2030-04-30T01:13:31.893Z", "queue_insertion_time": "1970-01-01T00:00:00.000Z", diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 9fe81dcdffbf..781a4b99c8f9 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -44,6 +44,8 @@ import javax.annotation.Nullable; import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.SQLRecoverableException; import java.sql.SQLTransientException; @@ -129,6 +131,8 @@ public String getValidationQuery() public abstract boolean tableExists(Handle handle, String tableName); + public abstract String limitClause(int limit); + public T retryWithHandle( final HandleCallback callback, final Predicate myShouldRetry @@ -327,6 +331,29 @@ tableName, getPayloadType() ) ); } + + public boolean tableContainsColumn(Handle handle, String table, String column) + { + try { + DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData(); + ResultSet columns = databaseMetaData.getColumns( + null, + null, + table, + column + ); + return columns.next(); + } + catch (SQLException e) { + return false; + } + } + + public void prepareTaskEntryTable(final String tableName) + { + createEntryTable(tableName); + alterEntryTable(tableName); + } public void createEntryTable(final String tableName) { @@ -350,6 +377,35 @@ tableName, getPayloadType(), getCollation() ); } + private void alterEntryTable(final String tableName) + { + try { + retryWithHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) + { + final Batch batch = handle.createBatch(); + if (!tableContainsColumn(handle, tableName, "type")) { + log.info("Adding column: type to table[%s]", tableName); + batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName)); + } + if (!tableContainsColumn(handle, tableName, "group_id")) { + log.info("Adding column: group_id to table[%s]", tableName); + batch.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName)); + } + batch.execute(); + return null; + } + } + ); + } + catch (Exception e) { + log.warn(e, "Exception altering table"); + } + } + public void createLogTable(final String tableName, final String entryTypeName) { createTable( @@ -578,7 +634,7 @@ public void createTaskTables() if (config.get().isCreateTables()) { final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get(); final String entryType = tablesConfig.getTaskEntryType(); - createEntryTable(tablesConfig.getEntryTable(entryType)); + prepareTaskEntryTable(tablesConfig.getEntryTable(entryType)); createLogTable(tablesConfig.getLogTable(entryType), entryType); createLockTable(tablesConfig.getLockTable(entryType), entryType); } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 0b8837c640dc..436af7ad694a 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -21,10 +21,14 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Maps; +import org.apache.druid.indexer.TaskIdentifier; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -34,6 +38,7 @@ import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; import org.joda.time.DateTime; +import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.Folder3; import org.skife.jdbi.v2.Handle; @@ -53,6 +58,9 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; public abstract class SQLMetadataStorageActionHandler implements MetadataStorageActionHandler @@ -72,6 +80,11 @@ public abstract class SQLMetadataStorageActionHandler taskInfoMapper; + private final TaskStatusMapper taskStatusMapper; + private final TaskStatusMapperFromPayload taskStatusMapperFromPayload; + private final TaskIdentifierMapper taskIdentifierMapper; + + private Future taskMigrationCompleteFuture; @SuppressWarnings("PMD.UnnecessaryFullyQualifiedName") public SQLMetadataStorageActionHandler( @@ -98,6 +111,9 @@ public SQLMetadataStorageActionHandler( this.logTable = logTable; this.lockTable = lockTable; this.taskInfoMapper = new TaskInfoMapper<>(jsonMapper, entryType, statusType); + this.taskStatusMapper = new TaskStatusMapper(jsonMapper); + this.taskStatusMapperFromPayload = new TaskStatusMapperFromPayload(jsonMapper); + this.taskIdentifierMapper = new TaskIdentifierMapper(jsonMapper); } protected SQLMetadataConnector getConnector() @@ -142,15 +158,17 @@ public void insert( final String dataSource, final EntryType entry, final boolean active, - final StatusType status + final StatusType status, + final String type, + final String groupId ) throws EntryExistsException { try { getConnector().retryWithHandle( (HandleCallback) handle -> { final String sql = StringUtils.format( - "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) " - + "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", + "INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) " + + "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)", getEntryTable() ); handle.createStatement(sql) @@ -158,6 +176,8 @@ public void insert( .bind("created_date", timestamp.toString()) .bind("datasource", dataSource) .bind("payload", jsonMapper.writeValueAsBytes(entry)) + .bind("type", type) + .bind("group_id", groupId) .bind("active", active) .bind("status_payload", jsonMapper.writeValueAsBytes(status)) .execute(); @@ -284,7 +304,7 @@ public List> getTaskInfos( final Query> query; switch (entry.getKey()) { case ACTIVE: - query = createActiveTaskInfoQuery( + query = createActiveTaskStreamingQuery( handle, dataSource ); @@ -292,7 +312,7 @@ public List> getTaskInfos( break; case COMPLETE: CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue(); - query = createCompletedTaskInfoQuery( + query = createCompletedTaskStreamingQuery( handle, completeTaskLookup.getTasksCreatedPriorTo(), completeTaskLookup.getMaxTaskStatuses(), @@ -311,7 +331,123 @@ public List> getTaskInfos( ); } - protected Query> createCompletedTaskInfoQuery( + @Override + public List> getTaskStatusList( + Map taskLookups, + @Nullable String dataSource + ) + { + boolean fetchPayload = true; + if (taskMigrationCompleteFuture != null && taskMigrationCompleteFuture.isDone()) { + try { + fetchPayload = !taskMigrationCompleteFuture.get(); + } + catch (Exception e) { + log.info(e, "Exception getting task migration future"); + } + } + return getTaskStatusList(taskLookups, dataSource, fetchPayload); + } + + @VisibleForTesting + List> getTaskStatusList( + Map taskLookups, + @Nullable String dataSource, + boolean fetchPayload + ) + { + ResultSetMapper> resultSetMapper = + fetchPayload ? taskStatusMapperFromPayload : taskStatusMapper; + return getConnector().retryTransaction( + (handle, status) -> { + final List> taskMetadataInfos = new ArrayList<>(); + for (Entry entry : taskLookups.entrySet()) { + final Query> query; + switch (entry.getKey()) { + case ACTIVE: + query = fetchPayload + ? createActiveTaskStreamingQuery(handle, dataSource) + : createActiveTaskSummaryStreamingQuery(handle, dataSource); + taskMetadataInfos.addAll(query.map(resultSetMapper).list()); + break; + case COMPLETE: + CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) entry.getValue(); + DateTime priorTo = completeTaskLookup.getTasksCreatedPriorTo(); + Integer limit = completeTaskLookup.getMaxTaskStatuses(); + query = fetchPayload + ? createCompletedTaskStreamingQuery(handle, priorTo, limit, dataSource) + : createCompletedTaskSummaryStreamingQuery(handle, priorTo, limit, dataSource); + taskMetadataInfos.addAll(query.map(resultSetMapper).list()); + break; + default: + throw new IAE("Unknown TaskLookupType: [%s]", entry.getKey()); + } + } + return taskMetadataInfos; + }, + 3, + SQLMetadataConnector.DEFAULT_MAX_TRIES + ); + } + + /** + * Fetches the columns needed to build TaskStatusPlus for completed tasks + * Please note that this requires completion of data migration to avoid empty values for task type and groupId + * Recommended for GET /tasks API + * Uses streaming SQL query to avoid fetching too many rows at once into memory + * @param handle db handle + * @param dataSource datasource to which the tasks belong. null if we don't want to filter + * @return Query object for TaskStatusPlus for completed tasks of interest + */ + private Query> createCompletedTaskSummaryStreamingQuery( + Handle handle, + DateTime timestamp, + @Nullable Integer maxNumStatuses, + @Nullable String dataSource + ) + { + String sql = StringUtils.format( + "SELECT " + + " id, " + + " created_date, " + + " datasource, " + + " group_id, " + + " type, " + + " status_payload " + + "FROM " + + " %s " + + "WHERE " + + getWhereClauseForInactiveStatusesSinceQuery(dataSource) + + "ORDER BY created_date DESC", + getEntryTable() + ); + + if (maxNumStatuses != null) { + sql = decorateSqlWithLimit(sql); + } + Query> query = handle.createQuery(sql) + .bind("start", timestamp.toString()) + .setFetchSize(connector.getStreamingFetchSize()); + + if (maxNumStatuses != null) { + query = query.bind("n", maxNumStatuses); + } + if (dataSource != null) { + query = query.bind("ds", dataSource); + } + return query; + } + + /** + * Fetches the columns needed to build a Task object with payload for completed tasks + * This requires the task payload which can be large. Please use only when necessary. + * For example for ingestion tasks view before migration of the new columns + * Uses streaming SQL query to avoid fetching too many rows at once into memory + * @param handle db handle + * @param dataSource datasource to which the tasks belong. null if we don't want to filter + * @return Query object for completed TaskInfos of interest + */ + private Query> createCompletedTaskStreamingQuery( Handle handle, DateTime timestamp, @Nullable Integer maxNumStatuses, @@ -336,7 +472,9 @@ protected Query> createCompletedTaskInfoQuery( if (maxNumStatuses != null) { sql = decorateSqlWithLimit(sql); } - Query> query = handle.createQuery(sql).bind("start", timestamp.toString()); + Query> query = handle.createQuery(sql) + .bind("start", timestamp.toString()) + .setFetchSize(connector.getStreamingFetchSize()); if (maxNumStatuses != null) { query = query.bind("n", maxNumStatuses); @@ -358,7 +496,51 @@ private String getWhereClauseForInactiveStatusesSinceQuery(@Nullable String data return sql; } - private Query> createActiveTaskInfoQuery(Handle handle, @Nullable String dataSource) + /** + * Fetches the columns needed to build TaskStatusPlus for active tasks + * Please note that this requires completion of data migration to avoid empty values for task type and groupId + * Recommended for GET /tasks API + * Uses streaming SQL query to avoid fetching too many rows at once into memory + * @param handle db handle + * @param dataSource datasource to which the tasks belong. null if we don't want to filter + * @return Query object for TaskStatusPlus for active tasks of interest + */ + private Query> createActiveTaskSummaryStreamingQuery(Handle handle, @Nullable String dataSource) + { + String sql = StringUtils.format( + "SELECT " + + " id, " + + " status_payload, " + + " group_id, " + + " type, " + + " datasource, " + + " created_date " + + "FROM " + + " %s " + + "WHERE " + + getWhereClauseForActiveStatusesQuery(dataSource) + + "ORDER BY created_date", + entryTable + ); + + Query> query = handle.createQuery(sql) + .setFetchSize(connector.getStreamingFetchSize()); + if (dataSource != null) { + query = query.bind("ds", dataSource); + } + return query; + } + + /** + * Fetches the columns needed to build Task objects with payload for active tasks + * This requires the task payload which can be large. Please use only when necessary. + * For example for ingestion tasks view before migration of the new columns + * Uses streaming SQL query to avoid fetching too many rows at once into memory + * @param handle db handle + * @param dataSource datasource to which the tasks belong. null if we don't want to filter + * @return Query object for active TaskInfos of interest + */ + private Query> createActiveTaskStreamingQuery(Handle handle, @Nullable String dataSource) { String sql = StringUtils.format( "SELECT " @@ -375,7 +557,8 @@ private Query> createActiveTaskInfoQuery(Handle handle, @Nul entryTable ); - Query> query = handle.createQuery(sql); + Query> query = handle.createQuery(sql) + .setFetchSize(connector.getStreamingFetchSize()); if (dataSource != null) { query = query.bind("ds", dataSource); } @@ -391,6 +574,109 @@ private String getWhereClauseForActiveStatusesQuery(String dataSource) return sql; } + private class TaskStatusMapperFromPayload implements ResultSetMapper> + { + private final ObjectMapper objectMapper; + + TaskStatusMapperFromPayload(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + @Override + public TaskInfo map(int index, ResultSet resultSet, StatementContext context) + throws SQLException + { + return toTaskIdentifierInfo(objectMapper, resultSet, true); + } + } + + private class TaskStatusMapper implements ResultSetMapper> + { + private final ObjectMapper objectMapper; + + TaskStatusMapper(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + @Override + public TaskInfo map(int index, ResultSet resultSet, StatementContext context) + throws SQLException + { + return toTaskIdentifierInfo(objectMapper, resultSet, false); + } + } + + private TaskInfo toTaskIdentifierInfo(ObjectMapper objectMapper, + ResultSet resultSet, + boolean usePayload + ) throws SQLException + { + String type; + String groupId; + if (usePayload) { + try { + ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class); + type = payload.get("type").asText(); + groupId = payload.get("groupId").asText(); + } + catch (IOException e) { + log.error(e, "Encountered exception while deserializing task payload"); + throw new SQLException(e); + } + } else { + type = resultSet.getString("type"); + groupId = resultSet.getString("group_id"); + } + + String id = resultSet.getString("id"); + DateTime createdTime = DateTimes.of(resultSet.getString("created_date")); + StatusType status; + try { + status = objectMapper.readValue(resultSet.getBytes("status_payload"), statusType); + } + catch (IOException e) { + log.error(e, "Encountered exception while deserializing task status_payload"); + throw new SQLException(e); + } + String datasource = resultSet.getString("datasource"); + TaskIdentifier taskIdentifier = new TaskIdentifier(id, groupId, type); + + return new TaskInfo<>(id, createdTime, status, datasource, taskIdentifier); + } + + static class TaskIdentifierMapper implements ResultSetMapper + { + private final ObjectMapper objectMapper; + + TaskIdentifierMapper(ObjectMapper objectMapper) + { + this.objectMapper = objectMapper; + } + + @Override + public TaskIdentifier map(int index, ResultSet resultSet, StatementContext context) + throws SQLException + { + try { + ObjectNode payload = objectMapper.readValue(resultSet.getBytes("payload"), ObjectNode.class); + // If field is absent (older task version), use blank string to avoid a loop of migration of such tasks. + JsonNode type = payload.get("type"); + JsonNode groupId = payload.get("groupId"); + return new TaskIdentifier( + resultSet.getString("id"), + groupId == null ? "" : groupId.asText(), + type == null ? "" : type.asText() + ); + } + catch (IOException e) { + log.error(e, "Encountered exception while deserializing task payload"); + throw new SQLException(e); + } + } + } + static class TaskInfoMapper implements ResultSetMapper> { private final ObjectMapper objectMapper; @@ -679,4 +965,105 @@ public Long getLockId(String entryId, LockType lock) .findAny() .orElse(null); } + + private List fetchTaskMetadatas(String tableName, String id, int limit) + { + List taskIdentifiers = new ArrayList<>(); + connector.retryWithHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) + { + String sql = StringUtils.format( + "SELECT * FROM %1$s WHERE id > '%2$s' AND type IS null ORDER BY id %3$s", + tableName, + id, + connector.limitClause(limit) + ); + Query> query = handle.createQuery(sql); + taskIdentifiers.addAll(query.map(taskIdentifierMapper).list()); + return null; + } + } + ); + return taskIdentifiers; + } + + private void updateTaskMetadatas(String tasksTable, List taskIdentifiers) + { + connector.retryWithHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) + { + Batch batch = handle.createBatch(); + String sql = "UPDATE %1$s SET type = '%2$s', group_id = '%3$s' WHERE id = '%4$s'"; + for (TaskIdentifier metadata : taskIdentifiers) { + batch.add(StringUtils.format(sql, tasksTable, metadata.getType(), metadata.getGroupId(), metadata.getId()) + ); + } + batch.execute(); + return null; + } + } + ); + } + + @Override + public void populateTaskTypeAndGroupIdAsync() + { + ExecutorService executorService = Executors.newSingleThreadExecutor(); + taskMigrationCompleteFuture = executorService.submit( + () -> populateTaskTypeAndGroupId() + ); + } + + /** + * Utility to migrate existing tasks to the new schema by populating type and groupId synchronously + * + * @return true if successful + */ + @VisibleForTesting + boolean populateTaskTypeAndGroupId() + { + log.info("Populate fields task and group_id of task entry table [%s] from payload", entryTable); + String id = ""; + int limit = 100; + int count = 0; + while (true) { + List taskIdentifiers; + try { + taskIdentifiers = fetchTaskMetadatas(entryTable, id, limit); + } + catch (Exception e) { + log.warn(e, "Task migration failed while reading entries from task table"); + return false; + } + if (taskIdentifiers.isEmpty()) { + break; + } + try { + updateTaskMetadatas(entryTable, taskIdentifiers); + count += taskIdentifiers.size(); + log.info("Successfully updated type and groupId for [%d] tasks", count); + } + catch (Exception e) { + log.warn(e, "Task migration failed while updating entries in task table"); + return false; + } + id = taskIdentifiers.get(taskIdentifiers.size() - 1).getId(); + + try { + Thread.sleep(1000); + } + catch (InterruptedException e) { + log.info("Interrupted, exiting!"); + Thread.currentThread().interrupt(); + } + } + log.info("Task migration for table [%s] successful", entryTable); + return true; + } } diff --git a/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java b/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java index 989d0decf3db..19d0c6b04f16 100644 --- a/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/storage/derby/DerbyConnector.java @@ -35,6 +35,11 @@ import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.tweak.HandleCallback; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Locale; + @ManageLifecycle public class DerbyConnector extends SQLMetadataConnector { @@ -83,6 +88,24 @@ public boolean tableExists(Handle handle, String tableName) .isEmpty(); } + @Override + public boolean tableContainsColumn(Handle handle, String table, String column) + { + try { + DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData(); + ResultSet columns = databaseMetaData.getColumns( + null, + null, + table.toUpperCase(Locale.ENGLISH), + column.toUpperCase(Locale.ENGLISH) + ); + return columns.next(); + } + catch (SQLException e) { + return false; + } + } + @Override public String getSerialType() { @@ -114,6 +137,12 @@ public String getValidationQuery() return "VALUES 1"; } + @Override + public String limitClause(int limit) + { + return String.format(Locale.ENGLISH, "FETCH NEXT %d ROWS ONLY", limit); + } + @Override public void exportTable( String tableName, diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java index 9c55a252b92d..1c192da475d5 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java @@ -32,7 +32,11 @@ import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.tweak.HandleCallback; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; @@ -86,6 +90,14 @@ public Void withHandle(Handle handle) ); } + String taskTable = tablesConfig.getTasksTable(); + for (String column : Arrays.asList("type", "group_id")) { + Assert.assertTrue( + StringUtils.format("Tasks table column %s was not created!", column), + connector.tableContainsColumn(handle, taskTable, column) + ); + } + return null; } } @@ -170,6 +182,12 @@ public int getStreamingFetchSize() return 0; } + @Override + public String limitClause(int limit) + { + return ""; + } + @Override public String getQuoteString() { @@ -242,4 +260,32 @@ public void testBasicDataSourceCreation() throws Exception Assert.assertEquals(dataSource.getMaxConnLifetimeMillis(), 1200000); Assert.assertEquals((long) dataSource.getDefaultQueryTimeout(), 30000); } + + private boolean verifyTaskTypeAndGroupId(String table, String id, String type, String groupId) + { + try { + return connector.retryWithHandle( + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws SQLException + { + Statement statement = handle.getConnection().createStatement(); + ResultSet resultSet = statement.executeQuery( + StringUtils.format("SELECT * FROM %1$s WHERE id = '%2$s'", table, id) + ); + resultSet.next(); + boolean flag = type.equals(resultSet.getString("type")) + && groupId.equals(resultSet.getString("group_id")); + statement.close(); + return flag; + } + } + ); + } + catch (Exception e) { + e.printStackTrace(); + return false; + } + } } diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java index 2e85e5a5d49e..f89ae9b0ebda 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -25,7 +25,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexer.TaskIdentifier; import org.apache.druid.indexer.TaskInfo; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskState; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; @@ -33,15 +36,26 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; +import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.tweak.HandleCallback; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Random; +import java.util.UUID; import java.util.stream.Collectors; public class SQLMetadataStorageActionHandlerTest @@ -53,7 +67,13 @@ public class SQLMetadataStorageActionHandlerTest public final ExpectedException thrown = ExpectedException.none(); private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper(); - private SQLMetadataStorageActionHandler, Map, Map, Map> handler; + + + private static final Random RANDOM = new Random(1); + + private SQLMetadataStorageActionHandler, Map, Map, Map> handler; + + private final String entryTable = "entries"; @Before public void setUp() @@ -61,31 +81,30 @@ public void setUp() TestDerbyConnector connector = derbyConnectorRule.getConnector(); final String entryType = "entry"; - final String entryTable = "entries"; final String logTable = "logs"; final String lockTable = "locks"; - connector.createEntryTable(entryTable); + connector.prepareTaskEntryTable(entryTable); connector.createLockTable(lockTable, entryType); connector.createLogTable(logTable, entryType); handler = new DerbyMetadataStorageActionHandler<>( connector, JSON_MAPPER, - new MetadataStorageActionHandlerTypes, Map, Map, Map>() + new MetadataStorageActionHandlerTypes, Map, Map, Map>() { @Override - public TypeReference> getEntryType() + public TypeReference> getEntryType() { - return new TypeReference>() + return new TypeReference>() { }; } @Override - public TypeReference> getStatusType() + public TypeReference> getStatusType() { - return new TypeReference>() + return new TypeReference>() { }; } @@ -97,9 +116,9 @@ public TypeReference> getLogType() } @Override - public TypeReference> getLockType() + public TypeReference> getLockType() { - return new TypeReference>() + return new TypeReference>() { }; } @@ -114,13 +133,13 @@ public TypeReference> getLockType() @Test public void testEntryAndStatus() throws Exception { - Map entry = ImmutableMap.of("numericId", 1234); - Map status1 = ImmutableMap.of("count", 42); - Map status2 = ImmutableMap.of("count", 42, "temp", 1); + Map entry = ImmutableMap.of("numericId", 1234); + Map status1 = ImmutableMap.of("count", 42); + Map status2 = ImmutableMap.of("count", 42, "temp", 1); final String entryId = "1234"; - handler.insert(entryId, DateTimes.of("2014-01-02T00:00:00.123"), "testDataSource", entry, true, null); + handler.insert(entryId, DateTimes.of("2014-01-02T00:00:00.123"), "testDataSource", entry, true, null, "type", "group"); Assert.assertEquals( Optional.of(entry), @@ -195,13 +214,13 @@ public void testGetRecentStatuses() throws EntryExistsException { for (int i = 1; i < 11; i++) { final String entryId = "abcd_" + i; - final Map entry = ImmutableMap.of("a", i); - final Map status = ImmutableMap.of("count", i * 10); + final Map entry = ImmutableMap.of("a", i); + final Map status = ImmutableMap.of("count", i * 10); - handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status); + handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status, "type", "group"); } - final List, Map>> statuses = handler.getTaskInfos( + final List, Map>> statuses = handler.getTaskInfos( CompleteTaskLookup.withTasksCreatedPriorTo( 7, DateTimes.of("2014-01-01") @@ -210,7 +229,7 @@ public void testGetRecentStatuses() throws EntryExistsException ); Assert.assertEquals(7, statuses.size()); int i = 10; - for (TaskInfo, Map> status : statuses) { + for (TaskInfo, Map> status : statuses) { Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status.getStatus()); } } @@ -220,13 +239,13 @@ public void testGetRecentStatuses2() throws EntryExistsException { for (int i = 1; i < 6; i++) { final String entryId = "abcd_" + i; - final Map entry = ImmutableMap.of("a", i); - final Map status = ImmutableMap.of("count", i * 10); + final Map entry = ImmutableMap.of("a", i); + final Map status = ImmutableMap.of("count", i * 10); - handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status); + handler.insert(entryId, DateTimes.of(StringUtils.format("2014-01-%02d", i)), "test", entry, false, status, "type", "group"); } - final List, Map>> statuses = handler.getTaskInfos( + final List, Map>> statuses = handler.getTaskInfos( CompleteTaskLookup.withTasksCreatedPriorTo( 10, DateTimes.of("2014-01-01") @@ -235,7 +254,7 @@ public void testGetRecentStatuses2() throws EntryExistsException ); Assert.assertEquals(5, statuses.size()); int i = 5; - for (TaskInfo, Map> status : statuses) { + for (TaskInfo, Map> status : statuses) { Assert.assertEquals(ImmutableMap.of("count", i-- * 10), status.getStatus()); } } @@ -244,23 +263,23 @@ public void testGetRecentStatuses2() throws EntryExistsException public void testRepeatInsert() throws Exception { final String entryId = "abcd"; - Map entry = ImmutableMap.of("a", 1); - Map status = ImmutableMap.of("count", 42); + Map entry = ImmutableMap.of("a", 1); + Map status = ImmutableMap.of("count", 42); - handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status); + handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group"); thrown.expect(EntryExistsException.class); - handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status); + handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group"); } @Test public void testLogs() throws Exception { final String entryId = "abcd"; - Map entry = ImmutableMap.of("a", 1); - Map status = ImmutableMap.of("count", 42); + Map entry = ImmutableMap.of("a", 1); + Map status = ImmutableMap.of("count", 42); - handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status); + handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group"); Assert.assertEquals( ImmutableList.of(), @@ -289,32 +308,32 @@ public void testLogs() throws Exception public void testLocks() throws Exception { final String entryId = "ABC123"; - Map entry = ImmutableMap.of("a", 1); - Map status = ImmutableMap.of("count", 42); + Map entry = ImmutableMap.of("a", 1); + Map status = ImmutableMap.of("count", 42); - handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status); + handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group"); Assert.assertEquals( - ImmutableMap.>of(), + ImmutableMap.>of(), handler.getLocks("non_exist_entry") ); Assert.assertEquals( - ImmutableMap.>of(), + ImmutableMap.>of(), handler.getLocks(entryId) ); - final ImmutableMap lock1 = ImmutableMap.of("lock", 1); - final ImmutableMap lock2 = ImmutableMap.of("lock", 2); + final ImmutableMap lock1 = ImmutableMap.of("lock", 1); + final ImmutableMap lock2 = ImmutableMap.of("lock", 2); Assert.assertTrue(handler.addLock(entryId, lock1)); Assert.assertTrue(handler.addLock(entryId, lock2)); - final Map> locks = handler.getLocks(entryId); + final Map> locks = handler.getLocks(entryId); Assert.assertEquals(2, locks.size()); Assert.assertEquals( - ImmutableSet.>of(lock1, lock2), + ImmutableSet.>of(lock1, lock2), new HashSet<>(locks.values()) ); @@ -322,7 +341,7 @@ public void testLocks() throws Exception handler.removeLock(lockId); locks.remove(lockId); - final Map> updated = handler.getLocks(entryId); + final Map> updated = handler.getLocks(entryId); Assert.assertEquals( new HashSet<>(locks.values()), new HashSet<>(updated.values()) @@ -334,23 +353,23 @@ public void testLocks() throws Exception public void testReplaceLock() throws EntryExistsException { final String entryId = "ABC123"; - Map entry = ImmutableMap.of("a", 1); - Map status = ImmutableMap.of("count", 42); + Map entry = ImmutableMap.of("a", 1); + Map status = ImmutableMap.of("count", 42); - handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status); + handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group"); Assert.assertEquals( - ImmutableMap.>of(), + ImmutableMap.>of(), handler.getLocks("non_exist_entry") ); Assert.assertEquals( - ImmutableMap.>of(), + ImmutableMap.>of(), handler.getLocks(entryId) ); - final ImmutableMap lock1 = ImmutableMap.of("lock", 1); - final ImmutableMap lock2 = ImmutableMap.of("lock", 2); + final ImmutableMap lock1 = ImmutableMap.of("lock", 1); + final ImmutableMap lock2 = ImmutableMap.of("lock", 2); Assert.assertTrue(handler.addLock(entryId, lock1)); @@ -364,23 +383,23 @@ public void testReplaceLock() throws EntryExistsException public void testGetLockId() throws EntryExistsException { final String entryId = "ABC123"; - Map entry = ImmutableMap.of("a", 1); - Map status = ImmutableMap.of("count", 42); + Map entry = ImmutableMap.of("a", 1); + Map status = ImmutableMap.of("count", 42); - handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status); + handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group"); Assert.assertEquals( - ImmutableMap.>of(), + ImmutableMap.>of(), handler.getLocks("non_exist_entry") ); Assert.assertEquals( - ImmutableMap.>of(), + ImmutableMap.>of(), handler.getLocks(entryId) ); - final ImmutableMap lock1 = ImmutableMap.of("lock", 1); - final ImmutableMap lock2 = ImmutableMap.of("lock", 2); + final ImmutableMap lock1 = ImmutableMap.of("lock", 1); + final ImmutableMap lock2 = ImmutableMap.of("lock", 2); Assert.assertTrue(handler.addLock(entryId, lock1)); @@ -392,21 +411,21 @@ public void testGetLockId() throws EntryExistsException public void testRemoveTasksOlderThan() throws Exception { final String entryId1 = "1234"; - Map entry1 = ImmutableMap.of("numericId", 1234); - Map status1 = ImmutableMap.of("count", 42, "temp", 1); - handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1); + Map entry1 = ImmutableMap.of("numericId", 1234); + Map status1 = ImmutableMap.of("count", 42, "temp", 1); + handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1, "type", "group"); Assert.assertTrue(handler.addLog(entryId1, ImmutableMap.of("logentry", "created"))); final String entryId2 = "ABC123"; - Map entry2 = ImmutableMap.of("a", 1); - Map status2 = ImmutableMap.of("count", 42); - handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2); + Map entry2 = ImmutableMap.of("a", 1); + Map status2 = ImmutableMap.of("count", 42); + handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2, "type", "group"); Assert.assertTrue(handler.addLog(entryId2, ImmutableMap.of("logentry", "created"))); final String entryId3 = "DEF5678"; - Map entry3 = ImmutableMap.of("numericId", 5678); - Map status3 = ImmutableMap.of("count", 21, "temp", 2); - handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3); + Map entry3 = ImmutableMap.of("numericId", 5678); + Map status3 = ImmutableMap.of("count", 21, "temp", 2); + handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3, "type", "group"); Assert.assertTrue(handler.addLog(entryId3, ImmutableMap.of("logentry", "created"))); Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1)); @@ -450,4 +469,196 @@ public void testRemoveTasksOlderThan() throws Exception Assert.assertEquals(1, handler.getLogs(entryId2).size()); Assert.assertEquals(1, handler.getLogs(entryId3).size()); } + + @Test + public void testMigration() + { + int active = 1234; + for (int i = 0; i < active; i++) { + insertTaskInfo(createRandomTaskInfo(true), false); + } + + int completed = 2345; + for (int i = 0; i < completed; i++) { + insertTaskInfo(createRandomTaskInfo(false), false); + } + + Assert.assertEquals(active + completed, getUnmigratedTaskCount().intValue()); + + handler.populateTaskTypeAndGroupId(); + + Assert.assertEquals(0, getUnmigratedTaskCount().intValue()); + } + + @Test + public void testGetTaskStatusPlusListInternal() + { + // SETUP + TaskInfo, Map> activeUnaltered = createRandomTaskInfo(true); + insertTaskInfo(activeUnaltered, false); + + TaskInfo, Map> completedUnaltered = createRandomTaskInfo(false); + insertTaskInfo(completedUnaltered, false); + + TaskInfo, Map> activeAltered = createRandomTaskInfo(true); + insertTaskInfo(activeAltered, true); + + TaskInfo, Map> completedAltered = createRandomTaskInfo(false); + insertTaskInfo(completedAltered, true); + + Map taskLookups = new HashMap<>(); + taskLookups.put(TaskLookup.TaskLookupType.ACTIVE, ActiveTaskLookup.getInstance()); + taskLookups.put(TaskLookup.TaskLookupType.COMPLETE, CompleteTaskLookup.of(null, Duration.millis(86400000))); + + List>> taskMetadataInfos; + + // BEFORE MIGRATION + + // Payload based fetch. task type and groupid will be populated + taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, true); + Assert.assertEquals(4, taskMetadataInfos.size()); + verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, false); + verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false); + verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, false); + verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false); + + // New columns based fetch before migration is complete. type and payload are null when altered = false + taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, false); + Assert.assertEquals(4, taskMetadataInfos.size()); + verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, true); + verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false); + verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, true); + verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false); + + // MIGRATION + handler.populateTaskTypeAndGroupId(); + + // Payload based fetch. task type and groupid will still be populated in tasks tab + taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, true); + Assert.assertEquals(4, taskMetadataInfos.size()); + verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, false); + verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false); + verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, false); + verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false); + + // New columns based fetch after migration is complete. All data must be populated in the tasks table + taskMetadataInfos = handler.getTaskStatusList(taskLookups, null, false); + Assert.assertEquals(4, taskMetadataInfos.size()); + verifyTaskInfoToMetadataInfo(completedUnaltered, taskMetadataInfos, false); + verifyTaskInfoToMetadataInfo(completedAltered, taskMetadataInfos, false); + verifyTaskInfoToMetadataInfo(activeUnaltered, taskMetadataInfos, false); + verifyTaskInfoToMetadataInfo(activeAltered, taskMetadataInfos, false); + } + + private Integer getUnmigratedTaskCount() + { + return handler.getConnector().retryWithHandle( + new HandleCallback() + { + @Override + public Integer withHandle(Handle handle) throws SQLException + { + String sql = String.format(Locale.ENGLISH, + "SELECT COUNT(*) FROM %s WHERE type is NULL or group_id is NULL", + entryTable); + ResultSet resultSet = handle.getConnection().createStatement().executeQuery(sql); + resultSet.next(); + return resultSet.getInt(1); + } + } + ); + } + + private TaskInfo, Map> createRandomTaskInfo(boolean active) + { + String id = UUID.randomUUID().toString(); + DateTime createdTime = DateTime.now(DateTimeZone.UTC); + String datasource = UUID.randomUUID().toString(); + String type = UUID.randomUUID().toString(); + String groupId = UUID.randomUUID().toString(); + + Map payload = new HashMap<>(); + payload.put("id", id); + payload.put("type", type); + payload.put("groupId", groupId); + + Map status = new HashMap<>(); + status.put("id", id); + status.put("status", active ? TaskState.RUNNING : TaskState.SUCCESS); + status.put("duration", RANDOM.nextLong()); + status.put("location", TaskLocation.create(UUID.randomUUID().toString(), 8080, 995)); + status.put("errorMsg", UUID.randomUUID().toString()); + + return new TaskInfo<>( + id, + createdTime, + status, + datasource, + payload + ); + } + + private void insertTaskInfo(TaskInfo, Map> taskInfo, + boolean altered) + { + try { + handler.insert( + taskInfo.getId(), + taskInfo.getCreatedTime(), + taskInfo.getDataSource(), + taskInfo.getTask(), + TaskState.RUNNING.equals(taskInfo.getStatus().get("status")), + taskInfo.getStatus(), + altered ? taskInfo.getTask().get("type").toString() : null, + altered ? taskInfo.getTask().get("groupId").toString() : null + ); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void verifyTaskInfoToMetadataInfo(TaskInfo, Map> taskInfo, + List>> taskMetadataInfos, + boolean nullNewColumns) + { + for (TaskInfo> taskMetadataInfo : taskMetadataInfos) { + if (taskMetadataInfo.getId().equals(taskInfo.getId())) { + verifyTaskInfoToMetadataInfo(taskInfo, taskMetadataInfo, nullNewColumns); + } + return; + } + Assert.fail(); + } + + private void verifyTaskInfoToMetadataInfo(TaskInfo, Map> taskInfo, + TaskInfo> taskMetadataInfo, + boolean nullNewColumns) + { + Assert.assertEquals(taskInfo.getId(), taskMetadataInfo.getId()); + Assert.assertEquals(taskInfo.getCreatedTime(), taskMetadataInfo.getCreatedTime()); + Assert.assertEquals(taskInfo.getDataSource(), taskMetadataInfo.getDataSource()); + + verifyTaskStatus(taskInfo.getStatus(), taskMetadataInfo.getStatus()); + + Map task = taskInfo.getTask(); + TaskIdentifier taskIdentifier = taskMetadataInfo.getTask(); + Assert.assertEquals(task.get("id"), taskIdentifier.getId()); + if (nullNewColumns) { + Assert.assertNull(taskIdentifier.getGroupId()); + Assert.assertNull(taskIdentifier.getType()); + } else { + Assert.assertEquals(task.get("groupId"), taskIdentifier.getGroupId()); + Assert.assertEquals(task.get("type"), taskIdentifier.getType()); + } + } + + private void verifyTaskStatus(Map expected, Map actual) + { + Assert.assertEquals(expected.get("id"), actual.get("id")); + Assert.assertEquals(expected.get("duration"), actual.get("duration")); + Assert.assertEquals(expected.get("errorMsg"), actual.get("errorMsg")); + Assert.assertEquals(expected.get("status").toString(), actual.get("status")); + Assert.assertEquals(expected.get("location"), JSON_MAPPER.convertValue(actual.get("location"), TaskLocation.class)); + } }