From e88e9bfd03185d8e6cb2b5cf1149be97d936aa42 Mon Sep 17 00:00:00 2001 From: dwivedi Date: Fri, 23 Jun 2017 15:43:56 -0700 Subject: [PATCH 1/7] Timeout for LockAcquireAction --- .../druid/indexing/overlord/TaskLockbox.java | 32 ++++++++++-- .../common/actions/TaskActionTestKit.java | 2 +- .../common/task/RealtimeIndexTaskTest.java | 2 +- .../IngestSegmentFirehoseFactoryTest.java | 2 +- .../indexing/overlord/TaskLifecycleTest.java | 2 +- .../indexing/overlord/TaskLockboxTest.java | 49 ++++++++++++++++++- 6 files changed, 81 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 0c496973873f..43bc7761cebe 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -40,7 +40,7 @@ import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.guava.FunctionalIterable; - +import io.druid.server.initialization.ServerConfig; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -52,6 +52,7 @@ import java.util.NavigableSet; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -67,6 +68,7 @@ public class TaskLockbox private final TaskStorage taskStorage; private final ReentrantLock giant = new ReentrantLock(true); private final Condition lockReleaseCondition = giant.newCondition(); + protected final long lockTimeoutMillis; private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); @@ -76,10 +78,21 @@ public class TaskLockbox @Inject public TaskLockbox( - TaskStorage taskStorage + TaskStorage taskStorage, + ServerConfig serverConfig ) { this.taskStorage = taskStorage; + this.lockTimeoutMillis = serverConfig.getMaxIdleTime().getMillis(); + } + + public TaskLockbox( + TaskStorage taskStorage, + long lockTimeoutMillis + ) + { + this.taskStorage = taskStorage; + this.lockTimeoutMillis = lockTimeoutMillis; } /** @@ -179,11 +192,20 @@ public int compare(Pair left, Pair right) */ public TaskLock lock(final Task task, final Interval interval) throws InterruptedException { + long startTime = System.currentTimeMillis(); giant.lock(); try { Optional taskLock; while (!(taskLock = tryLock(task, interval)).isPresent()) { - lockReleaseCondition.await(); + lockReleaseCondition.await(lockTimeoutMillis, TimeUnit.MILLISECONDS); + if (System.currentTimeMillis() - startTime > lockTimeoutMillis) { + throw new InterruptedException(String.format( + "Task [%s] can not acquire lock for interval [%s] within [%s] ms", + task.getId(), + interval, + lockTimeoutMillis + )); + } } return taskLock.get(); @@ -247,6 +269,10 @@ private Optional tryLock(final Task task, final Interval interval, fin if (foundPosse.getTaskLock().getInterval().contains(interval) && foundPosse.getTaskLock().getGroupId().equals(task.getGroupId())) { posseToUse = foundPosse; } else { + //Could be a deadlock for LOCK action: same task trying to acquire lock for overlapping interval + if (foundPosse.getTaskIds().contains(task.getId())) { + log.warn("Same Task [%s] is trying to acquire lock for overlapping interval [%s]", task.getId(), interval); + } return Optional.absent(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java index 3fe74a780eb9..d53336712f1b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java @@ -80,7 +80,7 @@ public TaskActionToolbox getTaskActionToolbox() public void before() { taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H"))); - taskLockbox = new TaskLockbox(taskStorage); + taskLockbox = new TaskLockbox(taskStorage, 300000); testDerbyConnector = new TestDerbyConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), Suppliers.ofInstance(metadataStorageTablesConfig) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 78c48b0734dd..f52542ffd772 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -951,7 +951,7 @@ private TaskToolbox makeToolbox( ) { final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); - final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); + final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, 300000); try { taskStorage.insert(task, TaskStatus.running(task.getId())); } diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index e98291228725..ec0fc02c660a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -152,7 +152,7 @@ public static Collection constructorFeeder() throws IOException } INDEX_MERGER.persist(index, persistDir, indexSpec); - final TaskLockbox tl = new TaskLockbox(ts); + final TaskLockbox tl = new TaskLockbox(ts, 300000); final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null) { final private Set published = Sets.newHashSet(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index b001cfceda8c..316907701e78 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -514,7 +514,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( Preconditions.checkNotNull(taskStorage); Preconditions.checkNotNull(emitter); - taskLockbox = new TaskLockbox(taskStorage); + taskLockbox = new TaskLockbox(taskStorage, 300000); tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock( SupervisorManager.class))); File tmpDir = temporaryFolder.newFolder(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index 600b696cf3db..44078c3930f3 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -19,17 +19,24 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; +import io.druid.server.initialization.ServerConfig; +import org.easymock.EasyMock; import org.joda.time.Interval; +import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.util.Map; + public class TaskLockboxTest { private TaskStorage taskStorage; @@ -40,7 +47,10 @@ public class TaskLockboxTest public void setUp() { taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); - lockbox = new TaskLockbox(taskStorage); + ServerConfig serverConfig = EasyMock.niceMock(ServerConfig.class); + EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100)); + EasyMock.replay(serverConfig); + lockbox = new TaskLockbox(taskStorage, serverConfig); } @Test @@ -124,5 +134,42 @@ public void testTryLockAfterTaskComplete() throws InterruptedException Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); } + @Test(expected = InterruptedException.class) + public void testTimeoutForLock() throws InterruptedException + { + Task task1 = NoopTask.create(); + Task task2 = new SomeTask(null, 0, 0, null, null, null); + + lockbox.add(task1); + lockbox.add(task2); + + lockbox.lock(task1, new Interval("2015-01-01/2015-01-02")); + lockbox.lock(task2, new Interval("2015-01-01/2015-01-15")); + } + + public class SomeTask extends NoopTask { + + public SomeTask( + @JsonProperty("id") String id, + @JsonProperty("runTime") long runTime, + @JsonProperty("isReadyTime") long isReadyTime, + @JsonProperty("isReadyResult") String isReadyResult, + @JsonProperty("firehose") FirehoseFactory firehoseFactory, + @JsonProperty("context") Map context + ) + { + super(id, runTime, isReadyTime, isReadyResult, firehoseFactory, context); + } + + @Override + public String getType() + { + return "someTask"; + } + + @Override + public String getGroupId() { return "someGroupId";} + + } } From 0151042ca271184c6353f80829270c111eace7f9 Mon Sep 17 00:00:00 2001 From: dwivedi Date: Fri, 23 Jun 2017 16:29:08 -0700 Subject: [PATCH 2/7] Static inner class. --- .../test/java/io/druid/indexing/overlord/TaskLockboxTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index 44078c3930f3..7904479f9c88 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -147,7 +147,7 @@ public void testTimeoutForLock() throws InterruptedException lockbox.lock(task2, new Interval("2015-01-01/2015-01-15")); } - public class SomeTask extends NoopTask { + public static class SomeTask extends NoopTask { public SomeTask( @JsonProperty("id") String id, From d9ad40c6cf17c3fbad9e8a1c5f5f250c8b2b37bd Mon Sep 17 00:00:00 2001 From: Akash Dwivedi Date: Fri, 23 Jun 2017 16:39:20 -0700 Subject: [PATCH 3/7] Rebase changes. --- .../test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index b908af860475..96d56b4ee576 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1491,7 +1491,7 @@ private void makeToolboxFactory() throws IOException derby.metadataTablesConfigSupplier().get(), derbyConnector ); - taskLockbox = new TaskLockbox(taskStorage); + taskLockbox = new TaskLockbox(taskStorage, 300000); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, metadataStorageCoordinator, From 40930e0171c813fe1a361042bd0cbbd7e5dd47d6 Mon Sep 17 00:00:00 2001 From: dwivedi Date: Mon, 26 Jun 2017 14:22:56 -0700 Subject: [PATCH 4/7] makeAlert and throw exception incase of overlapping interval. --- .../druid/indexing/overlord/TaskLockbox.java | 11 +++++++-- .../indexing/overlord/TaskLockboxTest.java | 24 ++++++++++++++++--- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 43bc7761cebe..2b358c1aef49 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -269,9 +269,16 @@ private Optional tryLock(final Task task, final Interval interval, fin if (foundPosse.getTaskLock().getInterval().contains(interval) && foundPosse.getTaskLock().getGroupId().equals(task.getGroupId())) { posseToUse = foundPosse; } else { - //Could be a deadlock for LOCK action: same task trying to acquire lock for overlapping interval + //Could be a deadlock for LockAcquireAction: same task trying to acquire lock for overlapping interval if (foundPosse.getTaskIds().contains(task.getId())) { - log.warn("Same Task [%s] is trying to acquire lock for overlapping interval [%s]", task.getId(), interval); + log.makeAlert("Same Task is trying to acquire lock for overlapping interval") + .addData("task", task.getId()) + .addData("interval", interval); + throw new ISE( + "Same Task [%s] is trying to acquire lock for overlapping interval [%s]", + task.getId(), + interval + ); } return Optional.absent(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index 7904479f9c88..f3fe435fe1f8 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -22,11 +22,14 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; +import io.druid.java.util.common.ISE; import io.druid.server.initialization.ServerConfig; import org.easymock.EasyMock; import org.joda.time.Interval; @@ -50,6 +53,11 @@ public void setUp() ServerConfig serverConfig = EasyMock.niceMock(ServerConfig.class); EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100)); EasyMock.replay(serverConfig); + + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + EmittingLogger.registerEmitter(emitter); + EasyMock.replay(emitter); + lockbox = new TaskLockbox(taskStorage, serverConfig); } @@ -104,9 +112,6 @@ public void testTrySmallerLock() throws InterruptedException Assert.assertTrue(lock1.isPresent()); Assert.assertEquals(new Interval("2015-01-01/2015-01-03"), lock1.get().getInterval()); - // same task tries to take partially overlapping interval; should fail - Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-02/2015-01-04")).isPresent()); - // same task tries to take contained interval; should succeed and should match the original lock Optional lock2 = lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")); Assert.assertTrue(lock2.isPresent()); @@ -119,6 +124,19 @@ public void testTrySmallerLock() throws InterruptedException ); } + @Test(expected = ISE.class) + public void testOverlappingIntervalForSameTask() throws InterruptedException + { + Task task = NoopTask.create(); + lockbox.add(task); + Optional lock1 = lockbox.tryLock(task, new Interval("2015-01-01/2015-01-03")); + Assert.assertTrue(lock1.isPresent()); + Assert.assertEquals(new Interval("2015-01-01/2015-01-03"), lock1.get().getInterval()); + + // same task tries to take partially overlapping interval; should throw exception + lockbox.tryLock(task, new Interval("2015-01-02/2015-01-04")); + } + @Test(expected = IllegalStateException.class) public void testTryLockForInactiveTask() throws InterruptedException { From b7cbb8666e57774eed6cf2c87c605575f9114ce8 Mon Sep 17 00:00:00 2001 From: dwivedi Date: Fri, 30 Jun 2017 17:23:47 -0700 Subject: [PATCH 5/7] Addressed comments. --- .../indexing/kafka/KafkaIndexTaskTest.java | 2 +- .../druid/indexing/overlord/TaskLockbox.java | 25 +++++++++------ .../actions/SegmentAllocateActionTest.java | 12 +++++++ .../common/actions/TaskActionTestKit.java | 2 +- .../indexing/overlord/TaskLockboxTest.java | 32 +++++++++---------- 5 files changed, 45 insertions(+), 28 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 96d56b4ee576..a421e9c9284d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1491,7 +1491,7 @@ private void makeToolboxFactory() throws IOException derby.metadataTablesConfigSupplier().get(), derbyConnector ); - taskLockbox = new TaskLockbox(taskStorage, 300000); + taskLockbox = new TaskLockbox(taskStorage, 3000); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, metadataStorageCoordinator, diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 2b358c1aef49..fbc3303e2cbd 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -192,24 +192,36 @@ public int compare(Pair left, Pair right) */ public TaskLock lock(final Task task, final Interval interval) throws InterruptedException { - long startTime = System.currentTimeMillis(); + long timeout = lockTimeoutMillis; giant.lock(); try { Optional taskLock; while (!(taskLock = tryLock(task, interval)).isPresent()) { - lockReleaseCondition.await(lockTimeoutMillis, TimeUnit.MILLISECONDS); - if (System.currentTimeMillis() - startTime > lockTimeoutMillis) { + long startTime = System.currentTimeMillis(); + lockReleaseCondition.await(timeout, TimeUnit.MILLISECONDS); + long timeDelta = System.currentTimeMillis() - startTime; + if (timeDelta >= timeout) { + log.error( + "Task [%s] can not acquire lock for interval [%s] within [%s] ms", + task.getId(), + interval, + lockTimeoutMillis + ); + throw new InterruptedException(String.format( "Task [%s] can not acquire lock for interval [%s] within [%s] ms", task.getId(), interval, lockTimeoutMillis )); + } else { + timeout -= timeDelta; } } return taskLock.get(); - } finally { + } + finally { giant.unlock(); } } @@ -274,11 +286,6 @@ private Optional tryLock(final Task task, final Interval interval, fin log.makeAlert("Same Task is trying to acquire lock for overlapping interval") .addData("task", task.getId()) .addData("interval", interval); - throw new ISE( - "Same Task [%s] is trying to acquire lock for overlapping interval [%s]", - task.getId(), - interval - ); } return Optional.absent(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java index 4547917a251c..a587b38e1c14 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -25,6 +25,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.ServiceEmitter; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; @@ -37,8 +39,10 @@ import io.druid.timeline.partition.LinearShardSpec; import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.SingleDimensionShardSpec; +import org.easymock.EasyMock; import org.joda.time.DateTime; import org.junit.Assert; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -55,6 +59,14 @@ public class SegmentAllocateActionTest private static final DateTime PARTY_TIME = new DateTime("1999"); private static final DateTime THE_DISTANT_FUTURE = new DateTime("3000"); + @Before + public void setUp() + { + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + EmittingLogger.registerEmitter(emitter); + EasyMock.replay(emitter); + } + @Test public void testGranularitiesFinerThanDay() throws Exception { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java index d53336712f1b..8a3442d9dbbd 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java @@ -80,7 +80,7 @@ public TaskActionToolbox getTaskActionToolbox() public void before() { taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H"))); - taskLockbox = new TaskLockbox(taskStorage, 300000); + taskLockbox = new TaskLockbox(taskStorage, 300); testDerbyConnector = new TestDerbyConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), Suppliers.ofInstance(metadataStorageTablesConfig) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index f3fe435fe1f8..65ed35af5b9b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -36,7 +36,9 @@ import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.Map; @@ -46,6 +48,9 @@ public class TaskLockboxTest private TaskLockbox lockbox; + @Rule + public final ExpectedException exception = ExpectedException.none(); + @Before public void setUp() { @@ -54,7 +59,7 @@ public void setUp() EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100)); EasyMock.replay(serverConfig); - ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); EmittingLogger.registerEmitter(emitter); EasyMock.replay(emitter); @@ -75,10 +80,11 @@ public void testLockForInactiveTask() throws InterruptedException lockbox.lock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")); } - @Test(expected = IllegalStateException.class) + @Test public void testLockAfterTaskComplete() throws InterruptedException { Task task = NoopTask.create(); + exception.expect(IllegalStateException.class); lockbox.add(task); lockbox.remove(task); lockbox.lock(task, new Interval("2015-01-01/2015-01-02")); @@ -112,6 +118,9 @@ public void testTrySmallerLock() throws InterruptedException Assert.assertTrue(lock1.isPresent()); Assert.assertEquals(new Interval("2015-01-01/2015-01-03"), lock1.get().getInterval()); + // same task tries to take partially overlapping interval; should fail + Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-02/2015-01-04")).isPresent()); + // same task tries to take contained interval; should succeed and should match the original lock Optional lock2 = lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")); Assert.assertTrue(lock2.isPresent()); @@ -124,18 +133,6 @@ public void testTrySmallerLock() throws InterruptedException ); } - @Test(expected = ISE.class) - public void testOverlappingIntervalForSameTask() throws InterruptedException - { - Task task = NoopTask.create(); - lockbox.add(task); - Optional lock1 = lockbox.tryLock(task, new Interval("2015-01-01/2015-01-03")); - Assert.assertTrue(lock1.isPresent()); - Assert.assertEquals(new Interval("2015-01-01/2015-01-03"), lock1.get().getInterval()); - - // same task tries to take partially overlapping interval; should throw exception - lockbox.tryLock(task, new Interval("2015-01-02/2015-01-04")); - } @Test(expected = IllegalStateException.class) public void testTryLockForInactiveTask() throws InterruptedException @@ -143,16 +140,17 @@ public void testTryLockForInactiveTask() throws InterruptedException Assert.assertFalse(lockbox.tryLock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")).isPresent()); } - @Test(expected = IllegalStateException.class) + @Test public void testTryLockAfterTaskComplete() throws InterruptedException { Task task = NoopTask.create(); + exception.expect(IllegalStateException.class); lockbox.add(task); lockbox.remove(task); Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); } - @Test(expected = InterruptedException.class) + @Test public void testTimeoutForLock() throws InterruptedException { Task task1 = NoopTask.create(); @@ -160,7 +158,7 @@ public void testTimeoutForLock() throws InterruptedException lockbox.add(task1); lockbox.add(task2); - + exception.expect(InterruptedException.class); lockbox.lock(task1, new Interval("2015-01-01/2015-01-02")); lockbox.lock(task2, new Interval("2015-01-01/2015-01-15")); } From fb6dec4c97e9aef16ff327e60f518f5445af5db1 Mon Sep 17 00:00:00 2001 From: dwivedi Date: Fri, 30 Jun 2017 17:37:40 -0700 Subject: [PATCH 6/7] remove unused import. --- .../test/java/io/druid/indexing/overlord/TaskLockboxTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index 65ed35af5b9b..da0ab86b4cc4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -29,7 +29,6 @@ import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; -import io.druid.java.util.common.ISE; import io.druid.server.initialization.ServerConfig; import org.easymock.EasyMock; import org.joda.time.Interval; From 751ced24ab40f7501b09cc213b0e555ffacf9ef3 Mon Sep 17 00:00:00 2001 From: dwivedi Date: Wed, 5 Jul 2017 11:16:58 -0700 Subject: [PATCH 7/7] Addressed comments --- .../common/task/RealtimeIndexTaskTest.java | 2 +- .../IngestSegmentFirehoseFactoryTest.java | 2 +- .../indexing/overlord/TaskLifecycleTest.java | 2 +- .../druid/indexing/overlord/TaskLockboxTest.java | 16 ++++++++++------ 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index f52542ffd772..723b52b8e04b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -951,7 +951,7 @@ private TaskToolbox makeToolbox( ) { final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); - final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, 300000); + final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, 300); try { taskStorage.insert(task, TaskStatus.running(task.getId())); } diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index ec0fc02c660a..2c4c07ff1077 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -152,7 +152,7 @@ public static Collection constructorFeeder() throws IOException } INDEX_MERGER.persist(index, persistDir, indexSpec); - final TaskLockbox tl = new TaskLockbox(ts, 300000); + final TaskLockbox tl = new TaskLockbox(ts, 300); final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null) { final private Set published = Sets.newHashSet(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 316907701e78..428f413c8c55 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -514,7 +514,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( Preconditions.checkNotNull(taskStorage); Preconditions.checkNotNull(emitter); - taskLockbox = new TaskLockbox(taskStorage, 300000); + taskLockbox = new TaskLockbox(taskStorage, 300); tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock( SupervisorManager.class))); File tmpDir = temporaryFolder.newFolder(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index da0ab86b4cc4..c9b4245f91c1 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -29,6 +29,7 @@ import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; +import io.druid.java.util.common.ISE; import io.druid.server.initialization.ServerConfig; import org.easymock.EasyMock; import org.joda.time.Interval; @@ -83,14 +84,15 @@ public void testLockForInactiveTask() throws InterruptedException public void testLockAfterTaskComplete() throws InterruptedException { Task task = NoopTask.create(); - exception.expect(IllegalStateException.class); + exception.expect(ISE.class); + exception.expectMessage("Unable to grant lock to inactive Task"); lockbox.add(task); lockbox.remove(task); lockbox.lock(task, new Interval("2015-01-01/2015-01-02")); } @Test - public void testTryLock() throws InterruptedException + public void testTryLock() { Task task = NoopTask.create(); lockbox.add(task); @@ -109,7 +111,7 @@ public void testTryLock() throws InterruptedException } @Test - public void testTrySmallerLock() throws InterruptedException + public void testTrySmallerLock() { Task task = NoopTask.create(); lockbox.add(task); @@ -134,16 +136,17 @@ public void testTrySmallerLock() throws InterruptedException @Test(expected = IllegalStateException.class) - public void testTryLockForInactiveTask() throws InterruptedException + public void testTryLockForInactiveTask() { Assert.assertFalse(lockbox.tryLock(NoopTask.create(), new Interval("2015-01-01/2015-01-02")).isPresent()); } @Test - public void testTryLockAfterTaskComplete() throws InterruptedException + public void testTryLockAfterTaskComplete() { Task task = NoopTask.create(); - exception.expect(IllegalStateException.class); + exception.expect(ISE.class); + exception.expectMessage("Unable to grant lock to inactive Task"); lockbox.add(task); lockbox.remove(task); Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); @@ -158,6 +161,7 @@ public void testTimeoutForLock() throws InterruptedException lockbox.add(task1); lockbox.add(task2); exception.expect(InterruptedException.class); + exception.expectMessage("can not acquire lock for interval"); lockbox.lock(task1, new Interval("2015-01-01/2015-01-02")); lockbox.lock(task2, new Interval("2015-01-01/2015-01-15")); }