Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1f32283
Implementation of prioritized locking
jihoonson Jul 15, 2017
df93349
Fix build failure
jihoonson Jul 15, 2017
eecc391
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Jul 17, 2017
14bfaa4
Fix tc fail
jihoonson Jul 18, 2017
c57a9c5
Fix typos
jihoonson Jul 21, 2017
c17493f
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Jul 22, 2017
17fbcea
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Jul 25, 2017
700a968
Fix IndexTaskTest
jihoonson Jul 25, 2017
1dc9941
Addressed comments
jihoonson Jul 27, 2017
2eeaec5
Fix test
jihoonson Jul 27, 2017
6a33824
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Aug 6, 2017
2c394a2
Fix spacing
jihoonson Aug 6, 2017
66bbc54
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Aug 9, 2017
332c6e3
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Aug 15, 2017
f5550dc
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Aug 17, 2017
7ef3774
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Aug 21, 2017
db3195f
Fix build error
jihoonson Aug 22, 2017
51b6962
Fix build error
jihoonson Aug 22, 2017
c5576d7
Add lock status
jihoonson Aug 29, 2017
a9ceace
Cleanup suspicious method
jihoonson Aug 29, 2017
c04bc36
Add nullables
jihoonson Aug 29, 2017
7ad8720
add doInCriticalSection to TaskLockBox and revert return type of tas…
jihoonson Sep 7, 2017
07ace37
fix build
jihoonson Sep 7, 2017
4389f7f
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Sep 27, 2017
42ab8f7
refactor CriticalAction
jihoonson Oct 11, 2017
0ca6c89
make replaceLock transactional
jihoonson Oct 11, 2017
05078b7
Merge branch 'master' of https://github.com/druid-io/druid into prior…
jihoonson Oct 11, 2017
507c6ba
fix formatting
jihoonson Oct 11, 2017
5a29ee1
fix javadoc
jihoonson Oct 11, 2017
4586f24
fix build
jihoonson Oct 11, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
package io.druid.metadata;

import com.google.common.base.Optional;

import io.druid.java.util.common.Pair;

import org.joda.time.DateTime;

