@@ -102,6 +102,14 @@ public void insert(
*/
public boolean addLock(String entryId, LockType lock);

/**
 * Sets the lock with the given lock id to the given {@code lock} type.
 *
 * @param lockId lock id
 * @param lock   lock type to set
 * @return true if the lock was set
 */
public boolean setLock(long lockId, LockType lock);

/**
* Remove the lock with the given lock id.
*
40 changes: 35 additions & 5 deletions docs/content/ingestion/tasks.md
@@ -89,9 +89,34 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
|property|description|required?|
|--------|-----------|---------|
|type|The task type, this should always be "index".|yes|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID from the task type, data source name, interval, and a date-time stamp. |no|
|spec|The ingestion spec. See below for more details. |yes|

#### Task Priority

Task priorities apply only when this feature is enabled by setting `druid.indexer.taskLockboxVersion` to `v2`.
Task priority is used when acquiring a lock on an interval for a datasource.
Tasks with higher priority can preempt lower-priority tasks for the same datasource and interval if run concurrently.
Tasks of the same priority cannot preempt one another. The default lock priorities by task type are:

- Realtime Index Task - 75
- Hadoop/Index Task - 50
- Merge/Append Task - 25
- Other Tasks - 0

The higher the number, the higher the priority. The default priority can be overridden by setting the task context in the task JSON like this:

```json
"context" : {
  "lockPriority" : "80"
}
```

For example, suppose a Hadoop Index task is running and a Realtime Index task starts that wants to publish a segment for the
same (or an overlapping) interval of the same datasource. The Realtime Index task will override the task locks of the Hadoop
Index task, and consequently the Hadoop Index task will fail before publishing the segment.
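The preemption rule above can be sketched in a few lines. This is an illustrative toy only, not Druid's actual lockbox code; the class and method names are hypothetical:

```java
// Toy sketch of the priority preemption rule; all names are hypothetical
// and this is not Druid's actual implementation.
public class LockPriorityExample
{
  // Default priorities per task type, as listed above.
  public static final int REALTIME = 75;
  public static final int HADOOP_INDEX = 50;
  public static final int MERGE_APPEND = 25;
  public static final int OTHER = 0;

  // A lock holder can only be preempted by a strictly higher priority,
  // and never once its lock has been upgraded.
  public static boolean canPreempt(int challengerPriority, int holderPriority, boolean holderUpgraded)
  {
    return !holderUpgraded && challengerPriority > holderPriority;
  }

  public static void main(String[] args)
  {
    System.out.println(canPreempt(REALTIME, HADOOP_INDEX, false)); // true: realtime preempts Hadoop
    System.out.println(canPreempt(REALTIME, HADOOP_INDEX, true));  // false: upgraded locks are safe
    System.out.println(canPreempt(HADOOP_INDEX, HADOOP_INDEX, false)); // false: equal priorities never preempt
  }
}
```

Note that equal priorities never preempt, matching the statement that tasks of the same priority cannot preempt one another.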

#### DataSchema

This field is required.
@@ -270,8 +295,13 @@ These tasks start, sleep for a time and are used only for testing. The available

Locking
-------

Once an overlord node accepts a task, a lock is created for the data source and interval specified in the task.
Tasks do not need to explicitly release locks, they are released upon task completion. Tasks may potentially release
locks early if they desire. Tasks ids are unique by naming them using UUIDs or the timestamp in which the task was created.
Once an overlord node accepts a task, a priority-based lock is created for the data source and interval specified in the task,
where the priority depends on the task type. Tasks do not need to explicitly release locks; they are released upon task completion.
Tasks may release locks early if they desire, and their lock can be overridden by a higher-priority task.
Task IDs are made unique by naming them using UUIDs or the timestamp at which the task was created.
Tasks are also part of a "task group", which is a set of tasks that can share interval locks.
Before committing their work (publishing segments), tasks upgrade their lock; failing to do so results in task failure.
A task is able to upgrade its lock only if no higher-priority task has come along and revoked it in the meantime.
An upgraded lock cannot be overridden, so other tasks have to wait for it to be released.
Note: if priority-based locking is disabled (i.e. `druid.indexer.taskLockboxVersion` is not set to `v2`), all
tasks have the default priority of 0 and a lock is created only for the data source and interval specified in the task.
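The revoke/upgrade interaction described above can be sketched as a small state machine. This is a self-contained toy under assumed semantics, not Druid's actual `TaskLockbox`; all names are hypothetical:

```java
// Toy model of the revoke/upgrade rules; hypothetical names, not Druid's classes.
class ToyLock
{
  private final int priority;
  private boolean upgraded;
  private boolean revoked;

  ToyLock(int priority)
  {
    this.priority = priority;
  }

  // A higher-priority task revokes this lock, unless it is already upgraded.
  boolean tryRevoke(int challengerPriority)
  {
    if (!upgraded && challengerPriority > priority) {
      revoked = true;
    }
    return revoked;
  }

  // A task must upgrade before publishing segments; the upgrade fails if the
  // lock was revoked in the meantime, which in turn fails the task.
  boolean tryUpgrade()
  {
    if (!revoked) {
      upgraded = true;
    }
    return upgraded && !revoked;
  }

  public static void main(String[] args)
  {
    ToyLock hadoopLock = new ToyLock(50);            // Hadoop/Index task priority
    hadoopLock.tryRevoke(75);                        // a realtime task (75) preempts it
    System.out.println(hadoopLock.tryUpgrade());     // false: the task fails before publishing

    ToyLock upgradedLock = new ToyLock(50);
    upgradedLock.tryUpgrade();                       // entered the critical section first
    System.out.println(upgradedLock.tryRevoke(75));  // false: upgraded locks cannot be revoked
  }
}
```

Whichever task reaches the upgrade step first wins: either the publisher upgrades and the challenger must wait, or the challenger revokes first and the publisher fails.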
@@ -52,7 +52,9 @@
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.SetLockCriticalStateAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskLockCriticalState;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.query.DruidMetrics;
@@ -231,6 +233,12 @@ public KafkaIOConfig getIOConfig()
return ioConfig;
}

@Override
public int getLockPriority()
{
return getLockPriority(REALTIME_TASK_PRIORITY);
}

@Override
public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
@@ -483,6 +491,12 @@ public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata)
throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
}

// Upgrade TaskLocks for all segments
for (DataSegment segment : segments) {
  toolbox.getTaskActionClient().submit(
      new SetLockCriticalStateAction(segment.getInterval(), TaskLockCriticalState.UPGRADE)
  );
Member Author
@gianm Is this a reasonable assumption that there will be a corresponding TaskLock with same interval for each segment being published by KafkaIndexTask ?

Member Author
@gianm can you please look at this comment

Contributor
@pjain1 Do you mean the exact same interval or just one that contains it? It will have a lock that contains the segment interval, but it might not be the exact same interval.

Contributor
It could be somewhat larger I mean.

Member Author
I meant the exact same interval; I search for a TaskLockPosse for this task having the same interval. Doesn't KafkaIndexTask acquire a lock as it announces the segments?

Contributor
It does acquire locks as it announces, although it's possible that it will announce a segment under a "wider" lock (like announce an hourly segment under a daily lock)

Member Author
I see it uses SegmentAllocateAction. I can change the check to contains instead of equals and that should take care of this case, hopefully.

Member Author
done

}

final SegmentTransactionalInsertAction action;

if (ioConfig.isUseTransaction()) {
@@ -72,6 +72,8 @@
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataTaskStorage;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskLockboxV1;
import io.druid.indexing.overlord.TaskLockboxV2;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.test.TestDataSegmentAnnouncer;
import io.druid.indexing.test.TestDataSegmentKiller;
@@ -158,8 +160,11 @@ public class KafkaIndexTaskTest
private TaskStorage taskStorage;
private TaskLockbox taskLockbox;
private File directory;
private String taskLockboxVersion;

private final List<Task> runningTasks = Lists.newArrayList();
private static final String TASKLOCKBOX_V1 = "v1";
private static final String TASKLOCKBOX_V2 = "v2";

private static final Logger log = new Logger(KafkaIndexTaskTest.class);
private static final ObjectMapper objectMapper = new DefaultObjectMapper();
@@ -203,15 +208,16 @@ public class KafkaIndexTaskTest
);
}

@Parameterized.Parameters(name = "buildV9Directly = {0}")
@Parameterized.Parameters(name = "buildV9Directly = {0}, taskLockBoxVersion={1}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(new Object[]{true}, new Object[]{false});
return ImmutableList.of(new Object[]{true, TASKLOCKBOX_V1}, new Object[]{false, TASKLOCKBOX_V2});
}

public KafkaIndexTaskTest(boolean buildV9Directly)
public KafkaIndexTaskTest(boolean buildV9Directly, String taskLockboxVersion)
{
this.buildV9Directly = buildV9Directly;
this.taskLockboxVersion = taskLockboxVersion;
}

@Rule
@@ -1350,7 +1356,11 @@ private void makeToolboxFactory() throws IOException
derby.metadataTablesConfigSupplier().get(),
derbyConnector
);
taskLockbox = new TaskLockbox(taskStorage);
if (taskLockboxVersion.equals(TASKLOCKBOX_V2)) {
taskLockbox = new TaskLockboxV2(taskStorage);
} else {
taskLockbox = new TaskLockboxV1(taskStorage);
}
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
metadataStorageCoordinator,
@@ -26,26 +26,45 @@

/**
* Represents a lock held by some task. Immutable.
 */
public class TaskLock
{
/**
 * The groupId for the lock; tasks having the same groupId can share a TaskLock
 */
private final String groupId;
private final String dataSource;
private final Interval interval;
/**
 * This version will be used to publish the segments
 */
private final String version;
/**
 * Priority used for acquiring the lock, value depends on the task type
 */
private final int priority;
/**
 * If false, this lock can be revoked by a higher-priority TaskLock; otherwise it cannot be
 */
private final boolean upgraded;

@JsonCreator
public TaskLock(
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version
@JsonProperty("version") String version,
@JsonProperty("priority") int priority,
@JsonProperty("upgraded") boolean upgraded
)
{
this.groupId = groupId;
this.dataSource = dataSource;
this.interval = interval;
this.version = version;
this.priority = priority;
this.upgraded = upgraded;
}

@JsonProperty
@@ -72,6 +91,29 @@ public String getVersion()
return version;
}

@JsonProperty
public int getPriority()
{
return priority;
}

@JsonProperty
public boolean isUpgraded()
{
return upgraded;
}

public TaskLock withUpgraded(boolean upgraded)
{
return new TaskLock(
getGroupId(),
getDataSource(),
getInterval(),
getVersion(),
getPriority(),
upgraded
);
}

@Override
public boolean equals(Object o)
{
@@ -82,14 +124,16 @@ public boolean equals(Object o)
return Objects.equal(this.groupId, x.groupId) &&
Objects.equal(this.dataSource, x.dataSource) &&
Objects.equal(this.interval, x.interval) &&
Objects.equal(this.version, x.version);
Objects.equal(this.version, x.version) &&
Objects.equal(this.priority, x.priority) &&
Objects.equal(this.upgraded, x.upgraded);
}
}

@Override
public int hashCode()
{
return Objects.hashCode(groupId, dataSource, interval, version);
return Objects.hashCode(groupId, dataSource, interval, version, priority, upgraded);
}

@Override
@@ -100,6 +144,8 @@ public String toString()
.add("dataSource", dataSource)
.add("interval", interval)
.add("version", version)
.add("priority", priority)
.add("upgraded", upgraded)
.toString();
}
}
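`withUpgraded` above follows the immutable copy-on-write pattern: change a field by returning a fresh copy. A stripped-down, self-contained sketch of the same idea (the `MiniLock` name is hypothetical, standing in for `TaskLock`):

```java
// Stripped-down stand-in for TaskLock, illustrating the immutable
// "with" pattern used by withUpgraded: mutate by copying.
final class MiniLock
{
  final String version;
  final int priority;
  final boolean upgraded;

  MiniLock(String version, int priority, boolean upgraded)
  {
    this.version = version;
    this.priority = priority;
    this.upgraded = upgraded;
  }

  MiniLock withUpgraded(boolean upgraded)
  {
    return new MiniLock(version, priority, upgraded);
  }

  public static void main(String[] args)
  {
    MiniLock lock = new MiniLock("2016-01-01T00:00:00.000Z", 50, false);
    MiniLock upgraded = lock.withUpgraded(true);
    System.out.println(lock.upgraded);      // false: the original is untouched
    System.out.println(upgraded.upgraded);  // true: the copy carries the new flag
  }
}
```

Because `TaskLock` is immutable, an upgrade produces a new lock object rather than mutating shared state, which keeps concurrent readers safe without extra synchronization.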
@@ -122,7 +122,7 @@ public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOExcepti
// Want to retry, so throw an IOException.
throw new IOException(
String.format(
"Scary HTTP status returned: %s. Check your overlord[%s] logs for exceptions.",
"Scary HTTP status returned: %s. Check your overlord [%s] logs for exceptions.",
response.getStatus(),
server.getHost()
)
@@ -0,0 +1,116 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.common.actions;


import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import io.druid.indexing.common.task.Task;
import org.joda.time.Interval;

import java.io.IOException;

public class SetLockCriticalStateAction implements TaskAction<Boolean>
{

@JsonIgnore
private final Interval interval;
@JsonIgnore
private final TaskLockCriticalState taskLockCriticalState;

@JsonCreator
public SetLockCriticalStateAction(
@JsonProperty("interval") Interval interval,
@JsonProperty("taskLockCriticalState") TaskLockCriticalState taskLockCriticalState
)
{
this.taskLockCriticalState = taskLockCriticalState;
this.interval = interval;
}

@JsonProperty
public Interval getInterval()
{
return interval;
}

@JsonProperty
public TaskLockCriticalState getTaskLockCriticalState()
{
return taskLockCriticalState;
}

@Override
public TypeReference<Boolean> getReturnTypeReference()
{
return new TypeReference<Boolean>()
{
};
}

@Override
public Boolean perform(
Task task, TaskActionToolbox toolbox
) throws IOException
{
return toolbox.getTaskLockbox().setTaskLockCriticalState(task, interval, taskLockCriticalState);
}

@Override
public boolean isAudited()
{
return false;
}

@Override
public String toString()
{
  return "SetLockCriticalStateAction{ " +
         "Interval = " + interval + ", TaskLockCriticalState = " + taskLockCriticalState +
         " }";
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

SetLockCriticalStateAction that = (SetLockCriticalStateAction) o;

if (!getInterval().equals(that.getInterval())) {
return false;
}
return getTaskLockCriticalState() == that.getTaskLockCriticalState();

}

@Override
public int hashCode()
{
int result = getInterval().hashCode();
result = 31 * result + getTaskLockCriticalState().hashCode();
return result;
}
}
@@ -38,7 +38,8 @@
@JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
@JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class),
@JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class)
@JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class),
@JsonSubTypes.Type(name = "lockCriticalState", value = SetLockCriticalStateAction.class)
})
public interface TaskAction<RetType>
{