diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md index 76f1268dd4f2..318704d34066 100644 --- a/docs/content/ingestion/tasks.md +++ b/docs/content/ingestion/tasks.md @@ -89,7 +89,8 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed |--------|-----------|---------| |type|The task type, this should always be "index".|yes| |id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no| -|spec|The ingestion spec. See below for more details. |yes| +|spec|The ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details. |yes| +|context|Context containing various task configuration parameters. See below for more details.|no| #### DataSchema @@ -160,6 +161,19 @@ On the contrary, in the incremental publishing mode, segments are incrementally To enable bulk publishing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot be used with either `forceExtendableShardSpecs` of TuningConfig or `appendToExisting` of IOConfig. +### Task Context + +The task context is used for various task configuration parameters. The following parameters apply to all tasks. + +|property|default|description| +|--------|-------|-----------| +|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [the below Locking section](#locking).| + +
+When a task acquires a lock, it sends a request via HTTP and waits until it receives a response containing the lock acquisition result. +As a result, an HTTP timeout error can occur if `taskLockTimeout` is greater than `druid.server.http.maxIdleTime` of overlords. +
+ Segment Merging Tasks --------------------- diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 16a6963d8cfe..19189eac3a68 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1478,7 +1478,7 @@ private void makeToolboxFactory() throws IOException derby.metadataTablesConfigSupplier().get(), derbyConnector ); - taskLockbox = new TaskLockbox(taskStorage, 3000); + taskLockbox = new TaskLockbox(taskStorage); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, metadataStorageCoordinator, diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 100da12068c6..cc25ac18b2d5 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -246,6 +246,13 @@ public boolean isUseExplicitVersion() return useExplicitVersion; } + @JsonProperty("allowedHadoopPrefix") + public List getUserAllowedHadoopPrefix() + { + // Just the user-specified list. More are added in HadoopDruidIndexerConfig. + return allowedHadoopPrefix; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -320,11 +327,4 @@ public HadoopTuningConfig withShardSpecs(Map> specs allowedHadoopPrefix ); } - - @JsonProperty("allowedHadoopPrefix") - public List getUserAllowedHadoopPrefix() - { - // Just the user-specified list. More are added in HadoopDruidIndexerConfig. 
- return allowedHadoopPrefix; - } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/LockAcquireAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/LockAcquireAction.java index 2d61f0639242..192725d3fd38 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/LockAcquireAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/LockAcquireAction.java @@ -33,12 +33,17 @@ public class LockAcquireAction implements TaskAction @JsonIgnore private final Interval interval; + @JsonIgnore + private final long timeoutMs; + @JsonCreator public LockAcquireAction( - @JsonProperty("interval") Interval interval + @JsonProperty("interval") Interval interval, + @JsonProperty("timeoutMs") long timeoutMs ) { this.interval = interval; + this.timeoutMs = timeoutMs; } @JsonProperty @@ -47,6 +52,12 @@ public Interval getInterval() return interval; } + @JsonProperty + public long getTimeoutMs() + { + return timeoutMs; + } + @Override public TypeReference getReturnTypeReference() { @@ -59,7 +70,11 @@ public TypeReference getReturnTypeReference() public TaskLock perform(Task task, TaskActionToolbox toolbox) { try { - return toolbox.getTaskLockbox().lock(task, interval); + if (timeoutMs == 0) { + return toolbox.getTaskLockbox().lock(task, interval); + } else { + return toolbox.getTaskLockbox().lock(task, interval, timeoutMs); + } } catch (InterruptedException e) { throw Throwables.propagate(e); @@ -77,6 +92,7 @@ public String toString() { return "LockAcquireAction{" + "interval=" + interval + + ", timeoutMs=" + timeoutMs + '}'; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 5f4833ee5392..275bf7b31dae 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ 
b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -29,7 +29,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; - import io.druid.common.utils.JodaUtils; import io.druid.indexer.HadoopDruidDetermineConfigurationJob; import io.druid.indexer.HadoopDruidIndexerConfig; @@ -198,7 +197,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get() ) ); - TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval)); + final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT); + // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error. + TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs)); version = lock.getVersion(); } else { Iterable locks = getTaskLocks(toolbox); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index c6776c67ab84..1477d91b5308 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -207,7 +207,9 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception final DataSchema dataSchema; if (determineIntervals) { Interval interval = JodaUtils.umbrellaInterval(shardSpecs.getIntervals()); - TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval)); + final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT); + // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error. 
+ TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs)); version = lock.getVersion(); dataSchema = ingestionSchema.getDataSchema().withGranularitySpec( ingestionSchema.getDataSchema() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index f55b7f2471fd..9595a28cb2f2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -207,13 +207,16 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception // which will typically be either the main data processing loop or the persist thread. // Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments + final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT); + // Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, http timeout error can occur while waiting for a + // lock to be acquired. 
final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer() { @Override public void announceSegment(final DataSegment segment) throws IOException { // Side effect: Calling announceSegment causes a lock to be acquired - toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval())); + toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval(), lockTimeoutMs)); toolbox.getSegmentAnnouncer().announceSegment(segment); } @@ -233,7 +236,7 @@ public void announceSegments(Iterable segments) throws IOException { // Side effect: Calling announceSegments causes locks to be acquired for (DataSegment segment : segments) { - toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval())); + toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval(), lockTimeoutMs)); } toolbox.getSegmentAnnouncer().announceSegments(segments); } @@ -266,7 +269,7 @@ public String getVersion(final Interval interval) try { // Side effect: Calling getVersion causes a lock to be acquired final TaskLock myLock = toolbox.getTaskActionClient() - .submit(new LockAcquireAction(interval)); + .submit(new LockAcquireAction(interval, lockTimeoutMs)); return myLock.getVersion(); } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 39210ad91229..20cf4c372b6a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -27,6 +27,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; +import javax.annotation.Nullable; import java.util.Map; /** @@ -166,6 +167,15 @@ public interface Task public Map getContext(); - public Object getContextValue(String key); - + @Nullable + default ContextValueType getContextValue(String key) + { + return getContext() == null ? 
null : (ContextValueType) getContext().get(key); + } + + default ContextValueType getContextValue(String key, ContextValueType defaultValue) + { + final ContextValueType value = getContextValue(key); + return value == null ? defaultValue : value; + } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java new file mode 100644 index 000000000000..32924ef3de55 --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Tasks.java @@ -0,0 +1,26 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.indexing.common.task; + +public class Tasks +{ + public static final String LOCK_TIMEOUT_KEY = "taskLockTimeout"; + public static final long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min +} diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 599e3a039918..39017d486d73 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -41,7 +41,6 @@ import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.guava.FunctionalIterable; -import io.druid.server.initialization.ServerConfig; import org.joda.time.DateTime; import org.joda.time.Interval; @@ -69,7 +68,6 @@ public class TaskLockbox private final TaskStorage taskStorage; private final ReentrantLock giant = new ReentrantLock(true); private final Condition lockReleaseCondition = giant.newCondition(); - protected final long lockTimeoutMillis; private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); @@ -79,21 +77,10 @@ public class TaskLockbox @Inject public TaskLockbox( - TaskStorage taskStorage, - ServerConfig serverConfig + TaskStorage taskStorage ) { this.taskStorage = taskStorage; - this.lockTimeoutMillis = serverConfig.getMaxIdleTime().getMillis(); - } - - public TaskLockbox( - TaskStorage taskStorage, - long lockTimeoutMillis - ) - { - this.taskStorage = taskStorage; - this.lockTimeoutMillis = lockTimeoutMillis; } /** @@ -140,7 +127,7 @@ public int compare(Pair left, Pair right) continue; } - final TaskLockPosse taskLockPosse = tryAddTaskToLockPosse( + final TaskLockPosse taskLockPosse = createOrFindLockPosse( task, savedTaskLock.getInterval(), Optional.of(savedTaskLock.getVersion()) @@ -190,44 +177,52 @@ public int compare(Pair left, Pair right) } /** - * Acquires a lock on behalf of a 
task. Blocks until the lock is acquired. Throws an exception if the lock - * cannot be acquired. + * Acquires a lock on behalf of a task. Blocks until the lock is acquired. * * @param task task to acquire lock for * @param interval interval to lock * @return acquired TaskLock * - * @throws InterruptedException if the lock cannot be acquired + * @throws InterruptedException if the current thread is interrupted */ public TaskLock lock(final Task task, final Interval interval) throws InterruptedException { - long timeout = lockTimeoutMillis; - giant.lock(); + giant.lockInterruptibly(); try { Optional taskLock; while (!(taskLock = tryLock(task, interval)).isPresent()) { - long startTime = System.currentTimeMillis(); - lockReleaseCondition.await(timeout, TimeUnit.MILLISECONDS); - long timeDelta = System.currentTimeMillis() - startTime; - if (timeDelta >= timeout) { - log.error( - "Task [%s] can not acquire lock for interval [%s] within [%s] ms", - task.getId(), - interval, - lockTimeoutMillis - ); + lockReleaseCondition.await(); + } + return taskLock.get(); + } + finally { + giant.unlock(); + } + } - throw new InterruptedException(String.format( - "Task [%s] can not acquire lock for interval [%s] within [%s] ms", - task.getId(), - interval, - lockTimeoutMillis - )); - } else { - timeout -= timeDelta; + /** + * Acquires a lock on behalf of a task, waiting up to the specified wait time if necessary. 
+ * + * @param task task to acquire a lock for + * @param interval interval to lock + * @param timeoutMs maximum time to wait + * + * @return acquired lock + * + * @throws InterruptedException if the current thread is interrupted + */ + public TaskLock lock(final Task task, final Interval interval, long timeoutMs) throws InterruptedException + { + long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs); + giant.lockInterruptibly(); + try { + Optional taskLock; + while (!(taskLock = tryLock(task, interval)).isPresent()) { + if (nanos <= 0) { + return null; } + nanos = lockReleaseCondition.awaitNanos(nanos); } - return taskLock.get(); } finally { @@ -274,7 +269,7 @@ private Optional tryLock(final Task task, final Interval interval, fin } Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty"); - final TaskLockPosse posseToUse = tryAddTaskToLockPosse(task, interval, preferredVersion); + final TaskLockPosse posseToUse = createOrFindLockPosse(task, interval, preferredVersion); if (posseToUse != null) { // Add to existing TaskLockPosse, if necessary if (posseToUse.getTaskIds().add(task.getId())) { @@ -310,7 +305,7 @@ private Optional tryLock(final Task task, final Interval interval, fin } - private TaskLockPosse tryAddTaskToLockPosse( + private TaskLockPosse createOrFindLockPosse( final Task task, final Interval interval, final Optional preferredVersion diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java index f189356d0dfd..5cac6ce6a65e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentInsertActionTest.java @@ -90,7 +90,7 @@ public void testSimple() throws Exception final Task task = new NoopTask(null, 0, 0, null, null, null); final SegmentInsertAction action = new 
SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2)); actionTestKit.getTaskLockbox().add(task); - actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL)); + actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000); action.perform(task, actionTestKit.getTaskActionToolbox()); Assert.assertEquals( @@ -108,7 +108,7 @@ public void testFailBadVersion() throws Exception final Task task = new NoopTask(null, 0, 0, null, null, null); final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3)); actionTestKit.getTaskLockbox().add(task); - actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL)); + actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000); thrown.expect(IllegalStateException.class); thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task")); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index c52daae392b4..c88d1db02ffe 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -89,7 +89,7 @@ public void testTransactional() throws Exception { final Task task = new NoopTask(null, 0, 0, null, null, null); actionTestKit.getTaskLockbox().add(task); - actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL)); + actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000); SegmentPublishResult result1 = new SegmentTransactionalInsertAction( ImmutableSet.of(SEGMENT1), @@ -130,7 +130,7 @@ public void testFailTransactional() throws Exception { final Task task = new NoopTask(null, 0, 0, null, null, null); actionTestKit.getTaskLockbox().add(task); - actionTestKit.getTaskLockbox().lock(task, new 
Interval(INTERVAL)); + actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000); SegmentPublishResult result = new SegmentTransactionalInsertAction( ImmutableSet.of(SEGMENT1), @@ -150,7 +150,7 @@ public void testFailBadVersion() throws Exception final Task task = new NoopTask(null, 0, 0, null, null, null); final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(ImmutableSet.of(SEGMENT3)); actionTestKit.getTaskLockbox().add(task); - actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL)); + actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000); thrown.expect(IllegalStateException.class); thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task")); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java index 8a3442d9dbbd..3fe74a780eb9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java @@ -80,7 +80,7 @@ public TaskActionToolbox getTaskActionToolbox() public void before() { taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H"))); - taskLockbox = new TaskLockbox(taskStorage, 300); + taskLockbox = new TaskLockbox(taskStorage); testDerbyConnector = new TestDerbyConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), Suppliers.ofInstance(metadataStorageTablesConfig) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index de9d213a9fe3..fe8c5ec33b10 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ 
b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -933,7 +933,7 @@ private TaskToolbox makeToolbox( ) { final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); - final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, 300); + final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); try { taskStorage.insert(task, TaskStatus.running(task.getId())); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index aaf3de4be0d9..b78e89997ebc 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -30,6 +30,9 @@ import io.druid.indexer.HadoopIOConfig; import io.druid.indexer.HadoopIngestionSpec; import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; +import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; import io.druid.java.util.common.granularity.Granularities; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -172,7 +175,7 @@ public void testIndexTaskSerde() throws Exception final IndexTask task = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( + new IndexIngestionSpec( new DataSchema( "foo", null, @@ -184,8 +187,8 @@ public void testIndexTaskSerde() throws Exception ), jsonMapper ), - new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null) + new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), + new IndexTuningConfig(10000, 10, null, 9999, null, 
indexSpec, 3, true, true, false, null, null) ), null ); @@ -234,7 +237,7 @@ public void testIndexTaskwithResourceSerde() throws Exception final IndexTask task = new IndexTask( null, new TaskResource("rofl", 2), - new IndexTask.IndexIngestionSpec( + new IndexIngestionSpec( new DataSchema( "foo", null, @@ -246,8 +249,8 @@ public void testIndexTaskwithResourceSerde() throws Exception ), jsonMapper ), - new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) + new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) ), null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 1c8c371dbcf5..067ec8b4cd41 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -151,7 +151,7 @@ public static Collection constructorFeeder() throws IOException } INDEX_MERGER_V9.persist(index, persistDir, indexSpec); - final TaskLockbox tl = new TaskLockbox(ts, 300); + final TaskLockbox tl = new TaskLockbox(ts); final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null) { final private Set published = Sets.newHashSet(); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java index 3761ad051101..a168460769d4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java +++ 
b/indexing-service/src/test/java/io/druid/indexing/overlord/RealtimeishTask.java @@ -72,7 +72,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception // Sort of similar to what realtime tasks do: // Acquire lock for first interval - final TaskLock lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1)); + final TaskLock lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1, 5000)); final List locks1 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) @@ -80,7 +80,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception Assert.assertEquals("locks1", ImmutableList.of(lock1), locks1); // Acquire lock for second interval - final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2)); + final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2, 5000)); final List locks2 = toolbox.getTaskActionClient().submit(new LockListAction()); // (Confirm lock sanity) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 932a04bf4d2e..569590cbbcd9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -60,6 +60,9 @@ import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.AbstractFixedIntervalTask; import io.druid.indexing.common.task.IndexTask; +import io.druid.indexing.common.task.IndexTask.IndexIOConfig; +import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec; +import io.druid.indexing.common.task.IndexTask.IndexTuningConfig; import io.druid.indexing.common.task.KillTask; import io.druid.indexing.common.task.RealtimeIndexTask; import io.druid.indexing.common.task.Task; @@ -514,7 +517,7 @@ private TaskToolboxFactory 
setUpTaskToolboxFactory( Preconditions.checkNotNull(taskStorage); Preconditions.checkNotNull(emitter); - taskLockbox = new TaskLockbox(taskStorage, 300); + taskLockbox = new TaskLockbox(taskStorage); tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock( SupervisorManager.class))); File tmpDir = temporaryFolder.newFolder(); @@ -642,7 +645,7 @@ public void testIndexTask() throws Exception final Task indexTask = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( + new IndexIngestionSpec( new DataSchema( "foo", null, @@ -654,8 +657,8 @@ public void testIndexTask() throws Exception ), mapper ), - new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) + new IndexIOConfig(new MockFirehoseFactory(false), false), + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) ), null ); @@ -699,7 +702,7 @@ public void testIndexTaskFailure() throws Exception final Task indexTask = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( + new IndexIngestionSpec( new DataSchema( "foo", null, @@ -711,8 +714,8 @@ public void testIndexTaskFailure() throws Exception ), mapper ), - new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) + new IndexIOConfig(new MockExceptionalFirehoseFactory(), false), + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null) ), null ); @@ -1063,7 +1066,7 @@ public void testResumeTasks() throws Exception final Task indexTask = new IndexTask( null, null, - new IndexTask.IndexIngestionSpec( + new IndexIngestionSpec( new DataSchema( "foo", null, @@ -1075,8 +1078,8 @@ public void testResumeTasks() throws Exception ), mapper ), - new 
IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null) + new IndexIOConfig(new MockFirehoseFactory(false), false), + new IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null) ), null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index 224000e1bdc7..f30b15d6a7f4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -19,13 +19,11 @@ package io.druid.indexing.overlord; -import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; -import io.druid.data.input.FirehoseFactory; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.config.TaskStorageConfig; @@ -37,10 +35,8 @@ import io.druid.metadata.EntryExistsException; import io.druid.metadata.SQLMetadataStorageActionHandlerFactory; import io.druid.metadata.TestDerbyConnector; -import io.druid.server.initialization.ServerConfig; import org.easymock.EasyMock; import org.joda.time.Interval; -import org.joda.time.Period; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -48,7 +44,6 @@ import org.junit.rules.ExpectedException; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; public class TaskLockboxTest @@ -57,7 +52,6 @@ public class TaskLockboxTest public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule(); private final 
ObjectMapper objectMapper = new DefaultObjectMapper(); - private ServerConfig serverConfig; private TaskStorage taskStorage; private TaskLockbox lockbox; @@ -78,15 +72,11 @@ public void setup() objectMapper ) ); - serverConfig = EasyMock.niceMock(ServerConfig.class); - EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100)).anyTimes(); - EasyMock.replay(serverConfig); - ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); EmittingLogger.registerEmitter(emitter); EasyMock.replay(emitter); - lockbox = new TaskLockbox(taskStorage, serverConfig); + lockbox = new TaskLockbox(taskStorage); } @Test @@ -179,20 +169,18 @@ public void testTryLockAfterTaskComplete() public void testTimeoutForLock() throws InterruptedException { Task task1 = NoopTask.create(); - Task task2 = new SomeTask(null, 0, 0, null, null, null); + Task task2 = NoopTask.create(); lockbox.add(task1); lockbox.add(task2); - exception.expect(InterruptedException.class); - exception.expectMessage("can not acquire lock for interval"); - lockbox.lock(task1, new Interval("2015-01-01/2015-01-02")); - lockbox.lock(task2, new Interval("2015-01-01/2015-01-15")); + Assert.assertNotNull(lockbox.lock(task1, new Interval("2015-01-01/2015-01-02"), 5000)); + Assert.assertNull(lockbox.lock(task2, new Interval("2015-01-01/2015-01-15"), 1000)); } @Test public void testSyncFromStorage() throws EntryExistsException { - final TaskLockbox originalBox = new TaskLockbox(taskStorage, serverConfig); + final TaskLockbox originalBox = new TaskLockbox(taskStorage); for (int i = 0; i < 5; i++) { final Task task = NoopTask.create(); taskStorage.insert(task, TaskStatus.running(task.getId())); @@ -207,7 +195,7 @@ public void testSyncFromStorage() throws EntryExistsException .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) .collect(Collectors.toList()); - final TaskLockbox newBox = new TaskLockbox(taskStorage, serverConfig); + final TaskLockbox newBox = new TaskLockbox(taskStorage); 
newBox.syncFromStorage(); Assert.assertEquals(originalBox.getAllLocks(), newBox.getAllLocks()); @@ -219,32 +207,4 @@ public void testSyncFromStorage() throws EntryExistsException Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); } - - public static class SomeTask extends NoopTask - { - - public SomeTask( - @JsonProperty("id") String id, - @JsonProperty("runTime") long runTime, - @JsonProperty("isReadyTime") long isReadyTime, - @JsonProperty("isReadyResult") String isReadyResult, - @JsonProperty("firehose") FirehoseFactory firehoseFactory, - @JsonProperty("context") Map context - ) - { - super(id, runTime, isReadyTime, isReadyResult, firehoseFactory, context); - } - - @Override - public String getType() - { - return "someTask"; - } - - @Override - public String getGroupId() - { - return "someGroupId"; - } - } } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index f84961a45ab1..3a6dc6dcfb0a 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -28,7 +28,6 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; - import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test;