Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1491,7 +1491,7 @@ private void makeToolboxFactory() throws IOException
derby.metadataTablesConfigSupplier().get(),
derbyConnector
);
taskLockbox = new TaskLockbox(taskStorage);
taskLockbox = new TaskLockbox(taskStorage, 3000);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
metadataStorageCoordinator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.guava.FunctionalIterable;

import io.druid.server.initialization.ServerConfig;
import org.joda.time.DateTime;
import org.joda.time.Interval;

Expand All @@ -52,6 +52,7 @@
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -67,6 +68,7 @@ public class TaskLockbox
private final TaskStorage taskStorage;
private final ReentrantLock giant = new ReentrantLock(true);
private final Condition lockReleaseCondition = giant.newCondition();
protected final long lockTimeoutMillis;

private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);

Expand All @@ -76,10 +78,21 @@ public class TaskLockbox

@Inject
public TaskLockbox(
TaskStorage taskStorage
TaskStorage taskStorage,
ServerConfig serverConfig
)
{
this.taskStorage = taskStorage;
this.lockTimeoutMillis = serverConfig.getMaxIdleTime().getMillis();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

serverConfig.getMaxIdleTime() corresponds to druid.server.http.maxIdleTime and is not deprecated.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, right.

}

public TaskLockbox(
TaskStorage taskStorage,
long lockTimeoutMillis
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should every lockAcquireAction have the same timeout? I think it would be valuable if we can change the timeout depending on task specs in the future. For example, a task can have a long timeout if it should acquire a lock for a long interval.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should every lockAcquireAction have the same timeout? Yes for now, it can also be extended where client can send a timeout.
For now serverConfig.getMaxIdleTime() is the default timeout b/c even if task gets a lock after this period, overlord can not write the response in the closed socket.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now serverConfig.getMaxIdleTime() is the default timeout b/c even if task gets a lock after this period, overlord can not write the response in the closed socket.

Using maxIdleTime as a default value sounds good.

Yes for now, it can also be extended where client can send a timeout.

I think we need to set different timeouts for each lock request in the very near future, because I'm working on prioritized locking (#4479, #1679) and this timeout feature would be great if tasks could set different timeouts according to their priorities.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I'll create a separate PR in which the client can send a lock timeout; if the client does not send one, the default serverConfig.getMaxIdleTime() will be used.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Thanks!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@akashdw I raised #4533.

)
{
this.taskStorage = taskStorage;
this.lockTimeoutMillis = lockTimeoutMillis;
}

/**
Expand Down Expand Up @@ -179,15 +192,36 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
*/
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
{
long timeout = lockTimeoutMillis;
giant.lock();
try {
Optional<TaskLock> taskLock;
while (!(taskLock = tryLock(task, interval)).isPresent()) {
lockReleaseCondition.await();
long startTime = System.currentTimeMillis();
lockReleaseCondition.await(timeout, TimeUnit.MILLISECONDS);
long timeDelta = System.currentTimeMillis() - startTime;
if (timeDelta >= timeout) {
log.error(
"Task [%s] can not acquire lock for interval [%s] within [%s] ms",
task.getId(),
interval,
lockTimeoutMillis
);

throw new InterruptedException(String.format(
"Task [%s] can not acquire lock for interval [%s] within [%s] ms",
task.getId(),
interval,
lockTimeoutMillis
));
} else {
timeout -= timeDelta;
}
}

return taskLock.get();
} finally {
}
finally {
giant.unlock();
}
}
Expand Down Expand Up @@ -247,6 +281,12 @@ private Optional<TaskLock> tryLock(final Task task, final Interval interval, fin
if (foundPosse.getTaskLock().getInterval().contains(interval) && foundPosse.getTaskLock().getGroupId().equals(task.getGroupId())) {
posseToUse = foundPosse;
} else {
//Could be a deadlock for LockAcquireAction: same task trying to acquire lock for overlapping interval
if (foundPosse.getTaskIds().contains(task.getId())) {
log.makeAlert("Same Task is trying to acquire lock for overlapping interval")
.addData("task", task.getId())
.addData("interval", interval);
}
return Optional.absent();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
Expand All @@ -37,8 +39,10 @@
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -55,6 +59,14 @@ public class SegmentAllocateActionTest
private static final DateTime PARTY_TIME = new DateTime("1999");
private static final DateTime THE_DISTANT_FUTURE = new DateTime("3000");

/**
 * Registers a replayed EasyMock {@link ServiceEmitter} with {@link EmittingLogger} before each test.
 * No expectations are recorded on the mock before it is switched to replay mode.
 */
@Before
public void setUp()
{
  // NOTE(review): presumably needed so that code under test which builds alerts through
  // EmittingLogger has an emitter registered -- confirm against EmittingLogger.
  final ServiceEmitter mockEmitter = EasyMock.createMock(ServiceEmitter.class);
  EmittingLogger.registerEmitter(mockEmitter);
  EasyMock.replay(mockEmitter);
}

@Test
public void testGranularitiesFinerThanDay() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public TaskActionToolbox getTaskActionToolbox()
public void before()
{
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H")));
taskLockbox = new TaskLockbox(taskStorage);
taskLockbox = new TaskLockbox(taskStorage, 300);
testDerbyConnector = new TestDerbyConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(metadataStorageTablesConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ private TaskToolbox makeToolbox(
)
{
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, 300);
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public static Collection<Object[]> constructorFeeder() throws IOException
}
INDEX_MERGER.persist(index, persistDir, indexSpec);

final TaskLockbox tl = new TaskLockbox(ts);
final TaskLockbox tl = new TaskLockbox(ts, 300);
final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)
{
final private Set<DataSegment> published = Sets.newHashSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory(
Preconditions.checkNotNull(taskStorage);
Preconditions.checkNotNull(emitter);

taskLockbox = new TaskLockbox(taskStorage);
taskLockbox = new TaskLockbox(taskStorage, 300);
tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock(
SupervisorManager.class)));
File tmpDir = temporaryFolder.newFolder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,51 @@

package io.druid.indexing.overlord;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.java.util.common.ISE;
import io.druid.server.initialization.ServerConfig;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Map;

public class TaskLockboxTest
{
private TaskStorage taskStorage;

private TaskLockbox lockbox;

@Rule
public final ExpectedException exception = ExpectedException.none();

@Before
public void setUp()
{
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
lockbox = new TaskLockbox(taskStorage);
ServerConfig serverConfig = EasyMock.niceMock(ServerConfig.class);
EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100));
EasyMock.replay(serverConfig);

ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
EasyMock.replay(emitter);

lockbox = new TaskLockbox(taskStorage, serverConfig);
}

@Test
Expand All @@ -57,17 +80,19 @@ public void testLockForInactiveTask() throws InterruptedException
lockbox.lock(NoopTask.create(), new Interval("2015-01-01/2015-01-02"));
}