import javax.annotation.Nullable;
Expand All @@ -43,7 +41,7 @@ public interface MetadataStorageActionHandler<EntryType, StatusType, LogType, Lo
* @param status status object associated wit this object, can be null
* @throws EntryExistsException
*/
public void insert(
void insert(
@NotNull String id,
@NotNull DateTime timestamp,
@NotNull String dataSource,
Expand All @@ -62,38 +60,38 @@ public void insert(
* @param status status
* @return true if the status was updated, false if the entry did not exist of if the entry was inactive
*/
public boolean setStatus(String entryId, boolean active, StatusType status);
boolean setStatus(String entryId, boolean active, StatusType status);

/**
* Retrieves the entry with the given id.
*
* @param entryId entry id
* @return optional entry, absent if the given id does not exist
*/
public Optional<EntryType> getEntry(String entryId);
Optional<EntryType> getEntry(String entryId);

/**
* Retrieve the status for the entry with the given id.
*
* @param entryId entry id
* @return optional status, absent if entry does not exist or status is not set
*/
public Optional<StatusType> getStatus(String entryId);
Optional<StatusType> getStatus(String entryId);

/**
* Return all active entries with their respective status
*
* @return list of (entry, status) pairs
*/
public List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus();
List<Pair<EntryType, StatusType>> getActiveEntriesWithStatus();

/**
* Return all statuses for inactive entries created on or later than the given timestamp
*
* @param timestamp timestamp
* @return list of statuses
*/
public List<StatusType> getInactiveStatusesSince(DateTime timestamp);
List<StatusType> getInactiveStatusesSince(DateTime timestamp);

/**
* Add a lock to the given entry
Expand All @@ -102,14 +100,25 @@ public void insert(
* @param lock lock to add
* @return true if the lock was added
*/
public boolean addLock(String entryId, LockType lock);
boolean addLock(String entryId, LockType lock);

/**
* Replace an existing lock with a new lock.
*
* @param entryId entry id
* @param oldLockId lock to be replaced
* @param newLock lock to be added
*
* @return true if the lock is replaced
*/
boolean replaceLock(String entryId, long oldLockId, LockType newLock);

/**
* Remove the lock with the given lock id.
*
* @param lockId lock id
*/
public void removeLock(long lockId);
void removeLock(long lockId);

/**
* Add a log to the entry with the given id.
Expand All @@ -118,21 +127,29 @@ public void insert(
* @param log log to add
* @return true if the log was added
*/
public boolean addLog(String entryId, LogType log);
boolean addLog(String entryId, LogType log);

/**
* Returns the logs for the entry with the given id.
*
* @param entryId entry id
* @return list of logs
*/
public List<LogType> getLogs(String entryId);
List<LogType> getLogs(String entryId);

/**
* Returns the locks for the given entry
*
* @param entryId entry id
* @return map of lockId to lock
*/
public Map<Long, LockType> getLocks(String entryId);
Map<Long, LockType> getLocks(String entryId);

/**
* Returns the lock id for the given entry and the lock.
*
* @return lock id if found. Otherwise null.
*/
@Nullable
Long getLockId(String entryId, LockType lock);
}
39 changes: 36 additions & 3 deletions docs/content/ingestion/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,33 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
|property|description|required?|
|--------|-----------|---------|
|type|The task type, this should always be "index".|yes|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using task type, data source name, interval, and date-time stamp. |no|
|spec|The ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details. |yes|
|context|Context containing various task configuration parameters. See below for more details.|no|

#### Task Priority

Druid's indexing tasks use locks for atomic data ingestion. Each lock is acquired for the combination of a dataSource and an interval. Once a task acquires a lock, it can write data for the dataSource and the interval of the acquired lock unless the lock is released or preempted. Please see [the below Locking section](#locking)

Each task has a priority which is used for lock acquisition. The locks of higher-priority tasks can preempt the locks of lower-priority tasks if they try to acquire for the same dataSource and interval. If some locks of a task are preempted, the behavior of the preempted task depends on the task implementation. Usually, most tasks finish as failed if they are preempted.

Tasks can have different default priorities depening on their types. Here are a list of default priorities. Higher the number, higher the priority.

|task type|default priority|
|---------|----------------|
|Realtime index task|75|
|Batch index task|50|
|Merge/Append task|25|
|Other tasks|0|

You can override the task priority by setting your priority in the task context like below.

```json
"context" : {
"priority" : 100
}
```

#### DataSchema

This field is required.
Expand Down Expand Up @@ -322,7 +345,17 @@ These tasks start, sleep for a time and are used only for testing. The available
Locking
-------

Once an overlord node accepts a task, a lock is created for the data source and interval specified in the task.
Once an overlord node accepts a task, the task acquires locks for the data source and intervals specified in the task.

There are two lock types, i.e., _shared lock_ and _exclusive lock_.

- A task needs to acquire a shared lock before it reads segments of an interval. Multiple shared locks can be acquired for the same dataSource and interval. Shared locks are always preemptable, but they don't preempt each other.
- A task needs to acquire an exclusive lock before it writes segments for an interval. An exclusive lock is also preemptable except while the task is publishing segments.

Each task can have different lock priorities. The locks of higher-priority tasks can preempt the locks of lower-priority tasks. The lock preemption works based on _optimistic locking_. When a lock is preempted, it is not notified to the owner task immediately. Instead, it's notified when the owner task tries to acquire the same lock again. (Note that lock acquisition is idempotent unless the lock is preempted.) In general, tasks don't compete for acquiring locks because they usually targets different dataSources or intervals.

A task writing data into a dataSource must acquire exclusive locks for target intervals. Note that exclusive locks are still preemptable. That is, they also be able to be preempted by higher priority locks unless they are _publishing segments_ in a critical section. Once publishing segments is finished, those locks become preemptable again.

Tasks do not need to explicitly release locks, they are released upon task completion. Tasks may potentially release
locks early if they desire. Tasks ids are unique by naming them using UUIDs or the timestamp in which the task was created.
locks early if they desire. Task ids are unique by naming them using UUIDs or the timestamp in which the task was created.
Tasks are also part of a "task group", which is a set of tasks that can share interval locks.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
Expand Down Expand Up @@ -576,13 +577,12 @@ public void run()
sequenceNames.values()
).get();

final Future<SegmentsAndMetadata> handoffFuture = driver.registerHandoff(published);
final SegmentsAndMetadata handedOff;
if (tuningConfig.getHandoffConditionTimeout() == 0) {
handedOff = driver.registerHandoff(published)
.get();
handedOff = handoffFuture.get();
} else {
handedOff = driver.registerHandoff(published)
.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
}

if (handedOff == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,73 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import org.joda.time.Interval;

import javax.annotation.Nullable;

/**
* Represents a lock held by some task. Immutable.
*/
public class TaskLock
{
private final TaskLockType type;
private final String groupId;
private final String dataSource;
private final Interval interval;
private final String version;
private final int priority;
private final boolean revoked;

@JsonCreator
public TaskLock(
@JsonProperty("type") @Nullable TaskLockType type, // nullable for backward compatibility
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version
@JsonProperty("version") String version,
@JsonProperty("priority") int priority,
@JsonProperty("revoked") boolean revoked
)
{
this.type = type == null ? TaskLockType.EXCLUSIVE : type;
this.groupId = Preconditions.checkNotNull(groupId, "groupId");
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.interval = Preconditions.checkNotNull(interval, "interval");
this.version = Preconditions.checkNotNull(version, "version");
this.priority = priority;
this.revoked = revoked;
}

public TaskLock(
TaskLockType type,
String groupId,
String dataSource,
Interval interval,
String version,
int priority
)
{
this.groupId = groupId;
this.dataSource = dataSource;
this.interval = interval;
this.version = version;
this(type, groupId, dataSource, interval, version, priority, false);
}

public TaskLock revokedCopy()
{
return new TaskLock(
type,
groupId,
dataSource,
interval,
version,
priority,
true
);
}

@JsonProperty
public TaskLockType getType()
{
return type;
}

@JsonProperty
Expand All @@ -72,34 +115,52 @@ public String getVersion()
return version;
}

@JsonProperty
public int getPriority()
{
return priority;
}

@JsonProperty
public boolean isRevoked()
{
return revoked;
}

@Override
public boolean equals(Object o)
{
if (!(o instanceof TaskLock)) {
return false;
} else {
final TaskLock x = (TaskLock) o;
return Objects.equal(this.groupId, x.groupId) &&
Objects.equal(this.dataSource, x.dataSource) &&
Objects.equal(this.interval, x.interval) &&
Objects.equal(this.version, x.version);
final TaskLock that = (TaskLock) o;
return this.type.equals(that.type) &&
this.groupId.equals(that.groupId) &&
this.dataSource.equals(that.dataSource) &&
this.interval.equals(that.interval) &&
this.version.equals(that.version) &&
this.priority == that.priority &&
this.revoked == that.revoked;
}
}

@Override
public int hashCode()
{
return Objects.hashCode(groupId, dataSource, interval, version);
return Objects.hashCode(type, groupId, dataSource, interval, version, priority, revoked);
}

@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("type", type)
.add("groupId", groupId)
.add("dataSource", dataSource)
.add("interval", interval)
.add("version", version)
.add("priority", priority)
.add("revoked", revoked)
.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.common;

public enum TaskLockType
{
SHARED,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any code that creates SHARED locks. Are they used?

Should they be used?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, they will be created in the next pr.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, got it.

EXCLUSIVE
}
Loading