getTaskInfo(String entryId);
+ TaskInfo getTaskInfo(String entryId);
/**
* Returns a list of {@link TaskInfo} from metadata store that matches to the given filters.
- *
+ *
* If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
* If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
* store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
@@ -100,14 +105,14 @@ void insert(
* @param taskLookups task lookup type and filters.
* @param datasource datasource filter
*/
- List> getTaskInfos(
+ List getTaskInfos(
Map taskLookups,
@Nullable String datasource
);
/**
* Returns the statuses of the specified tasks.
- *
+ *
* If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
* If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
* store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
@@ -116,12 +121,12 @@ List> getTaskInfos(
* @param taskLookups task lookup type and filters.
* @param datasource datasource filter
*/
- List> getTaskStatusList(
+ List getTaskStatusList(
Map taskLookups,
@Nullable String datasource
);
- default List> getTaskInfos(
+ default List getTaskInfos(
TaskLookup taskLookup,
@Nullable String datasource
)
@@ -136,7 +141,7 @@ default List> getTaskInfos(
* @param lock lock to add
* @return true if the lock was added
*/
- boolean addLock(String entryId, LockType lock);
+ boolean addLock(String entryId, TaskLock lock);
/**
* Replace an existing lock with a new lock.
@@ -147,7 +152,7 @@ default List> getTaskInfos(
*
* @return true if the lock is replaced
*/
- boolean replaceLock(String entryId, long oldLockId, LockType newLock);
+ boolean replaceLock(String entryId, long oldLockId, TaskLock newLock);
/**
* Remove the lock with the given lock id.
@@ -163,43 +168,13 @@ default List> getTaskInfos(
*/
void removeTasksOlderThan(long timestamp);
- /**
- * Task logs are not used anymore and this method is never called by Druid code.
- * It has been retained only for backwards compatibility with older extensions.
- * New extensions must not implement this method.
- *
- * @throws DruidException of category UNSUPPORTED whenever called.
- */
- @Deprecated
- default boolean addLog(String entryId, LogType log)
- {
- throw DruidException.defensive()
- .ofCategory(DruidException.Category.UNSUPPORTED)
- .build("Task actions are not logged anymore.");
- }
-
- /**
- * Task logs are not used anymore and this method is never called by Druid code.
- * It has been retained only for backwards compatibility with older extensions.
- * New extensions must not implement this method.
- *
- * @throws DruidException of category UNSUPPORTED whenever called.
- */
- @Deprecated
- default List getLogs(String entryId)
- {
- throw DruidException.defensive()
- .ofCategory(DruidException.Category.UNSUPPORTED)
- .build("Task actions are not logged anymore.");
- }
-
/**
* Returns the locks for the given entry
*
* @param entryId entry id
* @return map of lockId to lock
*/
- Map getLocks(String entryId);
+ Map getLocks(String entryId);
/**
* Returns the lock id for the given entry and the lock.
@@ -207,7 +182,7 @@ default List getLogs(String entryId)
* @return lock id if found, otherwise null.
*/
@Nullable
- Long getLockId(String entryId, LockType lock);
+ Long getLockId(String entryId, TaskLock lock);
/**
* Utility to migrate existing tasks to the new schema by populating type and groupId asynchronously
diff --git a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerFactory.java b/indexing-service/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerFactory.java
similarity index 78%
rename from processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerFactory.java
rename to indexing-service/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerFactory.java
index a81ecfdbf1c4..2a4ea305696b 100644
--- a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerFactory.java
+++ b/indexing-service/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerFactory.java
@@ -21,8 +21,5 @@
public interface MetadataStorageActionHandlerFactory
{
- MetadataStorageActionHandler create(
- String entryType,
- MetadataStorageActionHandlerTypes payloadTypes
- );
+ MetadataStorageActionHandler create();
}
diff --git a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerTypes.java b/indexing-service/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerTypes.java
similarity index 100%
rename from processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerTypes.java
rename to indexing-service/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerTypes.java
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/indexing-service/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
similarity index 81%
rename from server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
rename to indexing-service/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
index f8c506d71da9..b7190edf6518 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
+++ b/indexing-service/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java
@@ -20,7 +20,6 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -30,8 +29,12 @@
import com.google.common.collect.Maps;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
+import org.apache.druid.indexer.TaskIdStatus;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.TaskLock;
+import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
@@ -41,18 +44,16 @@
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Batch;
-import org.skife.jdbi.v2.FoldController;
-import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
-import org.skife.jdbi.v2.exceptions.StatementException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
@@ -60,73 +61,45 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
-public abstract class SQLMetadataStorageActionHandler
- implements MetadataStorageActionHandler
+public abstract class SQLMetadataStorageActionHandler
+ implements MetadataStorageActionHandler
{
private static final EmittingLogger log = new EmittingLogger(SQLMetadataStorageActionHandler.class);
private static final String CONTEXT_KEY_IS_TRANSIENT = "isTransient";
private final SQLMetadataConnector connector;
private final ObjectMapper jsonMapper;
- private final TypeReference entryType;
- private final TypeReference statusType;
- private final TypeReference lockType;
- private final String entryTypeName;
private final String entryTable;
private final String lockTable;
- private final TaskInfoMapper taskInfoMapper;
+ private final TaskInfoMapper taskInfoMapper;
private final TaskStatusMapper taskStatusMapper;
private final TaskStatusMapperFromPayload taskStatusMapperFromPayload;
private final TaskIdentifierMapper taskIdentifierMapper;
private Future taskMigrationCompleteFuture;
- /**
- * @deprecated Use the other constructor without {@code logTable} argument
- * since this argument is now unused.
- */
- @Deprecated
- public SQLMetadataStorageActionHandler(
- final SQLMetadataConnector connector,
- final ObjectMapper jsonMapper,
- final MetadataStorageActionHandlerTypes types,
- final String entryTypeName,
- final String entryTable,
- final String logTable,
- final String lockTable
- )
- {
- this(connector, jsonMapper, types, entryTypeName, entryTable, lockTable);
- }
-
@SuppressWarnings("PMD.UnnecessaryFullyQualifiedName")
public SQLMetadataStorageActionHandler(
final SQLMetadataConnector connector,
final ObjectMapper jsonMapper,
- final MetadataStorageActionHandlerTypes types,
- final String entryTypeName,
final String entryTable,
final String lockTable
)
{
this.connector = connector;
- //fully qualified references required below due to identical package names across project modules.
- //noinspection UnnecessaryFullyQualifiedName
this.jsonMapper = jsonMapper.copy().addMixIn(PasswordProvider.class, PasswordProviderRedactionMixIn.class);
- this.entryType = types.getEntryType();
- this.statusType = types.getStatusType();
- this.lockType = types.getLockType();
- this.entryTypeName = entryTypeName;
this.entryTable = entryTable;
this.lockTable = lockTable;
- this.taskInfoMapper = new TaskInfoMapper<>(jsonMapper, entryType, statusType);
+ this.taskInfoMapper = new TaskInfoMapper(jsonMapper);
this.taskStatusMapper = new TaskStatusMapper(jsonMapper);
this.taskStatusMapperFromPayload = new TaskStatusMapperFromPayload(jsonMapper);
this.taskIdentifierMapper = new TaskIdentifierMapper(jsonMapper);
@@ -142,39 +115,19 @@ protected ObjectMapper getJsonMapper()
return jsonMapper;
}
- protected TypeReference getStatusType()
- {
- return statusType;
- }
-
protected String getEntryTable()
{
return entryTable;
}
- protected String getLogTable()
- {
- throw new UnsupportedOperationException("'tasklogs' table is not used anymore");
- }
-
- protected String getEntryTypeName()
- {
- return entryTypeName;
- }
-
- public TypeReference getEntryType()
- {
- return entryType;
- }
-
@Override
public void insert(
final String id,
final DateTime timestamp,
final String dataSource,
- final EntryType entry,
+ final @NotNull Task entry,
final boolean active,
- final StatusType status,
+ final TaskStatus status,
final String type,
final String groupId
)
@@ -209,9 +162,9 @@ private Void insertEntryWithHandle(
String entryId,
DateTime timestamp,
String dataSource,
- EntryType entry,
+ Object entry,
boolean active,
- StatusType status,
+ Object status,
String type,
String groupId
)
@@ -239,12 +192,6 @@ private Void insertEntryWithHandle(
}
}
- public static boolean isStatementException(Throwable e)
- {
- return e instanceof StatementException ||
- (e instanceof CallbackFailedException && e.getCause() instanceof StatementException);
- }
-
private boolean isTransientDruidException(Throwable t)
{
if (t instanceof CallbackFailedException) {
@@ -257,7 +204,7 @@ private boolean isTransientDruidException(Throwable t)
}
@Override
- public boolean setStatus(final String entryId, final boolean active, final StatusType status)
+ public boolean setStatus(final String entryId, final boolean active, final TaskStatus status)
{
return connector.retryWithHandle(
new HandleCallback<>()
@@ -281,13 +228,13 @@ public Boolean withHandle(Handle handle) throws Exception
}
@Override
- public Optional getEntry(final String entryId)
+ public Optional getEntry(final String entryId)
{
return connector.retryWithHandle(
new HandleCallback<>()
{
@Override
- public Optional withHandle(Handle handle) throws Exception
+ public Optional withHandle(Handle handle) throws Exception
{
byte[] res = handle.createQuery(
StringUtils.format("SELECT payload FROM %s WHERE id = :id", entryTable)
@@ -297,7 +244,7 @@ public Optional withHandle(Handle handle) throws Exception
.first();
return Optional.fromNullable(
- res == null ? null : jsonMapper.readValue(res, entryType)
+ res == null ? null : jsonMapper.readValue(res, Task.class)
);
}
}
@@ -306,13 +253,13 @@ public Optional withHandle(Handle handle) throws Exception
}
@Override
- public Optional getStatus(final String entryId)
+ public Optional getStatus(final String entryId)
{
return connector.retryWithHandle(
new HandleCallback<>()
{
@Override
- public Optional withHandle(Handle handle) throws Exception
+ public Optional withHandle(Handle handle) throws Exception
{
byte[] res = handle.createQuery(
StringUtils.format("SELECT status_payload FROM %s WHERE id = :id", entryTable)
@@ -322,7 +269,7 @@ public Optional withHandle(Handle handle) throws Exception
.first();
return Optional.fromNullable(
- res == null ? null : jsonMapper.readValue(res, statusType)
+ res == null ? null : jsonMapper.readValue(res, TaskStatus.class)
);
}
}
@@ -331,7 +278,7 @@ public Optional withHandle(Handle handle) throws Exception
@Override
@Nullable
- public TaskInfo getTaskInfo(String entryId)
+ public TaskInfo getTaskInfo(String entryId)
{
return connector.retryWithHandle(handle -> {
final String query = StringUtils.format(
@@ -346,14 +293,14 @@ public TaskInfo getTaskInfo(String entryId)
}
@Override
- public List> getTaskInfos(
+ public List getTaskInfos(
Map taskLookups,
@Nullable String dataSource
)
{
return getConnector().retryTransaction(
(handle, status) -> {
- final List> tasks = new ArrayList<>();
+ final List tasks = new ArrayList<>();
for (Entry entry : taskLookups.entrySet()) {
final Query