@Test(expected = IllegalStateException.class)
@Test
public void testLockAfterTaskComplete() throws InterruptedException
{
Task task = NoopTask.create();
exception.expect(ISE.class);
exception.expectMessage("Unable to grant lock to inactive Task");
lockbox.add(task);
lockbox.remove(task);
lockbox.lock(task, new Interval("2015-01-01/2015-01-02"));
}

@Test
public void testTryLock() throws InterruptedException
public void testTryLock()
{
Task task = NoopTask.create();
lockbox.add(task);
Expand All @@ -86,7 +111,7 @@ public void testTryLock() throws InterruptedException
}

@Test
public void testTrySmallerLock() throws InterruptedException
public void testTrySmallerLock()
{
Task task = NoopTask.create();
lockbox.add(task);
Expand All @@ -109,20 +134,61 @@ public void testTrySmallerLock() throws InterruptedException
);
}


@Test(expected = IllegalStateException.class)
public void testTryLockForInactiveTask() throws InterruptedException
public void testTryLockForInactiveTask()
{
Assert.assertFalse(lockbox.tryLock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")).isPresent());
}

@Test(expected = IllegalStateException.class)
public void testTryLockAfterTaskComplete() throws InterruptedException
@Test
public void testTryLockAfterTaskComplete()
{
Task task = NoopTask.create();
exception.expect(ISE.class);
exception.expectMessage("Unable to grant lock to inactive Task");
lockbox.add(task);
lockbox.remove(task);
Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent());
}

/**
 * Verifies that a blocking {@code lock()} call gives up with an {@link InterruptedException}
 * when another task already holds a lock on an overlapping interval.
 */
@Test
public void testTimeoutForLock() throws InterruptedException
{
  final Interval heldInterval = new Interval("2015-01-01/2015-01-02");
  // Overlaps heldInterval, so the second request cannot be granted while the first lock is held.
  final Interval contendedInterval = new Interval("2015-01-01/2015-01-15");

  final Task holder = NoopTask.create();
  // SomeTask reports the fixed group id "someGroupId".
  // NOTE(review): presumably this differs from the NoopTask's group id, which is what keeps
  // the two tasks from sharing a lock -- confirm against NoopTask.
  final Task waiter = new SomeTask(null, 0, 0, null, null, null);

  lockbox.add(holder);
  lockbox.add(waiter);

  // The expectation is armed before either lock() call; only the second one should throw.
  exception.expect(InterruptedException.class);
  exception.expectMessage("can not acquire lock for interval");

  lockbox.lock(holder, heldInterval);
  lockbox.lock(waiter, contendedInterval);
}

/**
 * A {@link NoopTask} variant used by these tests: it passes all constructor arguments through
 * to {@link NoopTask} and overrides only the reported type and group id with fixed values.
 */
public static class SomeTask extends NoopTask
{
  public SomeTask(
      @JsonProperty("id") String id,
      @JsonProperty("runTime") long runTime,
      @JsonProperty("isReadyTime") long isReadyTime,
      @JsonProperty("isReadyResult") String isReadyResult,
      @JsonProperty("firehose") FirehoseFactory firehoseFactory,
      @JsonProperty("context") Map<String, Object> context
  )
  {
    super(id, runTime, isReadyTime, isReadyResult, firehoseFactory, context);
  }

  /**
   * @return the constant group id {@code "someGroupId"}
   */
  @Override
  public String getGroupId()
  {
    return "someGroupId";
  }

  /**
   * @return the constant task type {@code "someTask"}
   */
  @Override
  public String getType()
  {
    return "someTask";
  }
}

}