Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions core/src/main/java/org/apache/druid/indexer/TaskInfoLite.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexer;

import com.google.common.base.Preconditions;
import org.joda.time.DateTime;

import javax.annotation.Nullable;

/**
* This class is used to store task info that is only necessary to some status queries in the OverlordResource
*/
public class TaskInfoLite
{
private final String id;
private final String groupId;
private final String type;
private final String dataSource;
private final TaskLocation location;
private final DateTime createdTime;
private final String status;
private final Long duration;
private final @Nullable String errorMsg;

public TaskInfoLite(
String id,
String groupId,
String type,
String dataSource,
TaskLocation location,
DateTime createdTime,
String status,
Long duration,
@Nullable String errorMsg
)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.type = Preconditions.checkNotNull(type, "type");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.location = Preconditions.checkNotNull(location, "location");
this.createdTime = Preconditions.checkNotNull(createdTime, "createdTime");
this.status = Preconditions.checkNotNull(status, "status");
this.duration = Preconditions.checkNotNull(duration, "duration");
this.errorMsg = errorMsg;
}

public String getId()
{
return id;
}

public String getGroupId()
{
return groupId;
}

public String getType()
{
return type;
}

public String getDataSource()
{
return dataSource;
}

public TaskLocation getLocation()
{
return location;
}

public DateTime getCreatedTime()
{
return createdTime;
}

public TaskState getStatus()
{
switch(status) {
case "SUCCESS":
return TaskState.SUCCESS;
case "FAILED":
return TaskState.FAILED;
case "RUNNING":
default:
return TaskState.RUNNING;
}
}

public Long getDuration()
{
return duration;
}

public String getErrorMsg() {
return errorMsg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import com.google.common.base.Optional;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskInfoLite;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -81,6 +83,11 @@ void insert(
@Nullable
TaskInfo<EntryType, StatusType> getTaskInfo(String entryId);

@Nullable
default TaskInfoLite getTaskInfoLite(String entryId) {
return null;
}

/**
* Return up to {@code maxNumStatuses} {@link TaskInfo} objects for all inactive entries
* created on or later than the given timestamp
Expand All @@ -96,13 +103,25 @@ List<TaskInfo<EntryType, StatusType>> getCompletedTaskInfo(
@Nullable String datasource
);

default List<TaskInfoLite> getCompletedTaskInfoLite(
DateTime timestamp,
@Nullable Integer maxNumStatuses,
@Nullable String datasource
) {
return Collections.emptyList();
}

/**
* Return {@link TaskInfo} objects for all active entries
*
* @return list of {@link TaskInfo}
*/
List<TaskInfo<EntryType, StatusType>> getActiveTaskInfo(@Nullable String dataSource);

default List<TaskInfoLite> getActiveTaskInfoLite(@Nullable String dataSource) {
return Collections.emptyList();
}

/**
* Add a lock to the given entry
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.eclipse.jetty.util.ajax.JSON;
import org.postgresql.PGProperty;
import org.postgresql.util.PSQLException;
import org.skife.jdbi.v2.DBI;
Expand All @@ -45,6 +46,7 @@ public class PostgreSQLConnector extends SQLMetadataConnector
{
private static final Logger log = new Logger(PostgreSQLConnector.class);
private static final String PAYLOAD_TYPE = "BYTEA";
private static final String JSON_PAYLOAD_TYPE = "JSONB";
private static final String SERIAL_TYPE = "BIGSERIAL";
private static final String QUOTE_STRING = "\\\"";
private static final String PSQL_SERIALIZATION_FAILURE_MSG =
Expand Down Expand Up @@ -126,6 +128,12 @@ public String getPayloadType()
return PAYLOAD_TYPE;
}

@Override
public String getJsonPayloadType()
{
return JSON_PAYLOAD_TYPE;
}

@Override
public String getSerialType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskInfoLite;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskAction;
Expand Down Expand Up @@ -186,6 +187,13 @@ public TaskInfo<Task, TaskStatus> getTaskInfo(String taskId)
return handler.getTaskInfo(taskId);
}

@Nullable
@Override
public TaskInfoLite getTaskInfoLite(String taskId)
{
return handler.getTaskInfoLite(taskId);
}

@Override
public List<Task> getActiveTasks()
{
Expand Down Expand Up @@ -219,6 +227,14 @@ public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataS
);
}

@Override
public List<TaskInfoLite> getActiveTaskInfoLite(@Nullable String dataSource)
{
return ImmutableList.copyOf(
handler.getActiveTaskInfoLite(dataSource)
);
}

@Override
public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
@Nullable Integer maxTaskStatuses,
Expand All @@ -236,6 +252,23 @@ public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInf
);
}

@Override
public List<TaskInfoLite> getRecentlyCreatedAlreadyFinishedTaskInfoLite(
@Nullable Integer maxTaskStatuses,
@Nullable Duration durationBeforeNow,
@Nullable String datasource
)
{
return ImmutableList.copyOf(
handler.getCompletedTaskInfoLite(
DateTimes.nowUtc()
.minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
maxTaskStatuses,
datasource
)
);
}

@Override
public void addLock(final String taskid, final TaskLock taskLock)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Optional;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskInfoLite;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskAction;
Expand All @@ -29,6 +30,7 @@
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;

public interface TaskStorage
Expand Down Expand Up @@ -109,6 +111,11 @@ public interface TaskStorage
@Nullable
TaskInfo<Task, TaskStatus> getTaskInfo(String taskId);

@Nullable
default TaskInfoLite getTaskInfoLite(String taskId) {
return null;
}

/**
* Add an action taken by a task to the audit log.
*
Expand Down Expand Up @@ -155,6 +162,10 @@ public interface TaskStorage
*/
List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource);

default List<TaskInfoLite> getActiveTaskInfoLite(@Nullable String dataSource) {
return Collections.emptyList();
}

/**
* Returns up to {@code maxTaskStatuses} {@link TaskInfo} objects of recently finished tasks as stored in the storage
* facility. No particular order is guaranteed, but implementations are encouraged to return tasks in descending order
Expand All @@ -173,6 +184,14 @@ List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
@Nullable String datasource
);

default List<TaskInfoLite> getRecentlyCreatedAlreadyFinishedTaskInfoLite(
@Nullable Integer maxTaskStatuses,
@Nullable Duration durationBeforeNow,
@Nullable String datasource
) {
return Collections.emptyList();
}

/**
* Returns a list of locks for a particular task.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskInfoLite;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
Expand Down Expand Up @@ -78,6 +79,19 @@ public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataS
return storage.getActiveTaskInfo(dataSource);
}

public List<TaskInfoLite> getActiveTaskInfoLite(@Nullable String dataSource)
{
return storage.getActiveTaskInfoLite(dataSource);
}
public List<TaskInfoLite> getCompletedTaskInfoByCreatedTimeDurationLite(
@Nullable Integer maxTaskStatuses,
@Nullable Duration duration,
@Nullable String dataSource
)
{
return storage.getRecentlyCreatedAlreadyFinishedTaskInfoLite(maxTaskStatuses, duration, dataSource);
}

public List<TaskInfo<Task, TaskStatus>> getCompletedTaskInfoByCreatedTimeDuration(
@Nullable Integer maxTaskStatuses,
@Nullable Duration duration,
Expand All @@ -103,6 +117,12 @@ public TaskInfo<Task, TaskStatus> getTaskInfo(String taskId)
return storage.getTaskInfo(taskId);
}

@Nullable
public TaskInfoLite getTaskInfoLite(String taskId)
{
return storage.getTaskInfoLite(taskId);
}

/**
* Returns all segments created by this task.
*
Expand Down
Loading