Skip to content
Merged
16 changes: 15 additions & 1 deletion docs/content/ingestion/tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
|--------|-----------|---------|
|type|The task type, this should always be "index".|yes|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no|
|spec|The ingestion spec. See below for more details. |yes|
|spec|The ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details. |yes|
|context|Context containing various task configuration parameters. See below for more details.|no|

#### DataSchema

Expand Down Expand Up @@ -160,6 +161,19 @@ On the contrary, in the incremental publishing mode, segments are incrementally

To enable bulk publishing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot be used with either `forceExtendableShardSpecs` of TuningConfig or `appendToExisting` of IOConfig.

### Task Context

The task context is used for various task configuration parameters. The following parameters apply to all tasks.

|property|default|description|
|--------|-------|-----------|
|taskLockTimeout|300000|Task lock timeout in milliseconds. For more details, see [the Locking section below](#locking).|

<div class="note caution">
When a task acquires a lock, it sends a request via HTTP and waits until it receives a response containing the lock acquisition result.
As a result, an HTTP timeout error can occur if `taskLockTimeout` is greater than the `druid.server.http.maxIdleTime` of the overlords.
</div>

Segment Merging Tasks
---------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@ private void makeToolboxFactory() throws IOException
derby.metadataTablesConfigSupplier().get(),
derbyConnector
);
taskLockbox = new TaskLockbox(taskStorage, 3000);
taskLockbox = new TaskLockbox(taskStorage);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
metadataStorageCoordinator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ public boolean isUseExplicitVersion()
return useExplicitVersion;
}

/**
 * Returns the {@code allowedHadoopPrefix} list held by this tuning config, unchanged.
 * Exposed under the JSON property name "allowedHadoopPrefix".
 *
 * @return the stored list of prefixes; this method adds nothing to it
 */
@JsonProperty("allowedHadoopPrefix")
public List<String> getUserAllowedHadoopPrefix()
{
// Just the user-specified list. More are added in HadoopDruidIndexerConfig.
return allowedHadoopPrefix;
}

public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
Expand Down Expand Up @@ -320,11 +327,4 @@ public HadoopTuningConfig withShardSpecs(Map<Long, List<HadoopyShardSpec>> specs
allowedHadoopPrefix
);
}

/**
 * Returns the {@code allowedHadoopPrefix} list held by this tuning config, unchanged.
 * Exposed under the JSON property name "allowedHadoopPrefix".
 *
 * @return the stored list of prefixes; this method adds nothing to it
 */
@JsonProperty("allowedHadoopPrefix")
public List<String> getUserAllowedHadoopPrefix()
{
// Just the user-specified list. More are added in HadoopDruidIndexerConfig.
return allowedHadoopPrefix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,17 @@ public class LockAcquireAction implements TaskAction<TaskLock>
@JsonIgnore
private final Interval interval;

@JsonIgnore
private final long timeoutMs;

@JsonCreator
public LockAcquireAction(
@JsonProperty("interval") Interval interval
@JsonProperty("interval") Interval interval,
@JsonProperty("timeoutMs") long timeoutMs
)
{
this.interval = interval;
this.timeoutMs = timeoutMs;
}

@JsonProperty
Expand All @@ -47,6 +52,12 @@ public Interval getInterval()
return interval;
}

/**
 * Returns the lock wait timeout, in milliseconds, that this action was constructed with.
 * A value of 0 makes {@code perform} call the lockbox's {@code lock} overload that takes no timeout.
 *
 * @return the timeout in milliseconds
 */
@JsonProperty
public long getTimeoutMs()
{
return timeoutMs;
}

@Override
public TypeReference<TaskLock> getReturnTypeReference()
{
Expand All @@ -59,7 +70,11 @@ public TypeReference<TaskLock> getReturnTypeReference()
public TaskLock perform(Task task, TaskActionToolbox toolbox)
{
try {
return toolbox.getTaskLockbox().lock(task, interval);
if (timeoutMs == 0) {
return toolbox.getTaskLockbox().lock(task, interval);
} else {
return toolbox.getTaskLockbox().lock(task, interval, timeoutMs);
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
Expand All @@ -77,6 +92,7 @@ public String toString()
{
  // Fields are separated with ", " so the output reads
  // "LockAcquireAction{interval=..., timeoutMs=...}" instead of running the
  // interval straight into the "timeoutMs=" label.
  return "LockAcquireAction{" +
         "interval=" + interval +
         ", timeoutMs=" + timeoutMs +
         '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

import io.druid.common.utils.JodaUtils;
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
Expand Down Expand Up @@ -198,7 +197,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
)
);
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs));
version = lock.getVersion();
} else {
Iterable<TaskLock> locks = getTaskLocks(toolbox);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
final DataSchema dataSchema;
if (determineIntervals) {
Interval interval = JodaUtils.umbrellaInterval(shardSpecs.getIntervals());
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs));
version = lock.getVersion();
dataSchema = ingestionSchema.getDataSchema().withGranularitySpec(
ingestionSchema.getDataSchema()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,16 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
// which will typically be either the main data processing loop or the persist thread.

// Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, http timeout error can occur while waiting for a
// lock to be acquired.
final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer()
{
@Override
public void announceSegment(final DataSegment segment) throws IOException
{
// Side effect: Calling announceSegment causes a lock to be acquired
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval(), lockTimeoutMs));
toolbox.getSegmentAnnouncer().announceSegment(segment);
}

