Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
40e479e
Optimize overlord GET /tasks memory usage
AmatyaAvadhanula Apr 6, 2022
98fca66
Use extra columns for better perf
AmatyaAvadhanula Apr 8, 2022
bf8e8c5
Migration in thread, determining strategy for sql fetch
AmatyaAvadhanula Apr 13, 2022
c784c3a
Fix existing unit tests
AmatyaAvadhanula Apr 14, 2022
d0d7f12
Merge remote-tracking branch 'upstream/master' into feature-optimize_…
AmatyaAvadhanula Apr 18, 2022
74fac09
SQLMetadataConnector migration fix and tests
AmatyaAvadhanula Apr 19, 2022
c436ba5
Refactoring
AmatyaAvadhanula Apr 20, 2022
71b2a4a
Add test
AmatyaAvadhanula Apr 20, 2022
843a32d
Fix bug, refactor etc
AmatyaAvadhanula Apr 21, 2022
4807b13
trivial tests for coverage
AmatyaAvadhanula Apr 21, 2022
07ead45
Merge remote-tracking branch 'upstream/master' into feature-optimize_…
AmatyaAvadhanula Apr 22, 2022
082d0c3
Possible fix for integration tests
AmatyaAvadhanula Apr 25, 2022
3a91ce8
Merge remote-tracking branch 'upstream/master' into feature-optimize_…
AmatyaAvadhanula Apr 26, 2022
731caab
Merge remote-tracking branch 'upstream/master' into feature-optimize_…
AmatyaAvadhanula Apr 26, 2022
a5bafe7
Cleanup OverlordResourceTest
kfaraz Apr 28, 2022
e2b9e93
Address review
AmatyaAvadhanula Apr 28, 2022
cb919a3
Merge remote-tracking branch 'origin/feature-optimize_memory_getTasks…
AmatyaAvadhanula Apr 28, 2022
48e8a89
Address review
AmatyaAvadhanula May 2, 2022
127fc88
Merge remote-tracking branch 'upstream/master' into feature-optimize_…
AmatyaAvadhanula Jun 15, 2022
402dd12
Incorporate feedback
AmatyaAvadhanula Jun 15, 2022
aa82787
Refactor for cleaner API
AmatyaAvadhanula Jun 16, 2022
f1c1151
migration at startup and fix checkstyle
AmatyaAvadhanula Jun 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions core/src/main/java/org/apache/druid/indexer/TaskIdentifier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexer;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

import javax.annotation.Nullable;
import java.util.Objects;

/**
* Model class containing the id, type and groupId of a task
* These fields are extracted from the task payload for the new schema and this model can be used for migration as well.
*/
/**
 * Lightweight model holding the id, type and groupId of a task.
 * These fields are extracted from the task payload for the new schema and this model can be used for migration as well.
 */
public class TaskIdentifier
{
  private final String id;

  @Nullable
  private final String type;

  @Nullable
  private final String groupId;

  @JsonCreator
  public TaskIdentifier(
      @JsonProperty("id") String id,
      @JsonProperty("groupId") @Nullable String groupId,
      @JsonProperty("type") @Nullable String type // nullable for backward compatibility
  )
  {
    this.id = Preconditions.checkNotNull(id, "id");
    this.groupId = groupId;
    this.type = type;
  }

  @JsonProperty
  public String getId()
  {
    return id;
  }

  @Nullable
  @JsonProperty
  public String getGroupId()
  {
    return groupId;
  }

  @Nullable
  @JsonProperty
  public String getType()
  {
    return type;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final TaskIdentifier other = (TaskIdentifier) o;
    return Objects.equals(id, other.id)
           && Objects.equals(groupId, other.groupId)
           && Objects.equals(type, other.type);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(id, groupId, type);
  }

  @Override
  public String toString()
  {
    return "TaskIdentifier{" +
           "id='" + id + '\'' +
           ", groupId='" + groupId + '\'' +
           ", type='" + type + '\'' +
           '}';
  }
}
27 changes: 27 additions & 0 deletions core/src/main/java/org/apache/druid/indexer/TaskStatusPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.RE;
import org.joda.time.DateTime;

Expand All @@ -31,6 +32,7 @@
public class TaskStatusPlus
{
private final String id;
@Nullable
private final String type;
private final DateTime createdTime;
private final DateTime queueInsertionTime;
Expand Down Expand Up @@ -252,4 +254,29 @@ public String toString()
", errorMsg='" + errorMsg + '\'' +
'}';
}

