From 2252bc3126864261f73361f4c362b1503fc33f4e Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 21 Dec 2015 23:25:03 -0800 Subject: [PATCH 1/3] TaskLockbox: Consider active tasks active even if they have no locks. --- .../src/main/java/io/druid/indexing/overlord/TaskLockbox.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 816124894694..e2f5ff5a2734 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -87,8 +87,10 @@ public void syncFromStorage() try { // Load stuff from taskStorage first. If this fails, we don't want to lose all our locks. + final Set storedActiveTasks = Sets.newHashSet(); final List> storedLocks = Lists.newArrayList(); for (final Task task : taskStorage.getActiveTasks()) { + storedActiveTasks.add(task.getId()); for (final TaskLock taskLock : taskStorage.getLocks(task.getId())) { storedLocks.add(Pair.of(task, taskLock)); } @@ -108,6 +110,7 @@ public int compare(Pair left, Pair right) }; running.clear(); activeTasks.clear(); + activeTasks.addAll(storedActiveTasks); // Bookkeeping for a log message at the end int taskLockCount = 0; for (final Pair taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) { @@ -118,7 +121,6 @@ public int compare(Pair left, Pair right) log.warn("WTF?! Got lock with empty interval for task: %s", task.getId()); continue; } - activeTasks.add(task.getId()); final Optional acquiredTaskLock = tryLock( task, savedTaskLock.getInterval(), From 6042dd58f96a93d2cc9d5527c93441acb3f25d55 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 21 Dec 2015 23:30:36 -0800 Subject: [PATCH 2/3] RealtimeIndexTask: Fix a couple of problems with restoring. - Shedding locks at startup is bad, we actually want to keep them. Stop doing that. 
- stopGracefully now interrupts the run thread if it had started running finishJob. This avoids waiting for handoff unnecessarily. --- .../common/task/RealtimeIndexTask.java | 102 ++++++----- .../common/task/RealtimeIndexTaskTest.java | 163 +++++++++++++++++- 2 files changed, 220 insertions(+), 45 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index c3aa6f480c8b..6a872bdbed31 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -113,7 +113,13 @@ private static String makeDatasource(FireDepartment fireDepartment) private volatile Firehose firehose = null; @JsonIgnore - private volatile boolean stopped = false; + private volatile boolean gracefullyStopped = false; + + @JsonIgnore + private volatile boolean finishingJob = false; + + @JsonIgnore + private volatile Thread runThread = null; @JsonIgnore private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null; @@ -170,16 +176,12 @@ public boolean isReady(TaskActionClient taskActionClient) throws Exception @Override public TaskStatus run(final TaskToolbox toolbox) throws Exception { + runThread = Thread.currentThread(); + if (this.plumber != null) { throw new IllegalStateException("WTF?!? run with non-null plumber??!"); } - // Shed any locks we might have (e.g. if we were uncleanly killed and restarted) since we'll reacquire - // them if we actually need them - for (final TaskLock taskLock : getTaskLocks(toolbox)) { - toolbox.getTaskActionClient().submit(new LockReleaseAction(taskLock.getInterval())); - } - boolean normalExit = true; // It would be nice to get the PlumberSchool in the constructor. 
Although that will need jackson injectables for @@ -309,7 +311,7 @@ public String getVersion(final Interval interval) committerSupplier = Committers.supplierFromFirehose(firehose); // Time to read data! - while ((!stopped || firehoseDrainableByClosing) && firehose.hasMore()) { + while ((!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) { final InputRow inputRow; try { @@ -346,39 +348,55 @@ public String getVersion(final Interval interval) finally { if (normalExit) { try { - if (!stopped) { - // Hand off all pending data - log.info("Persisting and handing off pending data."); - plumber.persist(committerSupplier.get()); - plumber.finishJob(); - } else { - log.info("Persisting pending data without handoff, in preparation for restart."); - final Committer committer = committerSupplier.get(); - final CountDownLatch persistLatch = new CountDownLatch(1); - plumber.persist( - new Committer() + // Always want to persist. + log.info("Persisting remaining data."); + + final Committer committer = committerSupplier.get(); + final CountDownLatch persistLatch = new CountDownLatch(1); + plumber.persist( + new Committer() + { + @Override + public Object getMetadata() { - @Override - public Object getMetadata() - { - return committer.getMetadata(); - } + return committer.getMetadata(); + } - @Override - public void run() - { - try { - committer.run(); - } - finally { - persistLatch.countDown(); - } + @Override + public void run() + { + try { + committer.run(); + } + finally { + persistLatch.countDown(); } } - ); - persistLatch.await(); + } + ); + persistLatch.await(); + + if (gracefullyStopped) { + log.info("Gracefully stopping."); + } else { + log.info("Finishing the job."); + synchronized (this) { + if (gracefullyStopped) { + // Someone called stopGracefully after we checked the flag. That's okay, just stop now. 
+ log.info("Gracefully stopping."); + } else { + finishingJob = true; + } + } + + if (finishingJob) { + plumber.finishJob(); + } } } + catch (InterruptedException e) { + log.debug(e, "Interrupted while finishing the job"); + } catch (Exception e) { log.makeAlert(e, "Failed to finish realtime task").emit(); throw e; @@ -406,13 +424,17 @@ public void stopGracefully() { try { synchronized (this) { - if (!stopped) { - stopped = true; - log.info("Gracefully stopping."); - if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) { + if (!gracefullyStopped) { + gracefullyStopped = true; + if (finishingJob) { + log.info("stopGracefully: Interrupting finishJob."); + runThread.interrupt(); + } else if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) { + log.info("stopGracefully: Draining firehose."); firehose.close(); } else { - log.debug("Cannot drain firehose[%s] by closing, so skipping closing.", firehose); + log.info("stopGracefully: Cannot drain firehose by closing, interrupting run thread."); + runThread.interrupt(); } } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index a5399e076331..f0eb6b3ce1f6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -24,6 +24,8 @@ import com.google.api.client.util.Sets; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListenableFuture; @@ -63,6 +65,7 @@ import io.druid.indexing.test.TestDataSegmentPusher; import 
io.druid.indexing.test.TestIndexerMetadataStorageCoordinator; import io.druid.jackson.DefaultObjectMapper; +import io.druid.metadata.EntryExistsException; import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.Druids; import io.druid.query.IntervalChunkingQueryRunnerDecorator; @@ -96,6 +99,7 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.metrics.EventReceiverFirehoseRegister; +import io.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Period; @@ -175,6 +179,7 @@ public void testBasics() throws Exception final RealtimeIndexTask task = makeRealtimeTask(null); final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); final ListenableFuture statusFuture = runTask(task, taskToolbox); + final DataSegment publishedSegment; // Wait for firehose to show up, it starts off null. while (task.getFirehose() == null) { @@ -207,11 +212,22 @@ public void testBasics() throws Exception Thread.sleep(50); } + publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + // Do a query. Assert.assertEquals(2, countEvents(task)); // Simulate handoff. 
- for(Pair executorRunnablePair : handOffCallbacks.values()){ + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); executorRunnablePair.lhs.execute(executorRunnablePair.rhs); } handOffCallbacks.clear(); @@ -226,6 +242,7 @@ public void testRestore() throws Exception { final File directory = tempFolder.newFolder(); final RealtimeIndexTask task1 = makeRealtimeTask(null); + final DataSegment publishedSegment; // First run: { @@ -298,11 +315,123 @@ public void testRestore() throws Exception Thread.sleep(50); } + publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + // Do a query. Assert.assertEquals(2, countEvents(task2)); // Simulate handoff. - for(Pair executorRunnablePair : handOffCallbacks.values()){ + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); + executorRunnablePair.lhs.execute(executorRunnablePair.rhs); + } + handOffCallbacks.clear(); + + // Wait for the task to finish. 
+ final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); + } + } + + @Test(timeout = 10000L) + public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception + { + final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final File directory = tempFolder.newFolder(); + final RealtimeIndexTask task1 = makeRealtimeTask(null); + final DataSegment publishedSegment; + + // First run: + { + final TaskToolbox taskToolbox = makeToolbox(task1, taskStorage, mdc, directory); + final ListenableFuture statusFuture = runTask(task1, taskToolbox); + + // Wait for firehose to show up, it starts off null. + while (task1.getFirehose() == null) { + Thread.sleep(50); + } + + final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = + (EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose(); + + firehose.addRows( + ImmutableList.of( + new MapBasedInputRow( + now, + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo") + ) + ) + ); + + // Stop the firehose, this will trigger a finishJob. + firehose.close(); + + // Wait for publish. + while (mdc.getPublished().isEmpty()) { + Thread.sleep(50); + } + + publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + + // Do a query. + Assert.assertEquals(1, countEvents(task1)); + + // Trigger graceful shutdown. + task1.stopGracefully(); + + // Wait for the task to finish. The status doesn't really matter. + while (!statusFuture.isDone()) { + Thread.sleep(50); + } + } + + // Second run: + { + final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); + final TaskToolbox taskToolbox = makeToolbox(task2, taskStorage, mdc, directory); + final ListenableFuture statusFuture = runTask(task2, taskToolbox); + + // Wait for firehose to show up, it starts off null. 
+ while (task2.getFirehose() == null) { + Thread.sleep(50); + } + + // Stop the firehose again, this will start another handoff. + final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = + (EventReceiverFirehoseFactory.EventReceiverFirehose) task2.getFirehose(); + + // Stop the firehose, this will trigger a finishJob. + firehose.close(); + + // publishedSegment is still published. No reason it shouldn't be. + Assert.assertEquals(ImmutableSet.of(publishedSegment), mdc.getPublished()); + + // Wait for a handoffCallback to show up. + while (handOffCallbacks.isEmpty()) { + Thread.sleep(50); + } + + // Simulate handoff. + for (Map.Entry> entry : handOffCallbacks.entrySet()) { + final Pair executorRunnablePair = entry.getValue(); + Assert.assertEquals( + new SegmentDescriptor( + publishedSegment.getInterval(), + publishedSegment.getVersion(), + publishedSegment.getShardSpec().getPartitionNum() + ), + entry.getKey() + ); executorRunnablePair.lhs.execute(executorRunnablePair.rhs); } handOffCallbacks.clear(); @@ -456,11 +585,36 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId) ); } - private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory) + private TaskToolbox makeToolbox( + final Task task, + final IndexerMetadataStorageCoordinator mdc, + final File directory + ) + { + return makeToolbox( + task, + new HeapMemoryTaskStorage(new TaskStorageConfig(null)), + mdc, + directory + ); + } + + private TaskToolbox makeToolbox( + final Task task, + final TaskStorage taskStorage, + final IndexerMetadataStorageCoordinator mdc, + final File directory + ) { - final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, null, null); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); + try { + taskStorage.insert(task, TaskStatus.running(task.getId())); + } + catch 
(EntryExistsException e) { + // suppress + } + taskLockbox.syncFromStorage(); final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, mdc, @@ -566,7 +720,6 @@ public List getLocations() new CacheConfig() ); - taskLockbox.add(task); return toolboxFactory.build(task); } From 981b1f147a784622efd259cf2a0891ed46ddb0e5 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 22 Dec 2015 10:31:20 -0800 Subject: [PATCH 3/3] druid.indexer.task.restoreTasksOnRestart configuration. --- docs/content/configuration/indexing-service.md | 1 + .../io/druid/indexing/common/config/TaskConfig.java | 11 +++++++++++ .../io/druid/indexing/overlord/ForkingTaskRunner.java | 2 +- .../druid/indexing/overlord/ThreadPoolTaskRunner.java | 2 +- .../io/druid/indexing/common/TaskToolboxTest.java | 2 +- .../indexing/common/task/RealtimeIndexTaskTest.java | 2 +- .../firehose/IngestSegmentFirehoseFactoryTest.java | 2 +- .../IngestSegmentFirehoseFactoryTimelineTest.java | 2 +- .../io/druid/indexing/overlord/TaskLifecycleTest.java | 2 +- .../druid/indexing/worker/WorkerTaskMonitorTest.java | 11 ++++++++++- 10 files changed, 29 insertions(+), 8 deletions(-) diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 275b6c6db12f..2612b190463c 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -273,6 +273,7 @@ Additional peon configs include: |`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. 
Used for indexing generating tasks.|50000| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0| +|`druid.indexer.task.restoreTasksOnRestart`|If true, middleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false| |`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M| |`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M| diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index da4a03f329c0..4071f6a28eb2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -50,6 +50,9 @@ public class TaskConfig @JsonProperty private final List defaultHadoopCoordinates; + @JsonProperty + private final boolean restoreTasksOnRestart; + @JsonProperty private final Period gracefulShutdownTimeout; @@ -63,6 +66,7 @@ public TaskConfig( @JsonProperty("hadoopWorkingPath") String hadoopWorkingPath, @JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary, @JsonProperty("defaultHadoopCoordinates") List defaultHadoopCoordinates, + @JsonProperty("restoreTasksOnRestart") boolean restoreTasksOnRestart, @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, @JsonProperty("directoryLockTimeout") Period directoryLockTimeout ) @@ -74,6 +78,7 @@ public TaskConfig( this.defaultHadoopCoordinates = defaultHadoopCoordinates == null ? DEFAULT_DEFAULT_HADOOP_COORDINATES : defaultHadoopCoordinates; + this.restoreTasksOnRestart = restoreTasksOnRestart; this.gracefulShutdownTimeout = gracefulShutdownTimeout == null ? 
DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT : gracefulShutdownTimeout; @@ -127,6 +132,12 @@ public List getDefaultHadoopCoordinates() return defaultHadoopCoordinates; } + @JsonProperty + public boolean isRestoreTasksOnRestart() + { + return restoreTasksOnRestart; + } + @JsonProperty public Period getGracefulShutdownTimeout() { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 3595a12b7f41..a0384f48c42d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -146,7 +146,7 @@ public List>> restore() throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId()); } - if (task.canRestore()) { + if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) { log.info("Restoring task[%s].", task.getId()); retVal.add(Pair.of(task, run(task))); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index a20d8d0f4d65..12b15a0e13ef 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -98,7 +98,7 @@ public void stop() final long elapsed; boolean error = false; - if (task.canRestore()) { + if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) { // Attempt graceful shutdown. 
graceful = true; log.info("Starting graceful shutdown of task[%s].", task.getId()); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index da4c06d0a114..97dbab4489c4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -88,7 +88,7 @@ public void setUp() throws IOException EasyMock.replay(task, mockHandoffNotifierFactory); taskToolbox = new TaskToolboxFactory( - new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null), + new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, false, null, null), mockTaskActionClientFactory, mockEmitter, mockSegmentPusher, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index f0eb6b3ce1f6..2e1c05af31fd 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -606,7 +606,7 @@ private TaskToolbox makeToolbox( final File directory ) { - final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, null, null); + final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); try { taskStorage.insert(task, TaskStatus.running(task.getId())); diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 26b90592a6a8..cc985efdf313 100644 --- 
a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -186,7 +186,7 @@ public void deleteSegments(Set segments) EasyMock.replay(notifierFactory); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( - new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), + new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null), tac, newMockEmitter(), new DataSegmentPusher() diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 204224722d2c..4648ea893fd4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -286,7 +286,7 @@ public RetType submit(TaskAction taskAction) throws IOExcepti SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); EasyMock.replay(notifierFactory); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( - new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), + new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, false, null, null), new TaskActionClientFactory() { @Override diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 4f8aeb058770..726434c93e76 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -440,7 +440,7 @@ 
public DataSegment push(File file, DataSegment segment) throws IOException private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) { - final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, null, null); + final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); tsqa = new TaskStorageQueryAdapter(ts); tl = new TaskLockbox(ts); mdc = newMockMDC(); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 744a908159fa..0ad20c5e1fa6 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -129,7 +129,16 @@ public String getBase() private WorkerTaskMonitor createTaskMonitor() { - final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null); + final TaskConfig taskConfig = new TaskConfig( + Files.createTempDir().toString(), + null, + null, + 0, + null, + false, + null, + null + ); TaskActionClientFactory taskActionClientFactory = EasyMock.createNiceMock(TaskActionClientFactory.class); TaskActionClient taskActionClient = EasyMock.createNiceMock(TaskActionClient.class); EasyMock.expect(taskActionClientFactory.create(EasyMock.anyObject())).andReturn(taskActionClient).anyTimes();