Expand All @@ -233,7 +236,7 @@ public void announceSegments(Iterable<DataSegment> segments) throws IOException
{
// Side effect: Calling announceSegments causes locks to be acquired
for (DataSegment segment : segments) {
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval(), lockTimeoutMs));
}
toolbox.getSegmentAnnouncer().announceSegments(segments);
}
Expand Down Expand Up @@ -266,7 +269,7 @@ public String getVersion(final Interval interval)
try {
// Side effect: Calling getVersion causes a lock to be acquired
final TaskLock myLock = toolbox.getTaskActionClient()
.submit(new LockAcquireAction(interval));
.submit(new LockAcquireAction(interval, lockTimeoutMs));

return myLock.getVersion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.druid.query.Query;
import io.druid.query.QueryRunner;

import javax.annotation.Nullable;
import java.util.Map;

/**
Expand Down Expand Up @@ -166,6 +167,15 @@ public interface Task

public Map<String, Object> getContext();

public Object getContextValue(String key);

/**
 * Looks up {@code key} in this task's context map and returns the value cast to the type
 * the caller expects.
 *
 * NOTE(review): the cast is unchecked and no conversion is performed. If the stored value's
 * runtime class differs from the type the caller assigns it to (for example an Integer read
 * where a Long is expected), a ClassCastException will surface at the call site, not here.
 *
 * @param key context key to look up
 * @param <ContextValueType> type the caller expects the value to have
 *
 * @return the value stored under {@code key}, or null if the context map is null or holds
 *         no value for the key
 */
@Nullable
default <ContextValueType> ContextValueType getContextValue(String key)
{
  // Read the context once so the null check and the lookup operate on the same map.
  final Map<String, Object> context = getContext();
  if (context == null) {
    return null;
  }
  // Unchecked by necessity: the map stores plain Objects and the target type is chosen by the caller.
  @SuppressWarnings("unchecked")
  final ContextValueType value = (ContextValueType) context.get(key);
  return value;
}

/**
 * Looks up {@code key} in this task's context and falls back to {@code defaultValue} when
 * the lookup yields null.
 *
 * NOTE(review): the looked-up value is returned as-is, without conversion to the type of
 * {@code defaultValue}; confirm that stored context values have the runtime type callers expect.
 *
 * @param key context key to look up
 * @param defaultValue value returned when nothing is found under {@code key}
 * @param <ContextValueType> type the caller expects the value to have
 *
 * @return the context value if it is non-null, otherwise {@code defaultValue}
 */
default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)
{
  final ContextValueType found = getContextValue(key);
  if (found != null) {
    return found;
  }
  return defaultValue;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.common.task;

/**
 * Constants shared by task implementations.
 */
public class Tasks
{
  /**
   * Task context key under which the lock timeout is looked up.
   */
  public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout";

  /**
   * Lock timeout in milliseconds used when the task context supplies no value (5 minutes).
   */
  public static final long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.server.initialization.ServerConfig;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand Down Expand Up @@ -69,7 +68,6 @@ public class TaskLockbox
private final TaskStorage taskStorage;
private final ReentrantLock giant = new ReentrantLock(true);
private final Condition lockReleaseCondition = giant.newCondition();
protected final long lockTimeoutMillis;

private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);

Expand All @@ -79,21 +77,10 @@ public class TaskLockbox

@Inject
public TaskLockbox(
TaskStorage taskStorage,
ServerConfig serverConfig
TaskStorage taskStorage
)
{
this.taskStorage = taskStorage;
this.lockTimeoutMillis = serverConfig.getMaxIdleTime().getMillis();
}

public TaskLockbox(
TaskStorage taskStorage,
long lockTimeoutMillis
)
{
this.taskStorage = taskStorage;
this.lockTimeoutMillis = lockTimeoutMillis;
}

/**
Expand Down Expand Up @@ -140,7 +127,7 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
continue;
}

final TaskLockPosse taskLockPosse = tryAddTaskToLockPosse(
final TaskLockPosse taskLockPosse = createOrFindLockPosse(
task,
savedTaskLock.getInterval(),
Optional.of(savedTaskLock.getVersion())
Expand Down Expand Up @@ -190,44 +177,52 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
}

/**
* Acquires a lock on behalf of a task. Blocks until the lock is acquired. Throws an exception if the lock
* cannot be acquired.
* Acquires a lock on behalf of a task. Blocks until the lock is acquired.
*
* @param task task to acquire lock for
* @param interval interval to lock
* @return acquired TaskLock
*
* @throws InterruptedException if the lock cannot be acquired
* @throws InterruptedException if the current thread is interrupted
*/
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
{
long timeout = lockTimeoutMillis;
giant.lock();
giant.lockInterruptibly();
try {
Optional<TaskLock> taskLock;
while (!(taskLock = tryLock(task, interval)).isPresent()) {
long startTime = System.currentTimeMillis();
lockReleaseCondition.await(timeout, TimeUnit.MILLISECONDS);
long timeDelta = System.currentTimeMillis() - startTime;
if (timeDelta >= timeout) {
log.error(
"Task [%s] can not acquire lock for interval [%s] within [%s] ms",
task.getId(),
interval,
lockTimeoutMillis
);
lockReleaseCondition.await();
}
return taskLock.get();
}
finally {
giant.unlock();
}
}

throw new InterruptedException(String.format(
"Task [%s] can not acquire lock for interval [%s] within [%s] ms",
task.getId(),
interval,
lockTimeoutMillis
));
} else {
timeout -= timeDelta;
/**
* Acquires a lock on behalf of a task, waiting up to the specified wait time if necessary.
*
* @param task task to acquire a lock for
* @param interval interval to lock
* @param timeoutMs maximum time to wait
*
* @return acquired lock, or null if the lock could not be acquired within timeoutMs
*
* @throws InterruptedException if the current thread is interrupted
*/
public TaskLock lock(final Task task, final Interval interval, long timeoutMs) throws InterruptedException
{
long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, the actual timeout should be min(timeoutMs, serverConfig.getMaxIdleTime().getMillis()). For example, if a task gets the lock after 3 minutes and serverConfig.getMaxIdleTime().getMillis() is 2 minutes, then the lock acquire action cannot write the response to the output channel even after getting the lock.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that the HTTP timeout while waiting for a lock is actually our bug, and your suggestion sounds like a workaround to avoid this bug.
I think there is no reason for maxIdleTime and taskLockTimeout to be associated except the bug. If so, isn't it better to fix the bug rather than adding a workaround and telling users to learn this workaround? I think adding a caveat about this bug to the doc would be enough for now.

@akashdw @leventov @gianm any thoughts?

Copy link
Copy Markdown
Contributor

@akashdw akashdw Jul 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jihoonson sorry for the delay in responding. I totally agree with you that maxIdleTime and taskLockTimeout should not be associated. Maybe you can write a comment that the timeout passed might not work as expected if the server idle time is less than taskLockTimeout.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akashdw thanks for your understanding. I added some caveats.

giant.lockInterruptibly();
try {
Optional<TaskLock> taskLock;
while (!(taskLock = tryLock(task, interval)).isPresent()) {
if (nanos <= 0) {
return null;
}
nanos = lockReleaseCondition.awaitNanos(nanos);
}

return taskLock.get();
}
finally {
Expand Down Expand Up @@ -274,7 +269,7 @@ private Optional<TaskLock> tryLock(final Task task, final Interval interval, fin
}
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");

final TaskLockPosse posseToUse = tryAddTaskToLockPosse(task, interval, preferredVersion);
final TaskLockPosse posseToUse = createOrFindLockPosse(task, interval, preferredVersion);
if (posseToUse != null) {
// Add to existing TaskLockPosse, if necessary
if (posseToUse.getTaskIds().add(task.getId())) {
Expand Down Expand Up @@ -310,7 +305,7 @@ private Optional<TaskLock> tryLock(final Task task, final Interval interval, fin

}

private TaskLockPosse tryAddTaskToLockPosse(
private TaskLockPosse createOrFindLockPosse(
final Task task,
final Interval interval,
final Optional<String> preferredVersion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testSimple() throws Exception
final Task task = new NoopTask(null, 0, 0, null, null, null);
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000);
action.perform(task, actionTestKit.getTaskActionToolbox());

Assert.assertEquals(
Expand All @@ -108,7 +108,7 @@ public void testFailBadVersion() throws Exception
final Task task = new NoopTask(null, 0, 0, null, null, null);
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000);

thrown.expect(IllegalStateException.class);
thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task"));
Expand Down
Loading