From d006942d914e6d56d6b1dedb2691c8584d741866 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 15 Jul 2017 12:33:45 +0900 Subject: [PATCH 1/5] Passing lockTimeout as a parameter for TaskLockbox.lock() --- .../input/orc/OrcIndexGeneratorJobTest.java | 1 + .../indexing/kafka/KafkaIndexTaskTest.java | 2 +- .../io/druid/indexer/HadoopTuningConfig.java | 37 ++++++--- .../indexer/BatchDeltaIngestionTest.java | 1 + .../DetermineHashedPartitionsJobTest.java | 1 + .../indexer/DeterminePartitionsJobTest.java | 1 + .../indexer/HadoopDruidIndexerConfigTest.java | 2 + .../druid/indexer/HadoopTuningConfigTest.java | 1 + .../druid/indexer/IndexGeneratorJobTest.java | 1 + .../java/io/druid/indexer/JobHelperTest.java | 1 + .../indexer/path/GranularityPathSpecTest.java | 1 + .../updater/HadoopConverterJobTest.java | 1 + .../common/actions/LockAcquireAction.java | 20 ++++- .../indexing/common/task/HadoopIndexTask.java | 3 +- .../druid/indexing/common/task/IndexTask.java | 20 ++++- .../common/task/RealtimeIndexTask.java | 7 +- .../druid/indexing/overlord/TaskLockbox.java | 79 +++++++++---------- .../actions/SegmentInsertActionTest.java | 4 +- .../SegmentTransactionalInsertActionTest.java | 6 +- .../common/actions/TaskActionTestKit.java | 2 +- .../indexing/common/task/IndexTaskTest.java | 1 + .../common/task/RealtimeIndexTaskTest.java | 3 +- .../indexing/common/task/TaskSerdeTest.java | 16 ++-- .../IngestSegmentFirehoseFactoryTest.java | 2 +- .../indexing/overlord/RealtimeishTask.java | 4 +- .../indexing/overlord/TaskLifecycleTest.java | 26 +++--- .../indexing/overlord/TaskLockboxTest.java | 49 ++---------- .../indexing/RealtimeTuningConfig.java | 21 ++++- .../segment/realtime/RealtimeManagerTest.java | 3 + .../appenderator/AppenderatorPlumberTest.java | 1 + .../appenderator/AppenderatorTester.java | 1 + ...DefaultOfflineAppenderatorFactoryTest.java | 1 + .../plumber/RealtimePlumberSchoolTest.java | 1 + .../segment/realtime/plumber/SinkTest.java | 1 + 
.../cli/validate/DruidJsonValidatorTest.java | 1 + 35 files changed, 186 insertions(+), 136 deletions(-) diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java index ffb058eac76e..946dcfc290cf 100644 --- a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java +++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java @@ -228,6 +228,7 @@ public void setUp() throws Exception null, false, false, + null, null ) ) diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 16a6963d8cfe..19189eac3a68 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1478,7 +1478,7 @@ private void makeToolboxFactory() throws IOException derby.metadataTablesConfigSupplier().get(), derbyConnector ); - taskLockbox = new TaskLockbox(taskStorage, 3000); + taskLockbox = new TaskLockbox(taskStorage); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, metadataStorageCoordinator, diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index de5c9156ed39..3f234215a550 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -30,6 +30,7 @@ import io.druid.segment.IndexSpec; import io.druid.segment.indexing.TuningConfig; import org.joda.time.DateTime; +import org.joda.time.Period; 
import java.util.List; import java.util.Map; @@ -45,6 +46,7 @@ public class HadoopTuningConfig implements TuningConfig private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 75000; private static final boolean DEFAULT_USE_COMBINER = false; private static final int DEFAULT_NUM_BACKGROUND_PERSIST_THREADS = 0; + private static final Period DEFAULT_LOCK_TIMEOUT = new Period("PT5m"); public static HadoopTuningConfig makeDefaultTuningConfig() { @@ -67,7 +69,8 @@ public static HadoopTuningConfig makeDefaultTuningConfig() DEFAULT_NUM_BACKGROUND_PERSIST_THREADS, false, false, - null + null, + DEFAULT_LOCK_TIMEOUT ); } @@ -88,6 +91,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final boolean forceExtendableShardSpecs; private final boolean useExplicitVersion; private final List allowedHadoopPrefix; + private final Period lockTimeout; @JsonCreator public HadoopTuningConfig( @@ -111,7 +115,8 @@ public HadoopTuningConfig( final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads, final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs, final @JsonProperty("useExplicitVersion") boolean useExplicitVersion, - final @JsonProperty("allowedHadoopPrefix") List allowedHadoopPrefix + final @JsonProperty("allowedHadoopPrefix") List allowedHadoopPrefix, + final @JsonProperty("lockTimeout") Period lockTimeout ) { this.workingPath = workingPath; @@ -140,6 +145,7 @@ public HadoopTuningConfig( this.allowedHadoopPrefix = allowedHadoopPrefix == null ? ImmutableList.of("druid.storage.", "druid.javascript.") : allowedHadoopPrefix; + this.lockTimeout = lockTimeout == null ? 
DEFAULT_LOCK_TIMEOUT : lockTimeout; } @JsonProperty @@ -248,6 +254,18 @@ public boolean isUseExplicitVersion() return useExplicitVersion; } + @JsonProperty + public List getAllowedHadoopPrefix() + { + return allowedHadoopPrefix; + } + + @JsonProperty + public Period getLockTimeout() + { + return lockTimeout; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -269,7 +287,8 @@ public HadoopTuningConfig withWorkingPath(String path) numBackgroundPersistThreads, forceExtendableShardSpecs, useExplicitVersion, - allowedHadoopPrefix + allowedHadoopPrefix, + lockTimeout ); } @@ -294,7 +313,8 @@ public HadoopTuningConfig withVersion(String ver) numBackgroundPersistThreads, forceExtendableShardSpecs, useExplicitVersion, - allowedHadoopPrefix + allowedHadoopPrefix, + lockTimeout ); } @@ -319,13 +339,8 @@ public HadoopTuningConfig withShardSpecs(Map> specs numBackgroundPersistThreads, forceExtendableShardSpecs, useExplicitVersion, - allowedHadoopPrefix + allowedHadoopPrefix, + lockTimeout ); } - - @JsonProperty - public List getAllowedHadoopPrefix() - { - return allowedHadoopPrefix; - } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index bfee0b376735..44dd5df4c24c 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -388,6 +388,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map @JsonIgnore private final Interval interval; + @JsonIgnore + private final long timeoutMs; + @JsonCreator public LockAcquireAction( - @JsonProperty("interval") Interval interval + @JsonProperty("interval") Interval interval, + @JsonProperty("timeoutMs") long timeoutMs ) { this.interval = interval; + this.timeoutMs = timeoutMs; } @JsonProperty @@ -47,6 +52,12 @@ public Interval getInterval() return 
interval; } + @JsonProperty + public long getTimeoutMs() + { + return timeoutMs; + } + @Override public TypeReference getReturnTypeReference() { @@ -59,7 +70,11 @@ public TypeReference getReturnTypeReference() public TaskLock perform(Task task, TaskActionToolbox toolbox) { try { - return toolbox.getTaskLockbox().lock(task, interval); + if (timeoutMs == 0) { + return toolbox.getTaskLockbox().lock(task, interval); + } else { + return toolbox.getTaskLockbox().lock(task, interval, timeoutMs); + } } catch (InterruptedException e) { throw Throwables.propagate(e); @@ -77,6 +92,7 @@ public String toString() { return "LockAcquireAction{" + "interval=" + interval + + "timeoutMs=" + timeoutMs + '}'; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 5f4833ee5392..db74e7782e2f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -198,7 +198,8 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get() ) ); - TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval)); + final long lockTimeoutMs = spec.getTuningConfig().getLockTimeout().getMillis(); + TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs)); version = lock.getVersion(); } else { Iterable locks = getTaskLocks(toolbox); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index a091d4b84b2b..42bfdb048689 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -193,7 +193,8 
@@ public TaskStatus run(final TaskToolbox toolbox) throws Exception final DataSchema dataSchema; if (determineIntervals) { Interval interval = JodaUtils.umbrellaInterval(shardSpecs.getIntervals()); - TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval)); + final long lockTimeoutMs = ingestionSchema.getTuningConfig().getLockTimeout().getMillis(); + TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs)); version = lock.getVersion(); dataSchema = ingestionSchema.getDataSchema().withGranularitySpec( ingestionSchema.getDataSchema() @@ -898,6 +899,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private static final boolean DEFAULT_GUARANTEE_ROLLUP = false; private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; private static final long DEFAULT_PUBLISH_TIMEOUT = 0; + private static final Period DEFAULT_LOCK_TIMEOUT = new Period("PT5m"); static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000; @@ -912,6 +914,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private final boolean forceGuaranteedRollup; private final boolean reportParseExceptions; private final long publishTimeout; + private final Period lockTimeout; @JsonCreator public IndexTuningConfig( @@ -927,7 +930,8 @@ public IndexTuningConfig( @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs, @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup, @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, - @JsonProperty("publishTimeout") @Nullable Long publishTimeout + @JsonProperty("publishTimeout") @Nullable Long publishTimeout, + @JsonProperty("lockTimeout") @Nullable Period lockTimeout ) { this( @@ -941,13 +945,14 @@ public IndexTuningConfig( forceGuaranteedRollup, reportParseExceptions, publishTimeout, + lockTimeout, null ); } private IndexTuningConfig() { - 
this(null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( @@ -961,6 +966,7 @@ private IndexTuningConfig( @Nullable Boolean forceGuaranteedRollup, @Nullable Boolean reportParseExceptions, @Nullable Long publishTimeout, + @Nullable Period lockTimeout, @Nullable File basePersistDirectory ) { @@ -989,6 +995,7 @@ private IndexTuningConfig( ? DEFAULT_REPORT_PARSE_EXCEPTIONS : reportParseExceptions; this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout; + this.lockTimeout = lockTimeout == null ? DEFAULT_LOCK_TIMEOUT : lockTimeout; this.basePersistDirectory = basePersistDirectory; Preconditions.checkArgument( @@ -1010,6 +1017,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir) forceGuaranteedRollup, reportParseExceptions, publishTimeout, + lockTimeout, dir ); } @@ -1094,6 +1102,12 @@ public long getPublishTimeout() return publishTimeout; } + @JsonProperty + public Period getLockTimeout() + { + return lockTimeout; + } + @Override public Period getIntermediatePersistPeriod() { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index f55b7f2471fd..fc4697401aad 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -207,13 +207,14 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception // which will typically be either the main data processing loop or the persist thread. 
// Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments + final long lockTimeoutMs = spec.getTuningConfig().getLockTimeout().getMillis(); final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer() { @Override public void announceSegment(final DataSegment segment) throws IOException { // Side effect: Calling announceSegment causes a lock to be acquired - toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval())); + toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval(), lockTimeoutMs)); toolbox.getSegmentAnnouncer().announceSegment(segment); } @@ -233,7 +234,7 @@ public void announceSegments(Iterable segments) throws IOException { // Side effect: Calling announceSegments causes locks to be acquired for (DataSegment segment : segments) { - toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval())); + toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval(), lockTimeoutMs)); } toolbox.getSegmentAnnouncer().announceSegments(segments); } @@ -266,7 +267,7 @@ public String getVersion(final Interval interval) try { // Side effect: Calling getVersion causes a lock to be acquired final TaskLock myLock = toolbox.getTaskActionClient() - .submit(new LockAcquireAction(interval)); + .submit(new LockAcquireAction(interval, lockTimeoutMs)); return myLock.getVersion(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 7d00532d1f72..54b3c3879a61 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -41,7 +41,6 @@ import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.guava.FunctionalIterable; -import 
io.druid.server.initialization.ServerConfig; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -64,12 +63,13 @@ */ public class TaskLockbox { + private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS; + // Datasource -> Interval -> Tasks + TaskLock private final Map> running = Maps.newHashMap(); private final TaskStorage taskStorage; private final ReentrantLock giant = new ReentrantLock(true); private final Condition lockReleaseCondition = giant.newCondition(); - protected final long lockTimeoutMillis; private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); @@ -79,21 +79,10 @@ public class TaskLockbox @Inject public TaskLockbox( - TaskStorage taskStorage, - ServerConfig serverConfig - ) - { - this.taskStorage = taskStorage; - this.lockTimeoutMillis = serverConfig.getMaxIdleTime().getMillis(); - } - - public TaskLockbox( - TaskStorage taskStorage, - long lockTimeoutMillis + TaskStorage taskStorage ) { this.taskStorage = taskStorage; - this.lockTimeoutMillis = lockTimeoutMillis; } /** @@ -140,7 +129,7 @@ public int compare(Pair left, Pair right) continue; } - final TaskLockPosse taskLockPosse = tryAddTaskToLockPosse( + final TaskLockPosse taskLockPosse = createOrFindLockPosse( task, savedTaskLock.getInterval(), Optional.of(savedTaskLock.getVersion()) @@ -189,44 +178,52 @@ public int compare(Pair left, Pair right) } /** - * Acquires a lock on behalf of a task. Blocks until the lock is acquired. Throws an exception if the lock - * cannot be acquired. + * Acquires a lock on behalf of a task. Blocks until the lock is acquired. 
* * @param task task to acquire lock for * @param interval interval to lock * @return acquired TaskLock * - * @throws java.lang.InterruptedException if the lock cannot be acquired + * @throws java.lang.InterruptedException if the current thread is interrupted */ public TaskLock lock(final Task task, final Interval interval) throws InterruptedException { - long timeout = lockTimeoutMillis; - giant.lock(); + giant.lockInterruptibly(); try { Optional taskLock; while (!(taskLock = tryLock(task, interval)).isPresent()) { - long startTime = System.currentTimeMillis(); - lockReleaseCondition.await(timeout, TimeUnit.MILLISECONDS); - long timeDelta = System.currentTimeMillis() - startTime; - if (timeDelta >= timeout) { - log.error( - "Task [%s] can not acquire lock for interval [%s] within [%s] ms", - task.getId(), - interval, - lockTimeoutMillis - ); + lockReleaseCondition.await(); + } + return taskLock.get(); + } + finally { + giant.unlock(); + } + } - throw new InterruptedException(String.format( - "Task [%s] can not acquire lock for interval [%s] within [%s] ms", - task.getId(), - interval, - lockTimeoutMillis - )); - } else { - timeout -= timeDelta; + /** + * Acquires a lock on behalf of a task, waiting up to the specified wait time if necessary. 
+ * + * @param task task to acquire a lock for + * @param interval interval to lock + * @param timeoutMs maximum time to wait + * + * @return acquired lock + * + * @throws InterruptedException if the current thread is interrupted + */ + public TaskLock lock(final Task task, final Interval interval, long timeoutMs) throws InterruptedException + { + long nanos = TIME_UNIT.toNanos(timeoutMs); + giant.lockInterruptibly(); + try { + Optional taskLock; + while (!(taskLock = tryLock(task, interval)).isPresent()) { + if (nanos <= 0) { + return null; } + nanos = lockReleaseCondition.awaitNanos(nanos); } - return taskLock.get(); } finally { @@ -273,7 +270,7 @@ private Optional tryLock(final Task task, final Interval interval, fin } Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty"); - final TaskLockPosse posseToUse = tryAddTaskToLockPosse(task, interval, preferredVersion); + final TaskLockPosse posseToUse = createOrFindLockPosse(task, interval, preferredVersion); if (posseToUse != null) { // Add to existing TaskLockPosse, if necessary if (posseToUse.getTaskIds().add(task.getId())) { @@ -308,7 +305,7 @@ private Optional tryLock(final Task task, final Interval interval, fin } - private TaskLockPosse tryAddTaskToLockPosse( + private TaskLockPosse createOrFindLockPosse( final Task task, final Interval interval, final Optional preferredVersion diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java index f189356d0dfd..5cac6ce6a65e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java @@ -90,7 +90,7 @@ public void testSimple() throws Exception final Task task = new NoopTask(null, 0, 0, null, null, null); final SegmentInsertAction action = new 
SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2)); actionTestKit.getTaskLockbox().add(task); - actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL)); + actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000); action.perform(task, actionTestKit.getTaskActionToolbox()); Assert.assertEquals( @@ -108,7 +108,7 @@ public void testFailBadVersion() throws Exception final Task task = new NoopTask(null, 0, 0, null, null, null); final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3)); actionTestKit.getTaskLockbox().add(task); - actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL)); + actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000); thrown.expect(IllegalStateException.class); thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task")); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index c52daae392b4..c88d1db02ffe 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -89,7 +89,7 @@ public void testTransactional() throws Exception { final Task task = new NoopTask(null, 0, 0, null, null, null); actionTestKit.getTaskLockbox().add(task); - actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL)); + actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000); SegmentPublishResult result1 = new SegmentTransactionalInsertAction( ImmutableSet.of(SEGMENT1), @@ -130,7 +130,7 @@ public void testFailTransactional() throws Exception { final Task task = new NoopTask(null, 0, 0, null, null, null); actionTestKit.getTaskLockbox().add(task); - actionTestKit.getTaskLockbox().lock(task, new 
Interval(INTERVAL)); + actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000); SegmentPublishResult result = new SegmentTransactionalInsertAction( ImmutableSet.of(SEGMENT1), @@ -150,7 +150,7 @@ public void testFailBadVersion() throws Exception final Task task = new NoopTask(null, 0, 0, null, null, null); final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(ImmutableSet.of(SEGMENT3)); actionTestKit.getTaskLockbox().add(task); - actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL)); + actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000); thrown.expect(IllegalStateException.class); thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task")); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java index 8a3442d9dbbd..3fe74a780eb9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java @@ -80,7 +80,7 @@ public TaskActionToolbox getTaskActionToolbox() public void before() { taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H"))); - taskLockbox = new TaskLockbox(taskStorage, 300); + taskLockbox = new TaskLockbox(taskStorage); testDerbyConnector = new TestDerbyConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), Suppliers.ofInstance(metadataStorageTablesConfig) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 0ce369cd1680..a94a3a7add03 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -1051,6 
+1051,7 @@ private static IndexTuningConfig createTuningConfig( forceExtendableShardSpecs, forceGuaranteedRollup, reportParseException, + null, null ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index de9d213a9fe3..b3aeccdf7fad 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -894,6 +894,7 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportPa 0, reportParseExceptions, handoffTimeout, + null, null ); return new RealtimeIndexTask( @@ -933,7 +934,7 @@ private TaskToolbox makeToolbox( ) { final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); - final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, 300); + final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); try { taskStorage.insert(task, TaskStatus.running(task.getId())); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 7b6b473b7e6c..ca6f5df17afb 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -30,6 +30,9 @@ import io.druid.indexer.HadoopIOConfig; import io.druid.indexer.HadoopIngestionSpec; import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; +import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; import io.druid.java.util.common.granularity.Granularities; import io.druid.query.aggregation.AggregatorFactory; import 
io.druid.query.aggregation.CountAggregatorFactory; @@ -172,7 +175,7 @@ public void testIndexTaskSerde() throws Exception final IndexTask task = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( + new IndexIngestionSpec( new DataSchema( "foo", null, @@ -184,8 +187,8 @@ public void testIndexTaskSerde() throws Exception ), jsonMapper ), - new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null) + new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), + new IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null, null) ), null, jsonMapper @@ -235,7 +238,7 @@ public void testIndexTaskwithResourceSerde() throws Exception final IndexTask task = new IndexTask( null, new TaskResource("rofl", 2), - new IndexTask.IndexIngestionSpec( + new IndexIngestionSpec( new DataSchema( "foo", null, @@ -247,8 +250,8 @@ public void testIndexTaskwithResourceSerde() throws Exception ), jsonMapper ), - new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) + new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null) ), null, jsonMapper @@ -499,6 +502,7 @@ public Plumber findPlumber( 0, true, null, + null, null ) ), diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 1c8c371dbcf5..067ec8b4cd41 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ 
b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -151,7 +151,7 @@ public static Collection constructorFeeder() throws IOException } INDEX_MERGER_V9.persist(index, persistDir, indexSpec); - final TaskLockbox tl = new TaskLockbox(ts, 300); + final TaskLockbox tl = new TaskLockbox(ts); final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null) { final private Set published = Sets.newHashSet(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java index 3761ad051101..a168460769d4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java @@ -72,7 +72,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception // Sort of similar to what realtime tasks do: // Acquire lock for first interval - final TaskLock lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1)); + final TaskLock lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1, 5000)); final List locks1 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) @@ -80,7 +80,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception Assert.assertEquals("locks1", ImmutableList.of(lock1), locks1); // Acquire lock for second interval - final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2)); + final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2, 5000)); final List locks2 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java 
b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 6c8138838090..abe25b97d5f6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -60,6 +60,9 @@ import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.AbstractFixedIntervalTask; import io.druid.indexing.common.task.IndexTask; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; +import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; import io.druid.indexing.common.task.KillTask; import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.Task; @@ -514,7 +517,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( Preconditions.checkNotNull(taskStorage); Preconditions.checkNotNull(emitter); - taskLockbox = new TaskLockbox(taskStorage, 300); + taskLockbox = new TaskLockbox(taskStorage); tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock( SupervisorManager.class))); File tmpDir = temporaryFolder.newFolder(); @@ -642,7 +645,7 @@ public void testIndexTask() throws Exception final Task indexTask = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( + new IndexIngestionSpec( new DataSchema( "foo", null, @@ -654,8 +657,8 @@ public void testIndexTask() throws Exception ), mapper ), - new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) + new IndexIOConfig(new MockFirehoseFactory(false), false), + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null) ), null, MAPPER @@ -700,7 +703,7 @@ public void testIndexTaskFailure() throws Exception final Task 
indexTask = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( + new IndexIngestionSpec( new DataSchema( "foo", null, @@ -712,8 +715,8 @@ public void testIndexTaskFailure() throws Exception ), mapper ), - new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) + new IndexIOConfig(new MockExceptionalFirehoseFactory(), false), + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null) ), null, MAPPER @@ -1065,7 +1068,7 @@ public void testResumeTasks() throws Exception final Task indexTask = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( + new IndexIngestionSpec( new DataSchema( "foo", null, @@ -1077,8 +1080,8 @@ public void testResumeTasks() throws Exception ), mapper ), - new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null) + new IndexIOConfig(new MockFirehoseFactory(false), false), + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null, null) ), null, MAPPER @@ -1200,7 +1203,8 @@ private RealtimeIndexTask newRealtimeIndexTask() 0, null, null, - null + null, + new Period("PT5s") ); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); return new RealtimeIndexTask( diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index e10a6250b13d..1a2418f5feb0 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -19,13 +19,11 @@ package io.druid.indexing.overlord; -import com.fasterxml.jackson.annotation.JsonProperty; 
import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.config.TaskStorageConfig; @@ -37,10 +35,8 @@ import io.druid.metadata.EntryExistsException; import io.druid.metadata.SQLMetadataStorageActionHandlerFactory; import io.druid.metadata.TestDerbyConnector; -import io.druid.server.initialization.ServerConfig; import org.easymock.EasyMock; import org.joda.time.Interval; -import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -48,7 +44,6 @@ import org.junit.rules.ExpectedException; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; public class TaskLockboxTest @@ -57,7 +52,6 @@ public class TaskLockboxTest public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule(); private final ObjectMapper objectMapper = new DefaultObjectMapper(); - private ServerConfig serverConfig; private TaskStorage taskStorage; private TaskLockbox lockbox; @@ -78,15 +72,11 @@ public void setup() objectMapper ) ); - serverConfig = EasyMock.niceMock(ServerConfig.class); - EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100)).anyTimes(); - EasyMock.replay(serverConfig); - ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); EmittingLogger.registerEmitter(emitter); EasyMock.replay(emitter); - lockbox = new TaskLockbox(taskStorage, serverConfig); + lockbox = new TaskLockbox(taskStorage); } @Test @@ -179,20 +169,18 @@ public void testTryLockAfterTaskComplete() public void testTimeoutForLock() throws InterruptedException { Task task1 = NoopTask.create(); - Task task2 = new SomeTask(null, 0, 0, null, null, 
null); + Task task2 = NoopTask.create(); lockbox.add(task1); lockbox.add(task2); - exception.expect(InterruptedException.class); - exception.expectMessage("can not acquire lock for interval"); - lockbox.lock(task1, new Interval("2015-01-01/2015-01-02")); - lockbox.lock(task2, new Interval("2015-01-01/2015-01-15")); + Assert.assertNotNull(lockbox.lock(task1, new Interval("2015-01-01/2015-01-02"), 5000)); + Assert.assertNull(lockbox.lock(task2, new Interval("2015-01-01/2015-01-15"), 1000)); } @Test public void testSyncFromStorage() throws EntryExistsException { - final TaskLockbox originalBox = new TaskLockbox(taskStorage, serverConfig); + final TaskLockbox originalBox = new TaskLockbox(taskStorage); for (int i = 0; i < 5; i++) { final Task task = NoopTask.create(); taskStorage.insert(task, TaskStatus.running(task.getId())); @@ -207,7 +195,7 @@ public void testSyncFromStorage() throws EntryExistsException .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) .collect(Collectors.toList()); - final TaskLockbox newBox = new TaskLockbox(taskStorage, serverConfig); + final TaskLockbox newBox = new TaskLockbox(taskStorage); newBox.syncFromStorage(); Assert.assertEquals(originalBox.getAllLocks(), newBox.getAllLocks()); @@ -219,29 +207,4 @@ public void testSyncFromStorage() throws EntryExistsException Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); } - - public static class SomeTask extends NoopTask { - - public SomeTask( - @JsonProperty("id") String id, - @JsonProperty("runTime") long runTime, - @JsonProperty("isReadyTime") long isReadyTime, - @JsonProperty("isReadyResult") String isReadyResult, - @JsonProperty("firehose") FirehoseFactory firehoseFactory, - @JsonProperty("context") Map context - ) - { - super(id, runTime, isReadyTime, isReadyResult, firehoseFactory, context); - } - - @Override - public String getType() - { - return "someTask"; - } - - @Override - public String getGroupId() { return "someGroupId";} - - } } diff --git 
a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 5338055a4d8b..ba77fa42e0be 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -50,6 +50,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig private static final Boolean defaultReportParseExceptions = Boolean.FALSE; private static final long defaultHandoffConditionTimeout = 0; private static final long defaultAlertTimeout = 0; + private static final Period defaultLockTimeoutMs = new Period("PT5m"); private static File createNewBasePersistDirectory() { @@ -74,7 +75,8 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis 0, defaultReportParseExceptions, defaultHandoffConditionTimeout, - defaultAlertTimeout + defaultAlertTimeout, + defaultLockTimeoutMs ); } @@ -92,6 +94,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis private final boolean reportParseExceptions; private final long handoffConditionTimeout; private final long alertTimeout; + private final Period lockTimeout; @JsonCreator public RealtimeTuningConfig( @@ -110,7 +113,8 @@ public RealtimeTuningConfig( @JsonProperty("mergeThreadPriority") int mergeThreadPriority, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, - @JsonProperty("alertTimeout") Long alertTimeout + @JsonProperty("alertTimeout") Long alertTimeout, + @JsonProperty("lockTimeout") Period lockTimeout ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -138,6 +142,7 @@ public RealtimeTuningConfig( this.alertTimeout = alertTimeout == null ? 
defaultAlertTimeout : alertTimeout; Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0"); + this.lockTimeout = lockTimeout == null ? defaultLockTimeoutMs : lockTimeout; } @Override @@ -240,6 +245,12 @@ public long getAlertTimeout() return alertTimeout; } + @JsonProperty + public Period getLockTimeout() + { + return lockTimeout; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -257,7 +268,8 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) mergeThreadPriority, reportParseExceptions, handoffConditionTimeout, - alertTimeout + alertTimeout, + lockTimeout ); } @@ -278,7 +290,8 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) mergeThreadPriority, reportParseExceptions, handoffConditionTimeout, - alertTimeout + alertTimeout, + lockTimeout ); } } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 85e60f3ba616..5f7fe7da283f 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -209,6 +209,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException 0, null, null, + null, null ); plumber = new TestPlumber(new Sink( @@ -267,6 +268,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException 0, null, null, + null, null ); @@ -285,6 +287,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException 0, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 316e86b6a199..20b74fd06503 100644 --- 
a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -81,6 +81,7 @@ EasyMock. anyObject(), 0, false, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index a18b97851ccd..6048eac644c5 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -148,6 +148,7 @@ public AppenderatorTester( 0, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index a286515a3f90..ba8f534bef8c 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -145,6 +145,7 @@ public int columnCacheSizeBytes() 0, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 2c50bcddb406..53a0e2024e8c 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -199,6 +199,7 @@ public void setUp() throws Exception 0, false, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index d6bbd1ab587a..330f5303db17 100644 
--- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -71,6 +71,7 @@ public void testSwap() throws Exception 0, null, null, + null, null ); final Sink sink = new Sink( diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index b83a93bdbe9d..4ea9acde55f2 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -181,6 +181,7 @@ public Plumber findPlumber( 0, true, null, + null, null ) ), From ada52a2767ec8b0ee32320f16fa8abab750769aa Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 18 Jul 2017 08:38:56 +0900 Subject: [PATCH 2/5] Remove TIME_UNIT --- .../src/main/java/io/druid/indexing/overlord/TaskLockbox.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index a366cfe3cfff..d23a1a5c9f32 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -63,8 +63,6 @@ */ public class TaskLockbox { - private static final TimeUnit TIME_UNIT = TimeUnit.MILLISECONDS; - // Datasource -> Interval -> Tasks + TaskLock private final Map> running = Maps.newHashMap(); private final TaskStorage taskStorage; @@ -214,7 +212,7 @@ public TaskLock lock(final Task task, final Interval interval) throws Interrupte */ public TaskLock lock(final Task task, final Interval interval, long timeoutMs) throws InterruptedException { - long nanos = TIME_UNIT.toNanos(timeoutMs); + long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs); giant.lockInterruptibly(); try { Optional taskLock; From 
696d9a3755b4a78abc0e52fb7ea059b6a78c2169 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 18 Jul 2017 09:51:43 +0900 Subject: [PATCH 3/5] Fix tc fail --- processing/src/main/java/io/druid/guice/ModulesConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/io/druid/guice/ModulesConfig.java b/processing/src/main/java/io/druid/guice/ModulesConfig.java index 5dbacfb85612..ce044f766fe5 100644 --- a/processing/src/main/java/io/druid/guice/ModulesConfig.java +++ b/processing/src/main/java/io/druid/guice/ModulesConfig.java @@ -28,7 +28,7 @@ public class ModulesConfig { /** * Canonical class names of modules, which should not be loaded despite they are founded in extensions from {@link - * io.druid.guice.ExtensionsConfig#loadList} or the standard list of modules loaded by some node type, e. g. {@code + * ExtensionsConfig#loadList} or the standard list of modules loaded by some node type, e. g. {@code * CliPeon}. */ @JsonProperty From 8b85f39fc215cf7f9fb0169bd268bf8940edd59d Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 19 Jul 2017 09:45:05 +0900 Subject: [PATCH 4/5] Add taskLockTimeout to TaskContext --- docs/content/ingestion/tasks.md | 11 +++++++- .../input/orc/OrcIndexGeneratorJobTest.java | 1 - .../io/druid/indexer/HadoopTuningConfig.java | 25 ++++-------------- .../indexer/BatchDeltaIngestionTest.java | 1 - .../DetermineHashedPartitionsJobTest.java | 1 - .../indexer/DeterminePartitionsJobTest.java | 1 - .../indexer/HadoopDruidIndexerConfigTest.java | 2 -- .../druid/indexer/HadoopTuningConfigTest.java | 1 - .../druid/indexer/IndexGeneratorJobTest.java | 1 - .../java/io/druid/indexer/JobHelperTest.java | 1 - .../indexer/path/GranularityPathSpecTest.java | 1 - .../updater/HadoopConverterJobTest.java | 1 - .../indexing/common/task/HadoopIndexTask.java | 3 +-- .../druid/indexing/common/task/IndexTask.java | 20 +++----------- .../common/task/RealtimeIndexTask.java | 2 +- .../io/druid/indexing/common/task/Task.java | 
14 ++++++++-- .../io/druid/indexing/common/task/Tasks.java | 26 +++++++++++++++++++ .../indexing/common/task/IndexTaskTest.java | 1 - .../common/task/RealtimeIndexTaskTest.java | 1 - .../indexing/common/task/TaskSerdeTest.java | 5 ++-- .../indexing/overlord/TaskLifecycleTest.java | 9 +++---- .../indexing/RealtimeTuningConfig.java | 21 +++------------ .../segment/realtime/RealtimeManagerTest.java | 3 --- .../appenderator/AppenderatorPlumberTest.java | 2 -- .../appenderator/AppenderatorTester.java | 1 - ...DefaultOfflineAppenderatorFactoryTest.java | 1 - .../plumber/RealtimePlumberSchoolTest.java | 1 - .../segment/realtime/plumber/SinkTest.java | 1 - .../cli/validate/DruidJsonValidatorTest.java | 1 - 29 files changed, 68 insertions(+), 91 deletions(-) create mode 100644 indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 76f1268dd4f2..5a77afb616e0 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -89,7 +89,8 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed |--------|-----------|---------| |type|The task type, this should always be "index".|yes| |id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no| -|spec|The ingestion spec. See below for more details. |yes| +|spec|The ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details. |yes| +|context|Context containing various task configuration parameters. See below for more details.|no| #### DataSchema @@ -160,6 +161,14 @@ On the contrary, in the incremental publishing mode, segments are incrementally To enable bulk publishing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot be used with either `forceExtendableShardSpecs` of TuningConfig or `appendToExisting` of IOConfig. 
+### Task Context + +The task context is used for various task configuration parameters. The following parameters apply to all tasks. + +|property|default|description| +|--------|-------|-----------| +|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [the below Locking section](#locking).| + Segment Merging Tasks --------------------- diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java index 37f98062846e..114a8924682f 100644 --- a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java +++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java @@ -229,7 +229,6 @@ public void setUp() throws Exception null, false, false, - null, null ) ) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index d778aea6eeaa..cc25ac18b2d5 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -30,7 +30,6 @@ import io.druid.segment.IndexSpec; import io.druid.segment.indexing.TuningConfig; import org.joda.time.DateTime; -import org.joda.time.Period; import java.util.List; import java.util.Map; @@ -46,7 +45,6 @@ public class HadoopTuningConfig implements TuningConfig private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 75000; private static final boolean DEFAULT_USE_COMBINER = false; private static final int DEFAULT_NUM_BACKGROUND_PERSIST_THREADS = 0; - private static final Period DEFAULT_LOCK_TIMEOUT = new Period("PT5m"); public static HadoopTuningConfig makeDefaultTuningConfig() { @@ -69,8 +67,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() 
DEFAULT_NUM_BACKGROUND_PERSIST_THREADS, false, false, - null, - DEFAULT_LOCK_TIMEOUT + null ); } @@ -91,7 +88,6 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final boolean forceExtendableShardSpecs; private final boolean useExplicitVersion; private final List allowedHadoopPrefix; - private final Period lockTimeout; @JsonCreator public HadoopTuningConfig( @@ -115,8 +111,7 @@ public HadoopTuningConfig( final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads, final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs, final @JsonProperty("useExplicitVersion") boolean useExplicitVersion, - final @JsonProperty("allowedHadoopPrefix") List allowedHadoopPrefix, - final @JsonProperty("lockTimeout") Period lockTimeout + final @JsonProperty("allowedHadoopPrefix") List allowedHadoopPrefix ) { this.workingPath = workingPath; @@ -143,7 +138,6 @@ public HadoopTuningConfig( Preconditions.checkArgument(this.numBackgroundPersistThreads >= 0, "Not support persistBackgroundCount < 0"); this.useExplicitVersion = useExplicitVersion; this.allowedHadoopPrefix = allowedHadoopPrefix == null ? ImmutableList.of() : allowedHadoopPrefix; - this.lockTimeout = lockTimeout == null ? 
DEFAULT_LOCK_TIMEOUT : lockTimeout; } @JsonProperty @@ -252,12 +246,6 @@ public boolean isUseExplicitVersion() return useExplicitVersion; } - @JsonProperty - public Period getLockTimeout() - { - return lockTimeout; - } - @JsonProperty("allowedHadoopPrefix") public List getUserAllowedHadoopPrefix() { @@ -286,8 +274,7 @@ public HadoopTuningConfig withWorkingPath(String path) numBackgroundPersistThreads, forceExtendableShardSpecs, useExplicitVersion, - allowedHadoopPrefix, - lockTimeout + allowedHadoopPrefix ); } @@ -312,8 +299,7 @@ public HadoopTuningConfig withVersion(String ver) numBackgroundPersistThreads, forceExtendableShardSpecs, useExplicitVersion, - allowedHadoopPrefix, - lockTimeout + allowedHadoopPrefix ); } @@ -338,8 +324,7 @@ public HadoopTuningConfig withShardSpecs(Map> specs numBackgroundPersistThreads, forceExtendableShardSpecs, useExplicitVersion, - allowedHadoopPrefix, - lockTimeout + allowedHadoopPrefix ); } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 44dd5df4c24c..bfee0b376735 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -388,7 +388,6 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map getContext(); - public Object getContextValue(String key); - + @Nullable + default ContextValueType getContextValue(String key) + { + return getContext() == null ? null : (ContextValueType) getContext().get(key); + } + + default ContextValueType getContextValue(String key, ContextValueType defaultValue) + { + final ContextValueType value = getContextValue(key); + return value == null ? 
defaultValue : value; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java new file mode 100644 index 000000000000..32924ef3de55 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java @@ -0,0 +1,26 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +public class Tasks +{ + public static String LOCK_TIMEOUT_KEY = "taskLockTimeout"; + public static long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min +} diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index a94a3a7add03..0ce369cd1680 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -1051,7 +1051,6 @@ private static IndexTuningConfig createTuningConfig( forceExtendableShardSpecs, forceGuaranteedRollup, reportParseException, - null, null ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index b3aeccdf7fad..fe8c5ec33b10 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -894,7 +894,6 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportPa 0, reportParseExceptions, handoffTimeout, - null, null ); return new RealtimeIndexTask( diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index ca6f5df17afb..2f38f65e2a40 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -188,7 +188,7 @@ public void testIndexTaskSerde() throws Exception jsonMapper ), new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, 
false, null, null, null) + new IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null) ), null, jsonMapper @@ -251,7 +251,7 @@ public void testIndexTaskwithResourceSerde() throws Exception jsonMapper ), new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null) + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) ), null, jsonMapper @@ -502,7 +502,6 @@ public Plumber findPlumber( 0, true, null, - null, null ) ), diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index abe25b97d5f6..98f39fb4ac83 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -658,7 +658,7 @@ public void testIndexTask() throws Exception mapper ), new IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null) + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) ), null, MAPPER @@ -716,7 +716,7 @@ public void testIndexTaskFailure() throws Exception mapper ), new IndexIOConfig(new MockExceptionalFirehoseFactory(), false), - new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null, null) + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) ), null, MAPPER @@ -1081,7 +1081,7 @@ public void testResumeTasks() throws Exception mapper ), new IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null, null) + new 
IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null) ), null, MAPPER @@ -1203,8 +1203,7 @@ private RealtimeIndexTask newRealtimeIndexTask() 0, null, null, - null, - new Period("PT5s") + null ); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); return new RealtimeIndexTask( diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index ba77fa42e0be..5338055a4d8b 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -50,7 +50,6 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig private static final Boolean defaultReportParseExceptions = Boolean.FALSE; private static final long defaultHandoffConditionTimeout = 0; private static final long defaultAlertTimeout = 0; - private static final Period defaultLockTimeoutMs = new Period("PT5m"); private static File createNewBasePersistDirectory() { @@ -75,8 +74,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis 0, defaultReportParseExceptions, defaultHandoffConditionTimeout, - defaultAlertTimeout, - defaultLockTimeoutMs + defaultAlertTimeout ); } @@ -94,7 +92,6 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis private final boolean reportParseExceptions; private final long handoffConditionTimeout; private final long alertTimeout; - private final Period lockTimeout; @JsonCreator public RealtimeTuningConfig( @@ -113,8 +110,7 @@ public RealtimeTuningConfig( @JsonProperty("mergeThreadPriority") int mergeThreadPriority, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, - @JsonProperty("alertTimeout") Long alertTimeout, - 
@JsonProperty("lockTimeout") Period lockTimeout + @JsonProperty("alertTimeout") Long alertTimeout ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -142,7 +138,6 @@ public RealtimeTuningConfig( this.alertTimeout = alertTimeout == null ? defaultAlertTimeout : alertTimeout; Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0"); - this.lockTimeout = lockTimeout == null ? defaultLockTimeoutMs : lockTimeout; } @Override @@ -245,12 +240,6 @@ public long getAlertTimeout() return alertTimeout; } - @JsonProperty - public Period getLockTimeout() - { - return lockTimeout; - } - public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -268,8 +257,7 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) mergeThreadPriority, reportParseExceptions, handoffConditionTimeout, - alertTimeout, - lockTimeout + alertTimeout ); } @@ -290,8 +278,7 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) mergeThreadPriority, reportParseExceptions, handoffConditionTimeout, - alertTimeout, - lockTimeout + alertTimeout ); } } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 5f7fe7da283f..85e60f3ba616 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -209,7 +209,6 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException 0, null, null, - null, null ); plumber = new TestPlumber(new Sink( @@ -268,7 +267,6 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException 0, null, null, - null, null ); @@ -287,7 +285,6 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException 0, null, null, - null, null ); diff --git 
a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 20b74fd06503..2ec7aecf0c37 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -28,7 +28,6 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; - import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -81,7 +80,6 @@ EasyMock. anyObject(), 0, false, null, - null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 6048eac644c5..a18b97851ccd 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -148,7 +148,6 @@ public AppenderatorTester( 0, null, null, - null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index ba8f534bef8c..a286515a3f90 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -145,7 +145,6 @@ public int columnCacheSizeBytes() 0, null, null, - null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 
53a0e2024e8c..2c50bcddb406 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -199,7 +199,6 @@ public void setUp() throws Exception 0, false, null, - null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 330f5303db17..d6bbd1ab587a 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -71,7 +71,6 @@ public void testSwap() throws Exception 0, null, null, - null, null ); final Sink sink = new Sink( diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index 4ea9acde55f2..b83a93bdbe9d 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -181,7 +181,6 @@ public Plumber findPlumber( 0, true, null, - null, null ) ), From 1da47979fe12ae32ce55002f7b3abe6a927bb024 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 26 Jul 2017 17:13:30 +0900 Subject: [PATCH 5/5] Add caution --- docs/content/ingestion/tasks.md | 5 +++++ .../java/io/druid/indexing/common/task/HadoopIndexTask.java | 1 + .../main/java/io/druid/indexing/common/task/IndexTask.java | 1 + .../io/druid/indexing/common/task/RealtimeIndexTask.java | 2 ++ 4 files changed, 9 insertions(+) diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 5a77afb616e0..318704d34066 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -169,6 +169,11 @@ The task context is used for various task configuration parameters. 
The followin |--------|-------|-----------| |taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [the below Locking section](#locking).| +
+When a task acquires a lock, it sends an HTTP request and waits until it receives a response containing the lock acquisition result. +As a result, an HTTP timeout error can occur if `taskLockTimeout` is greater than `druid.server.http.maxIdleTime` of Overlords.
+ Segment Merging Tasks --------------------- diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 1f64d41ca9a9..275bf7b31dae 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -198,6 +198,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception ) ); final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT); + // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error. TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs)); version = lock.getVersion(); } else { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 13565089b6c4..502992a5850e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -194,6 +194,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception if (determineIntervals) { Interval interval = JodaUtils.umbrellaInterval(shardSpecs.getIntervals()); final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT); + // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error. 
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs)); version = lock.getVersion(); dataSchema = ingestionSchema.getDataSchema().withGranularitySpec( diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index ad8752a0b036..9595a28cb2f2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -208,6 +208,8 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception // Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT); + // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, http timeout error can occur while waiting for a + // lock to be acquired. final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer() { @Override