/**
 * Converts a {@code TaskInfo} pair of {@link TaskIdentifier} and {@link TaskStatus} to a {@link TaskStatusPlus}.
 * Applicable only for completed or waiting tasks since a TaskInfo doesn't have the exhaustive info for running tasks.
 *
 * @param taskIdentifierInfo TaskInfo pair
 * @return corresponding TaskStatusPlus
 */
public static TaskStatusPlus fromTaskIdentifierInfo(TaskInfo<TaskIdentifier, TaskStatus> taskIdentifierInfo)
{
  TaskStatus status = taskIdentifierInfo.getStatus();
  return new TaskStatusPlus(
      taskIdentifierInfo.getId(),
      taskIdentifierInfo.getTask().getGroupId(),
      taskIdentifierInfo.getTask().getType(),
      taskIdentifierInfo.getCreatedTime(),
      // TaskInfo does not carry a queue-insertion time; EPOCH is used as a placeholder
      DateTimes.EPOCH,
      status.getStatusCode(),
      // Without runner information a non-complete task can only be reported as WAITING
      status.getStatusCode().isComplete() ? RunnerTaskState.NONE : RunnerTaskState.WAITING,
      status.getDuration(),
      status.getLocation(),
      taskIdentifierInfo.getDataSource(),
      status.getErrorMsg()
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.metadata;

import com.google.common.base.Optional;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
import org.joda.time.DateTime;
Expand All @@ -41,6 +42,8 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param entry object representing this entry
* @param active active or inactive flag
* @param status status object associated with this object, can be null
* @param type entry type
* @param groupId entry group id
* @throws EntryExistsException
*/
void insert(
Expand All @@ -49,10 +52,11 @@ void insert(
@NotNull String dataSource,
@NotNull EntryType entry,
boolean active,
@Nullable StatusType status
@Nullable StatusType status,
@NotNull String type,
@NotNull String groupId
Comment on lines +56 to +57
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: These arguments should probably come earlier, maybe right after id.

) throws EntryExistsException;


/**
* Sets or updates the status for any active entry with the given id.
* Once an entry has been set inactive, its status cannot be updated anymore.
Expand Down Expand Up @@ -99,6 +103,22 @@ List<TaskInfo<EntryType, StatusType>> getTaskInfos(
@Nullable String datasource
);

/**
* Returns the statuses of the specified tasks.
*
* If {@code taskLookups} includes {@link TaskLookupType#ACTIVE}, it returns all active tasks in the metadata store.
* If {@code taskLookups} includes {@link TaskLookupType#COMPLETE}, it returns all complete tasks in the metadata
* store. For complete tasks, additional filters in {@code CompleteTaskLookup} can be applied.
* All lookups should be processed atomically if more than one lookup is given.
*
* @param taskLookups task lookup type and filters.
* @param datasource datasource filter
*/
List<TaskInfo<TaskIdentifier, StatusType>> getTaskStatusList(
Map<TaskLookupType, TaskLookup> taskLookups,
@Nullable String datasource
);

default List<TaskInfo<EntryType, StatusType>> getTaskInfos(
TaskLookup taskLookup,
@Nullable String datasource
Expand Down Expand Up @@ -173,4 +193,10 @@ default List<TaskInfo<EntryType, StatusType>> getTaskInfos(
*/
@Nullable
Long getLockId(String entryId, LockType lock);

/**
* Utility to migrate existing tasks to the new schema by populating type and groupId asynchronously
*/
void populateTaskTypeAndGroupIdAsync();

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -212,6 +213,12 @@ public boolean tableExists(final Handle handle, final String tableName)
.isEmpty();
}

/**
 * SQL Server has no LIMIT keyword; the ANSI {@code FETCH NEXT n ROWS ONLY} clause is used instead.
 */
@Override
public String limitClause(int limit)
{
  return "FETCH NEXT " + limit + " ROWS ONLY";
}

/**
*
* {@inheritDoc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,15 @@ public void testIsTransientException()
Assert.assertFalse(connector.isTransientException(new Throwable("Throwable with reason only")));
}

@Test
public void testLimitClause()
{
  // Connector constructed with default configs; only limitClause() is exercised here
  final SQLServerConnector target = new SQLServerConnector(
      Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
      Suppliers.ofInstance(
          new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null)
      )
  );
  Assert.assertEquals("FETCH NEXT 100 ROWS ONLY", target.limitClause(100));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import javax.annotation.Nullable;
import java.io.File;
import java.sql.SQLException;
import java.util.Locale;

public class MySQLConnector extends SQLMetadataConnector
{
Expand Down Expand Up @@ -177,6 +178,12 @@ public int getStreamingFetchSize()
return Integer.MIN_VALUE;
}

/**
 * MySQL supports the standard {@code LIMIT n} clause.
 */
@Override
public String limitClause(int limit)
{
  return "LIMIT " + limit;
}

@Override
public boolean tableExists(Handle handle, String tableName)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,16 @@ public void testIsExceptionTransientNoMySqlClazz()
connector.connectorIsTransientException(new SQLTransientConnectionException("transient"))
);
}

@Test
public void testLimitClause()
{
  // Connector constructed with the shared test configs; only limitClause() is exercised here
  final MySQLConnector target = new MySQLConnector(
      CONNECTOR_CONFIG_SUPPLIER,
      TABLES_CONFIG_SUPPLIER,
      new MySQLConnectorSslConfig(),
      MYSQL_DRIVER_CONFIG
  );
  Assert.assertEquals("LIMIT 100", target.limitClause(100));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.List;
import java.util.Locale;

public class PostgreSQLConnector extends SQLMetadataConnector
{
Expand Down Expand Up @@ -144,6 +145,12 @@ public int getStreamingFetchSize()
return DEFAULT_STREAMING_RESULT_SIZE;
}

/**
 * PostgreSQL supports the standard {@code LIMIT n} clause.
 */
@Override
public String limitClause(int limit)
{
  return "LIMIT " + limit;
}

protected boolean canUpsert(Handle handle) throws SQLException
{
if (canUpsert == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,18 @@ public void testIsTransientException()
Assert.assertFalse(connector.isTransientException(new Exception("I'm not happy")));
Assert.assertFalse(connector.isTransientException(new Throwable("I give up")));
}

@Test
public void testLimitClause()
{
  // Connector constructed with default configs; only limitClause() is exercised here
  final PostgreSQLConnector target = new PostgreSQLConnector(
      Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
      Suppliers.ofInstance(
          new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null)
      ),
      new PostgreSQLConnectorConfig(),
      new PostgreSQLTablesConfig()
  );
  Assert.assertEquals("LIMIT 100", target.limitClause(100));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -241,4 +243,20 @@ default <ContextValueType> ContextValueType getContextValue(String key, ContextV
final ContextValueType value = getContextValue(key);
return value == null ? defaultValue : value;
}

default TaskIdentifier getMetadata()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Rename to getTaskIdentifier.

{
return new TaskIdentifier(this.getId(), this.getGroupId(), this.getType());
}

/**
 * Converts a {@code TaskInfo<Task, TaskStatus>} into a {@code TaskInfo<TaskIdentifier, TaskStatus>},
 * replacing the full task payload with its lightweight identifier while keeping all other fields.
 *
 * @param taskInfo task info holding the full task payload
 * @return equivalent task info holding only the task identifier
 */
static TaskInfo<TaskIdentifier, TaskStatus> toTaskIdentifierInfo(TaskInfo<Task, TaskStatus> taskInfo)
{
  final TaskIdentifier taskIdentifier = taskInfo.getTask().getMetadata();
  return new TaskInfo<>(
      taskInfo.getId(),
      taskInfo.getCreatedTime(),
      taskInfo.getStatus(),
      taskInfo.getDataSource(),
      taskIdentifier
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
Expand Down Expand Up @@ -233,6 +234,18 @@ public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
return tasks;
}

/**
 * Returns a {@link TaskStatusPlus} for every task matching the given lookups and datasource filter,
 * by converting each full task info into its identifier-based form first.
 *
 * @param taskLookups task lookup types and filters
 * @param datasource  datasource filter, or null for all datasources
 */
@Override
public List<TaskStatusPlus> getTaskStatusPlusList(
    Map<TaskLookupType, TaskLookup> taskLookups,
    @Nullable String datasource
)
{
  return getTaskInfos(taskLookups, datasource)
      .stream()
      .map(taskInfo -> TaskStatusPlus.fromTaskIdentifierInfo(Task.toTaskIdentifierInfo(taskInfo)))
      .collect(Collectors.toList());
}

private List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfoSince(
DateTime start,
@Nullable Integer n,
Expand Down
Loading