From 36569094ce83c8604b06002d852467233e390c11 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 27 Oct 2015 18:07:06 -0700 Subject: [PATCH 1/2] EventReceiverFirehose: Drain buffer when closed, until empty. --- .../realtime/firehose/EventReceiverFirehoseFactory.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index c58147f05014..7d1934f47c68 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -207,8 +207,11 @@ public boolean hasMore() { synchronized (readLock) { try { - while (!closed && nextRow == null) { + while (nextRow == null) { nextRow = buffer.poll(500, TimeUnit.MILLISECONDS); + if (closed) { + break; + } } } catch (InterruptedException e) { From 501dcb43fa32f78213cc7449b011c0ce2b61fdc3 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 27 Oct 2015 21:07:38 -0700 Subject: [PATCH 2/2] Some changes that make it possible to restart tasks on the same hardware. This is done by killing and respawning the jvms rather than reconnecting to existing jvms, for a couple reasons. One is that it lets you restore tasks after server reboots too, and another is that it lets you upgrade all the software on a box at once by just restarting everything. The main changes are, 1) Add "canRestore" and "stopGracefully" methods to Tasks that say if a task can stop gracefully, and actually do a graceful stop. RealtimeIndexTask is the only one that currently implements this. 2) Add "stop" method to TaskRunners that attempts to do an orderly shutdown. ThreadPoolTaskRunner- call stopGracefully on restorable tasks, wait for exit ForkingTaskRunner- close output stream to restorable tasks, wait for exit RemoteTaskRunner- do nothing special, we actually don't want to shutdown 3) Add "restore" method to TaskRunners that attempts to bootstrap tasks from last run. Only ForkingTaskRunner does anything here. It maintains a "restore.json" file with a list of restorable tasks. 4) Have the CliPeon's ExecutorLifecycle lock the task base directory to avoid a restored task and a zombie old task from stomping on each other. --- .../content/configuration/indexing-service.md | 2 + .../indexing/common/TaskToolboxFactory.java | 2 +- .../indexing/common/config/TaskConfig.java | 48 +- .../indexing/common/task/AbstractTask.java | 13 + .../common/task/RealtimeIndexTask.java | 107 +++- .../io/druid/indexing/common/task/Task.java | 24 +- .../indexing/overlord/ForkingTaskRunner.java | 196 ++++++- .../indexing/overlord/RemoteTaskRunner.java | 7 + .../io/druid/indexing/overlord/TaskQueue.java | 3 + .../druid/indexing/overlord/TaskRunner.java | 14 + .../overlord/ThreadPoolTaskRunner.java | 96 +++- .../indexing/worker/WorkerTaskMonitor.java | 166 +++--- .../worker/executor/ExecutorLifecycle.java | 79 ++- .../indexing/common/TaskToolboxTest.java | 2 +- .../common/task/RealtimeIndexTaskTest.java | 508 ++++++++++++++++++ .../IngestSegmentFirehoseFactoryTest.java | 2 +- ...estSegmentFirehoseFactoryTimelineTest.java | 2 +- .../indexing/overlord/TaskLifecycleTest.java | 103 ++-- .../overlord/http/OverlordResourceTest.java | 13 + .../test/TestDataSegmentAnnouncer.java | 66 +++ .../indexing/test/TestDataSegmentKiller.java | 44 ++ .../indexing/test/TestDataSegmentPusher.java | 52 ++ ...TestIndexerMetadataStorageCoordinator.java | 113 ++++ .../druid/indexing/test/TestServerView.java | 116 ++++ .../worker/WorkerTaskMonitorTest.java | 7 +- .../EventReceiverFirehoseFactory.java | 40 +- 26 files changed, 1598 insertions(+), 227 deletions(-) create mode 100644 indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentKiller.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java create mode 100644 indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java diff --git a/docs/content/configuration/indexing-service.md b/docs/content/configuration/indexing-service.md index 3e628f38f9f0..b2eaa4e97205 100644 --- a/docs/content/configuration/indexing-service.md +++ b/docs/content/configuration/indexing-service.md @@ -279,6 +279,8 @@ Additional peon configs include: |`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing| |`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000| |`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0| +|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M| +|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M| If `druid.indexer.runner.separateIngestionEndpoint` is set to true then following configurations are available for the ingestion server at peon: diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index ef8ef3e2b79b..763a74ff7e03 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -109,7 +109,7 @@ public TaskToolboxFactory( public TaskToolbox build(Task task) { - final File taskWorkDir = new File(new File(config.getBaseTaskDir(), task.getId()), "work"); + final File taskWorkDir = config.getTaskWorkDir(task.getId()); return new TaskToolbox( config, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index 4e17326d41f2..da4a03f329c0 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import org.joda.time.Period; import java.io.File; import java.util.List; @@ -30,6 +31,10 @@ public class TaskConfig "org.apache.hadoop:hadoop-client:2.3.0" ); + private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M"); + + private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M"); + @JsonProperty private final String baseDir; @@ -45,13 +50,21 @@ public class TaskConfig @JsonProperty private final List defaultHadoopCoordinates; + @JsonProperty + private final Period gracefulShutdownTimeout; + + @JsonProperty + private final Period directoryLockTimeout; + @JsonCreator public TaskConfig( @JsonProperty("baseDir") String baseDir, @JsonProperty("baseTaskDir") String baseTaskDir, @JsonProperty("hadoopWorkingPath") String hadoopWorkingPath, @JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary, - @JsonProperty("defaultHadoopCoordinates") List defaultHadoopCoordinates + @JsonProperty("defaultHadoopCoordinates") List defaultHadoopCoordinates, + @JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout, + @JsonProperty("directoryLockTimeout") Period directoryLockTimeout ) { this.baseDir = baseDir == null ? "/tmp" : baseDir; @@ -61,6 +74,12 @@ public TaskConfig( this.defaultHadoopCoordinates = defaultHadoopCoordinates == null ? DEFAULT_DEFAULT_HADOOP_COORDINATES : defaultHadoopCoordinates; + this.gracefulShutdownTimeout = gracefulShutdownTimeout == null + ? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT + : gracefulShutdownTimeout; + this.directoryLockTimeout = directoryLockTimeout == null + ? DEFAULT_DIRECTORY_LOCK_TIMEOUT + : directoryLockTimeout; } @JsonProperty @@ -75,6 +94,21 @@ public File getBaseTaskDir() return baseTaskDir; } + public File getTaskDir(String taskId) + { + return new File(baseTaskDir, taskId); + } + + public File getTaskWorkDir(String taskId) + { + return new File(getTaskDir(taskId), "work"); + } + + public File getTaskLockFile(String taskId) + { + return new File(getTaskDir(taskId), "lock"); + } + @JsonProperty public String getHadoopWorkingPath() { @@ -93,6 +127,18 @@ public List getDefaultHadoopCoordinates() return defaultHadoopCoordinates; } + @JsonProperty + public Period getGracefulShutdownTimeout() + { + return gracefulShutdownTimeout; + } + + @JsonProperty + public Period getDirectoryLockTimeout() + { + return directoryLockTimeout; + } + private String defaultDir(String configParameter, final String defaultVal) { if (configParameter == null) { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java index 0ac699f2bdc3..d98fd6074197 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java @@ -128,6 +128,19 @@ public String getClasspathPrefix() return null; } + @Override + public boolean canRestore() + { + return false; + } + + @Override + public void stopGracefully() + { + // Should not be called when canRestore = false. + throw new UnsupportedOperationException("Cannot stop gracefully"); + } + @Override public String toString() { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index b4f750040313..9c2061f8d7ec 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -29,6 +29,7 @@ import com.metamx.emitter.EmittingLogger; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; +import io.druid.data.input.FirehoseFactory; import io.druid.data.input.InputRow; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; @@ -48,6 +49,9 @@ import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; +import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; +import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; @@ -62,6 +66,7 @@ import java.io.IOException; import java.util.Map; import java.util.Random; +import java.util.concurrent.CountDownLatch; public class RealtimeIndexTask extends AbstractTask { @@ -104,6 +109,12 @@ private static String makeDatasource(FireDepartment fireDepartment) @JsonIgnore private volatile Plumber plumber = null; + @JsonIgnore + private volatile Firehose firehose = null; + + @JsonIgnore + private volatile boolean stopped = false; + @JsonIgnore private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null; @@ -285,8 +296,6 @@ public String getVersion(final Interval interval) this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics()); - // Delay firehose connection to avoid claiming input resources while the plumber is starting up. - Firehose firehose = null; Supplier committerSupplier = null; try { @@ -295,12 +304,14 @@ public String getVersion(final Interval interval) // Set up metrics emission toolbox.getMonitorScheduler().addMonitor(metricsMonitor); - // Set up firehose - firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser()); + // Delay firehose connection to avoid claiming input resources while the plumber is starting up. + final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory(); + final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory); + firehose = firehoseFactory.connect(spec.getDataSchema().getParser()); committerSupplier = Committers.supplierFromFirehose(firehose); // Time to read data! - while (firehose.hasMore()) { + while ((!stopped || firehoseDrainableByClosing) && firehose.hasMore()) { final InputRow inputRow; try { @@ -337,8 +348,38 @@ public String getVersion(final Interval interval) finally { if (normalExit) { try { - plumber.persist(committerSupplier.get()); - plumber.finishJob(); + if (!stopped) { + // Hand off all pending data + log.info("Persisting and handing off pending data."); + plumber.persist(committerSupplier.get()); + plumber.finishJob(); + } else { + log.info("Persisting pending data without handoff, in preparation for restart."); + final Committer committer = committerSupplier.get(); + final CountDownLatch persistLatch = new CountDownLatch(1); + plumber.persist( + new Committer() + { + @Override + public Object getMetadata() + { + return committer.getMetadata(); + } + + @Override + public void run() + { + try { + committer.run(); + } + finally { + persistLatch.countDown(); + } + } + } + ); + persistLatch.await(); + } } catch (Exception e) { log.makeAlert(e, "Failed to finish realtime task").emit(); @@ -352,15 +393,67 @@ public String getVersion(final Interval interval) } } + log.info("Job done!"); return TaskStatus.success(getId()); } + @Override + public boolean canRestore() + { + return true; + } + + @Override + public void stopGracefully() + { + try { + synchronized (this) { + if (!stopped) { + stopped = true; + log.info("Gracefully stopping."); + if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) { + firehose.close(); + } else { + log.debug("Cannot drain firehose[%s] by closing, so skipping closing.", firehose); + } + } + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + /** + * Public for tests. + */ + @JsonIgnore + public Firehose getFirehose() + { + return firehose; + } + @JsonProperty("spec") public FireDepartment getRealtimeIngestionSchema() { return spec; } + /** + * Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than + * abruptly stopping. + *

+ * This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this. + */ + private static boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) + { + return firehoseFactory instanceof EventReceiverFirehoseFactory + || (firehoseFactory instanceof TimedShutoffFirehoseFactory + && isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).getDelegateFactory())) + || (firehoseFactory instanceof ClippedFirehoseFactory + && isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate())); + } + public static class TaskActionSegmentPublisher implements SegmentPublisher { final Task task; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 6ac9f66facf6..7d871a57d9f9 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -62,6 +62,7 @@ public interface Task { /** * Returns ID of this task. Must be unique across all tasks ever created. + * * @return task ID */ public String getId(); @@ -69,6 +70,7 @@ public interface Task /** * Returns group ID of this task. Tasks with the same group ID can share locks. If tasks do not need to share locks, * a common convention is to set group ID equal to task ID. + * * @return task group ID */ public String getGroupId(); @@ -76,12 +78,14 @@ public interface Task /** * Returns a {@link io.druid.indexing.common.task.TaskResource} for this task. Task resources define specific * worker requirements a task may require. + * * @return {@link io.druid.indexing.common.task.TaskResource} for this task */ public TaskResource getTaskResource(); /** * Returns a descriptive label for this task type. Used for metrics emission and logging. + * * @return task type label */ public String getType(); @@ -90,7 +94,7 @@ public interface Task * Get the nodeType for if/when this task publishes on zookeeper. * * @return the nodeType to use when publishing the server to zookeeper. null if the task doesn't expect to - * publish to zookeeper. + * publish to zookeeper. */ public String getNodeType(); @@ -102,7 +106,9 @@ public interface Task /** * Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method * should return null. + * * @param query result type + * * @return query runners for this task */ public QueryRunner getQueryRunner(Query query); @@ -117,7 +123,7 @@ public interface Task * Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The * actions must be idempotent, since this method may be executed multiple times. This typically runs on the * coordinator. If this method throws an exception, the task should be considered a failure. - * + *

* This method must be idempotent, as it may be run multiple times per task. * * @param taskActionClient action client for this task (not the full toolbox) @@ -128,6 +134,20 @@ public interface Task */ public boolean isReady(TaskActionClient taskActionClient) throws Exception; + /** + * Returns whether or not this task can restore its progress from its on-disk working directory. Restorable tasks + * may be started with a non-empty working directory. Tasks that exit uncleanly may still have a chance to attempt + * restores, meaning that restorable tasks should be able to deal with potentially partially written on-disk state. + */ + public boolean canRestore(); + + /** + * Asks a task to arrange for its "run" method to exit promptly. This method will only be called if + * {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with + * extreme prejudice. + */ + public void stopGracefully(); + /** * Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while * holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the task diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java index 801eb8cc7003..4a06bcaa1d34 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ForkingTaskRunner.java @@ -17,6 +17,8 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.CharMatcher; import com.google.common.base.Joiner; @@ -28,9 +30,11 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.io.ByteSink; import com.google.common.io.ByteSource; import com.google.common.io.ByteStreams; import com.google.common.io.Closer; +import com.google.common.io.FileWriteMode; import com.google.common.io.Files; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -40,6 +44,7 @@ import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; import io.druid.guice.annotations.Self; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.config.TaskConfig; @@ -51,6 +56,8 @@ import io.druid.tasklogs.TaskLogPusher; import io.druid.tasklogs.TaskLogStreamer; import org.apache.commons.io.FileUtils; +import org.joda.time.DateTime; +import org.joda.time.Interval; import java.io.File; import java.io.IOException; @@ -64,6 +71,7 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Runs tasks in separate processes using the "internal peon" verb. @@ -72,6 +80,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer { private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class); private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property."; + private static final String TASK_RESTORE_FILENAME = "restore.json"; private final ForkingTaskRunnerConfig config; private final TaskConfig taskConfig; private final Properties props; @@ -83,6 +92,8 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer private final Map tasks = Maps.newHashMap(); + private volatile boolean stopping = false; + @Inject public ForkingTaskRunner( ForkingTaskRunnerConfig config, @@ -102,7 +113,51 @@ public ForkingTaskRunner( this.node = node; this.portFinder = new PortFinder(config.getStartPort()); - this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(workerConfig.getCapacity())); + this.exec = MoreExecutors.listeningDecorator( + Execs.multiThreaded(workerConfig.getCapacity(), "forking-task-runner-%d") + ); + } + + @Override + public List>> restore() + { + final File restoreFile = getRestoreFile(); + final TaskRestoreInfo taskRestoreInfo; + if (restoreFile.exists()) { + try { + taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class); + } + catch (Exception e) { + log.error(e, "Failed to read restorable tasks from file[%s]. Skipping restore.", restoreFile); + return ImmutableList.of(); + } + } else { + return ImmutableList.of(); + } + + final List>> retVal = Lists.newArrayList(); + for (final String taskId : taskRestoreInfo.getRunningTasks()) { + try { + final File taskFile = new File(taskConfig.getTaskDir(taskId), "task.json"); + final Task task = jsonMapper.readValue(taskFile, Task.class); + + if (!task.getId().equals(taskId)) { + throw new ISE("WTF?! Task[%s] restore file had wrong id[%s].", taskId, task.getId()); + } + + if (task.canRestore()) { + log.info("Restoring task[%s].", task.getId()); + retVal.add(Pair.of(task, run(task))); + } + } + catch (Exception e) { + log.warn(e, "Failed to restore task[%s]. Trying to restore other tasks.", taskId); + } + } + + log.info("Restored %,d tasks.", retVal.size()); + + return retVal; } @Override @@ -113,7 +168,7 @@ public ListenableFuture run(final Task task) tasks.put( task.getId(), new ForkingTaskRunnerWorkItem( - task.getId(), + task, exec.submit( new Callable() { @@ -121,7 +176,7 @@ public ListenableFuture run(final Task task) public TaskStatus call() { final String attemptUUID = UUID.randomUUID().toString(); - final File taskDir = new File(taskConfig.getBaseTaskDir(), task.getId()); + final File taskDir = taskConfig.getTaskDir(task.getId()); final File attemptDir = new File(taskDir, attemptUUID); final ProcessHolder processHolder; @@ -144,9 +199,9 @@ public TaskStatus call() throw new IOException(String.format("Could not create directories: %s", attemptDir)); } - final File taskFile = new File(attemptDir, "task.json"); + final File taskFile = new File(taskDir, "task.json"); final File statusFile = new File(attemptDir, "status.json"); - final File logFile = new File(attemptDir, "log"); + final File logFile = new File(taskDir, "log"); // time to adjust process holders synchronized (tasks) { @@ -245,12 +300,18 @@ public TaskStatus call() command.add(String.format("-Ddruid.host=%s", childHost)); command.add(String.format("-Ddruid.port=%d", childPort)); - if(config.isSeparateIngestionEndpoint()) { - command.add(String.format("-Ddruid.indexer.task.chathandler.service=%s", "placeholder/serviceName")); + if (config.isSeparateIngestionEndpoint()) { + command.add(String.format( + "-Ddruid.indexer.task.chathandler.service=%s", + "placeholder/serviceName" + )); // Actual serviceName will be passed by the EventReceiverFirehose when it registers itself with ChatHandlerProvider // Thus, "placeholder/serviceName" will be ignored command.add(String.format("-Ddruid.indexer.task.chathandler.host=%s", childHost)); - command.add(String.format("-Ddruid.indexer.task.chathandler.port=%d", childChatHandlerPort)); + command.add(String.format( + "-Ddruid.indexer.task.chathandler.port=%d", + childChatHandlerPort + )); } command.add("io.druid.cli.Main"); @@ -264,7 +325,9 @@ public TaskStatus call() command.add(nodeType); } - jsonMapper.writeValue(taskFile, task); + if (!taskFile.exists()) { + jsonMapper.writeValue(taskFile, task); + } log.info("Running command: %s", Joiner.on(" ").join(command)); taskWorkItem.processHolder = new ProcessHolder( @@ -280,7 +343,8 @@ public TaskStatus call() log.info("Logging task %s output to: %s", task.getId(), logFile); boolean runFailed = true; - try (final OutputStream toLogfile = Files.asByteSink(logFile).openStream()) { + final ByteSink logSink = Files.asByteSink(logFile, FileWriteMode.APPEND); + try (final OutputStream toLogfile = logSink.openStream()) { ByteStreams.copy(processHolder.process.getInputStream(), toLogfile); final int statusCode = processHolder.process.waitFor(); log.info("Process exited with status[%d] for task: %s", statusCode, task.getId()); @@ -319,13 +383,27 @@ public TaskStatus call() if (taskWorkItem != null && taskWorkItem.processHolder != null) { taskWorkItem.processHolder.process.destroy(); } + if (!stopping) { + saveRunningTasks(); + } } - portFinder.markPortUnused(childPort); - if(childChatHandlerPort > 0) { + + if (childChatHandlerPort > 0) { portFinder.markPortUnused(childChatHandlerPort); } - log.info("Removing temporary directory: %s", attemptDir); - FileUtils.deleteDirectory(attemptDir); + + try { + if (!stopping && taskDir.exists()) { + log.info("Removing task directory: %s", taskDir); + FileUtils.deleteDirectory(taskDir); + } + } + catch (Exception e) { + log.makeAlert(e, "Failed to delete task directory") + .addData("taskDir", taskDir.toString()) + .addData("task", task.getId()) + .emit(); + } } catch (Exception e) { log.error(e, "Suppressing exception caught while cleaning up task"); @@ -337,7 +415,7 @@ public TaskStatus call() ) ); } - + saveRunningTasks(); return tasks.get(task.getId()).getResult(); } } @@ -345,16 +423,41 @@ public TaskStatus call() @LifecycleStop public void stop() { - synchronized (tasks) { - exec.shutdown(); + stopping = true; + exec.shutdown(); + synchronized (tasks) { for (ForkingTaskRunnerWorkItem taskWorkItem : tasks.values()) { if (taskWorkItem.processHolder != null) { - log.info("Destroying process: %s", taskWorkItem.processHolder.process); - taskWorkItem.processHolder.process.destroy(); + log.info("Closing output stream to task[%s].", taskWorkItem.getTask().getId()); + try { + taskWorkItem.processHolder.process.getOutputStream().close(); + } + catch (Exception e) { + log.warn(e, "Failed to close stdout to task[%s]. Destroying task.", taskWorkItem.getTask().getId()); + taskWorkItem.processHolder.process.destroy(); + } } } } + + final DateTime start = new DateTime(); + final long timeout = new Interval(start, taskConfig.getGracefulShutdownTimeout()).toDurationMillis(); + + // Things should be terminating now. Wait for it to happen so logs can be uploaded and all that good stuff. + log.info("Waiting %,dms for shutdown.", timeout); + if (timeout > 0) { + try { + exec.awaitTermination(timeout, TimeUnit.MILLISECONDS); + log.info("Finished stopping in %,dms.", System.currentTimeMillis() - start.getMillis()); + } + catch (InterruptedException e) { + log.warn(e, "Interrupted while waiting for executor to finish."); + Thread.currentThread().interrupt(); + } + } else { + log.warn("Ran out of time, not waiting for executor to finish!"); + } } @Override @@ -448,17 +551,68 @@ public InputStream openStream() throws IOException ); } + // Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that + // occur while saving. + private void saveRunningTasks() + { + final File restoreFile = getRestoreFile(); + final List theTasks = Lists.newArrayList(); + for (ForkingTaskRunnerWorkItem forkingTaskRunnerWorkItem : tasks.values()) { + theTasks.add(forkingTaskRunnerWorkItem.getTaskId()); + } + + try { + Files.createParentDirs(restoreFile); + jsonMapper.writeValue(restoreFile, new TaskRestoreInfo(theTasks)); + } + catch (Exception e) { + log.warn(e, "Failed to save tasks to restore file[%s]. Skipping this save.", restoreFile); + } + } + + private File getRestoreFile() + { + return new File(taskConfig.getBaseTaskDir(), TASK_RESTORE_FILENAME); + } + + private static class TaskRestoreInfo + { + @JsonProperty + private final List runningTasks; + + @JsonCreator + public TaskRestoreInfo( + @JsonProperty("runningTasks") List runningTasks + ) + { + this.runningTasks = runningTasks; + } + + public List getRunningTasks() + { + return runningTasks; + } + } + private static class ForkingTaskRunnerWorkItem extends TaskRunnerWorkItem { + private final Task task; + private volatile boolean shutdown = false; private volatile ProcessHolder processHolder = null; private ForkingTaskRunnerWorkItem( - String taskId, + Task task, ListenableFuture statusFuture ) { - super(taskId, statusFuture); + super(task.getId(), statusFuture); + this.task = task; + } + + public Task getTask() + { + return task; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java index 8c32c22a5e6b..717e3efdca5f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/RemoteTaskRunner.java @@ -40,6 +40,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.metamx.common.ISE; +import com.metamx.common.Pair; import com.metamx.common.RE; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; @@ -305,6 +306,12 @@ public void stop() } } + @Override + public List>> restore() + { + return ImmutableList.of(); + } + @Override public Collection getWorkers() { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java index 594c46d57161..37eb6804d2d4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskQueue.java @@ -213,6 +213,9 @@ private void manage() throws InterruptedException log.info("Beginning management in %s.", config.getStartDelay()); Thread.sleep(config.getStartDelay().getMillis()); + // Ignore return value- we'll get the IDs and futures from getKnownTasks later. + taskRunner.restore(); + while (active) { giant.lock(); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java index ad5d2e2dd8b8..b5841d6836b1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunner.java @@ -18,16 +18,24 @@ package io.druid.indexing.overlord; import com.google.common.util.concurrent.ListenableFuture; +import com.metamx.common.Pair; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.task.Task; import java.util.Collection; +import java.util.List; /** * Interface for handing off tasks. Managed by a {@link io.druid.indexing.overlord.TaskQueue}. */ public interface TaskRunner { + /** + * Some task runners can restart previously-running tasks after being bounced. This method does that, and returns + * the list of tasks (and status futures). + */ + public List>> restore(); + /** * Run a task. The returned status should be some kind of completed status. * @@ -45,6 +53,12 @@ public interface TaskRunner */ public void shutdown(String taskid); + /** + * Stop this task runner. This may block until currently-running tasks can be gracefully stopped. After calling + * stopping, "run" will not accept further tasks. + */ + public void stop(); + public Collection getRunningTasks(); public Collection getPendingTasks(); diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index fd2eb37830bb..a09cd1e87b98 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -28,53 +28,121 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; +import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.service.AlertEvent; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.concurrent.Execs; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.query.NoopQueryRunner; import io.druid.query.Query; import io.druid.query.QueryRunner; -import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QuerySegmentWalker; import io.druid.query.SegmentDescriptor; -import org.apache.commons.io.FileUtils; +import org.joda.time.DateTime; import org.joda.time.Interval; -import java.io.File; import java.util.Collection; +import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.TimeUnit; /** * Runs tasks in a JVM thread using an ExecutorService. */ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker { + private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class); + private final TaskToolboxFactory toolboxFactory; + private final TaskConfig taskConfig; private final ListeningExecutorService exec; private final Set runningItems = new ConcurrentSkipListSet<>(); - private final QueryRunnerFactoryConglomerate conglomerate; - private static final EmittingLogger log = new EmittingLogger(ThreadPoolTaskRunner.class); + private final ServiceEmitter emitter; @Inject public ThreadPoolTaskRunner( TaskToolboxFactory toolboxFactory, - QueryRunnerFactoryConglomerate conglomerate + TaskConfig taskConfig, + ServiceEmitter emitter ) { this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory"); + this.taskConfig = taskConfig; this.exec = MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d")); - this.conglomerate = conglomerate; + this.emitter = Preconditions.checkNotNull(emitter, "emitter"); + } + + @Override + public List>> restore() + { + return ImmutableList.of(); } @LifecycleStop public void stop() { + exec.shutdown(); + + for (ThreadPoolTaskRunnerWorkItem item : runningItems) { + final Task task = item.getTask(); + final long start = System.currentTimeMillis(); + final boolean graceful; + final long elapsed; + boolean error = false; + + if (task.canRestore()) { + // Attempt graceful shutdown. + graceful = true; + log.info("Starting graceful shutdown of task[%s].", task.getId()); + + try { + task.stopGracefully(); + final TaskStatus taskStatus = item.getResult().get( + new Interval(new DateTime(start), taskConfig.getGracefulShutdownTimeout()).toDurationMillis(), + TimeUnit.MILLISECONDS + ); + log.info( + "Graceful shutdown of task[%s] finished in %,dms with status[%s].", + task.getId(), + System.currentTimeMillis() - start, + taskStatus.getStatusCode() + ); + } + catch (Exception e) { + log.makeAlert(e, "Graceful task shutdown failed: %s", task.getDataSource()) + .addData("taskId", task.getId()) + .addData("dataSource", task.getDataSource()) + .emit(); + log.warn(e, "Graceful shutdown of task[%s] aborted with exception."); + error = true; + } + } else { + graceful = false; + } + + elapsed = System.currentTimeMillis() - start; + + final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent + .builder() + .setDimension("task", task.getId()) + .setDimension("dataSource", task.getDataSource()) + .setDimension("graceful", String.valueOf(graceful)) + .setDimension("error", String.valueOf(error)); + + emitter.emit(metricBuilder.build("task/interrupt/count", 1L)); + emitter.emit(metricBuilder.build("task/interrupt/elapsed", elapsed)); + } + + // Ok, now interrupt everything. exec.shutdownNow(); } @@ -210,7 +278,6 @@ public ThreadPoolTaskRunnerCallable(Task task, TaskToolbox toolbox) public TaskStatus call() { final long startTime = System.currentTimeMillis(); - final File taskDir = toolbox.getTaskWorkDir(); TaskStatus status; @@ -231,19 +298,6 @@ public TaskStatus call() throw Throwables.propagate(t); } - try { - if (taskDir.exists()) { - log.info("Removing task directory: %s", taskDir); - FileUtils.deleteDirectory(taskDir); - } - } - catch (Exception e) { - log.makeAlert(e, "Failed to delete task directory") - .addData("taskDir", taskDir.toString()) - .addData("task", task.getId()) - .emit(); - } - try { return status.withDuration(System.currentTimeMillis() - startTime); } diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java index 1ad9b5914d51..5dc96b3af6ff 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/WorkerTaskMonitor.java @@ -18,7 +18,10 @@ package io.druid.indexing.worker; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Sets; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; +import com.metamx.common.Pair; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.emitter.EmittingLogger; @@ -28,18 +31,20 @@ import io.druid.indexing.overlord.TaskRunner; import io.druid.indexing.worker.config.WorkerConfig; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.listen.Listenable; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; /** * The monitor watches ZK at a specified path for new tasks to appear. Upon starting the monitor, a listener will be * created that waits for new tasks. Tasks are executed as soon as they are seen. - * + *

* The monitor implements {@link io.druid.query.QuerySegmentWalker} so tasks can offer up queryable data. This is useful for * realtime index tasks. */ @@ -84,14 +89,20 @@ public WorkerTaskMonitor( public void start() { try { + // restore restorable tasks + final List>> restored = taskRunner.restore(); + for (Pair> pair : restored) { + submitTaskRunnable(pair.lhs, pair.rhs); + } + // cleanup any old running task announcements which are invalid after restart - for (TaskAnnouncement announcement : workerCuratorCoordinator.getAnnouncements()){ - if(announcement.getTaskStatus().isRunnable()) { + for (TaskAnnouncement announcement : workerCuratorCoordinator.getAnnouncements()) { + if (!isTaskRunning(announcement.getTaskStatus().getId()) && announcement.getTaskStatus().isRunnable()) { workerCuratorCoordinator.updateAnnouncement( TaskAnnouncement.create( - announcement.getTaskId(), + announcement.getTaskStatus().getId(), announcement.getTaskResource(), - TaskStatus.failure(announcement.getTaskId()) + TaskStatus.failure(announcement.getTaskStatus().getId()) ) ); } @@ -110,67 +121,7 @@ public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent Task.class ); - if (isTaskRunning(task)) { - log.warn( - "I can't build it. There's something in the way. Got task %s that I am already running...", - task.getId() - ); - workerCuratorCoordinator.unannounceTask(task.getId()); - return; - } - - log.info("Submitting runnable for task[%s]", task.getId()); - - exec.submit( - new Runnable() - { - @Override - public void run() - { - final long startTime = System.currentTimeMillis(); - - log.info("Affirmative. Running task [%s]", task.getId()); - running.add(task); - - TaskStatus taskStatus; - try { - workerCuratorCoordinator.unannounceTask(task.getId()); - workerCuratorCoordinator.announceTaskAnnouncement( - TaskAnnouncement.create( - task, - TaskStatus.running(task.getId()) - ) - ); - taskStatus = taskRunner.run(task).get(); - } - catch (Exception e) { - log.makeAlert(e, "I can't build there. Failed to run task") - .addData("task", task.getId()) - .emit(); - taskStatus = TaskStatus.failure(task.getId()); - } - finally { - running.remove(task); - } - - taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime); - - try { - workerCuratorCoordinator.updateAnnouncement(TaskAnnouncement.create(task, taskStatus)); - log.info( - "Job's finished. Completed [%s] with status [%s]", - task.getId(), - taskStatus.getStatusCode() - ); - } - catch (Exception e) { - log.makeAlert(e, "Failed to update task status") - .addData("task", task.getId()) - .emit(); - } - } - } - ); + submitTaskRunnable(task, null); } } } @@ -184,10 +135,86 @@ public void run() } } - private boolean isTaskRunning(final Task task) + private void submitTaskRunnable(final Task task, final ListenableFuture taskStatusAlreadySubmitted) + { + if (isTaskRunning(task.getId())) { + log.warn( + "I can't build it. There's something in the way. Got task %s that I am already running...", + task.getId() + ); + workerCuratorCoordinator.unannounceTask(task.getId()); + return; + } + + log.info("Submitting runnable for task[%s]", task.getId()); + + running.add(task); + + exec.submit( + new Runnable() + { + @Override + public void run() + { + final long startTime = System.currentTimeMillis(); + + TaskStatus taskStatus; + + try { + workerCuratorCoordinator.updateAnnouncement( + TaskAnnouncement.create( + task, + TaskStatus.running(task.getId()) + ) + ); + + if (taskStatusAlreadySubmitted != null) { + log.info("Affirmative. Connecting to already-running task [%s]", task.getId()); + taskStatus = taskStatusAlreadySubmitted.get(); + } else { + log.info("Affirmative. Running task [%s]", task.getId()); + workerCuratorCoordinator.unannounceTask(task.getId()); + taskStatus = taskRunner.run(task).get(); + } + } + catch (InterruptedException e) { + log.debug(e, "Interrupted while running task[%s], exiting.", task.getId()); + return; + } + catch (Exception e) { + log.makeAlert(e, "I can't build there. Failed to run task") + .addData("task", task.getId()) + .emit(); + taskStatus = TaskStatus.failure(task.getId()); + } + finally { + running.remove(task); + } + + taskStatus = taskStatus.withDuration(System.currentTimeMillis() - startTime); + + try { + workerCuratorCoordinator.updateAnnouncement(TaskAnnouncement.create(task, taskStatus)); + log.info( + "Job's finished. Completed [%s] with status [%s]", + task.getId(), + taskStatus.getStatusCode() + ); + } + catch (Exception e) { + log.makeAlert(e, "Failed to update task status") + .addData("task", task.getId()) + .emit(); + } + } + } + ); + } + + private boolean isTaskRunning(final String taskId) { for (final Task runningTask : running) { - if (runningTask.getId().equals(task.getId())) { + if (runningTask.getId().equals(taskId)) { return true; } } @@ -200,7 +227,8 @@ public void stop() { try { pathChildrenCache.close(); - exec.shutdown(); + exec.shutdownNow(); + taskRunner.stop(); } catch (Exception e) { log.makeAlert(e, "Exception stopping WorkerTaskMonitor") diff --git a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java index 2850293231ed..f3909a0bc61d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/io/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -31,12 +31,17 @@ import io.druid.concurrent.Execs; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.actions.TaskActionClientFactory; +import io.druid.indexing.common.config.TaskConfig; import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.TaskRunner; +import org.joda.time.DateTime; import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.StandardOpenOption; import java.util.concurrent.ExecutorService; /** @@ -47,37 +52,41 @@ public class ExecutorLifecycle { private static final EmittingLogger log = new EmittingLogger(ExecutorLifecycle.class); - private final ExecutorLifecycleConfig config; + private final ExecutorLifecycleConfig taskExecutorConfig; + private final TaskConfig taskConfig; private final TaskActionClientFactory taskActionClientFactory; private final TaskRunner taskRunner; private final ObjectMapper jsonMapper; private final ExecutorService parentMonitorExec = Execs.singleThreaded("parent-monitor-%d"); + private volatile Task task = null; private volatile ListenableFuture statusFuture = null; + private volatile FileChannel taskLockChannel; + private volatile FileLock taskLockFileLock; @Inject public ExecutorLifecycle( - ExecutorLifecycleConfig config, + ExecutorLifecycleConfig taskExecutorConfig, + TaskConfig taskConfig, TaskActionClientFactory taskActionClientFactory, TaskRunner taskRunner, ObjectMapper jsonMapper ) { - this.config = config; + this.taskExecutorConfig = taskExecutorConfig; + this.taskConfig = taskConfig; this.taskActionClientFactory = taskActionClientFactory; this.taskRunner = taskRunner; this.jsonMapper = jsonMapper; } @LifecycleStart - public void start() + public void start() throws InterruptedException { - final File taskFile = Preconditions.checkNotNull(config.getTaskFile(), "taskFile"); - final File statusFile = Preconditions.checkNotNull(config.getStatusFile(), "statusFile"); - final InputStream parentStream = Preconditions.checkNotNull(config.getParentStream(), "parentStream"); - - final Task task; + final File taskFile = Preconditions.checkNotNull(taskExecutorConfig.getTaskFile(), "taskFile"); + final File statusFile = Preconditions.checkNotNull(taskExecutorConfig.getStatusFile(), "statusFile"); + final InputStream parentStream = Preconditions.checkNotNull(taskExecutorConfig.getParentStream(), "parentStream"); try { task = jsonMapper.readValue(taskFile, Task.class); @@ -91,6 +100,43 @@ public void start() throw Throwables.propagate(e); } + // Avoid running the same task twice on the same machine by locking the task base directory. + + final File taskLockFile = taskConfig.getTaskLockFile(task.getId()); + + try { + synchronized (this) { + if (taskLockChannel == null && taskLockFileLock == null) { + taskLockChannel = FileChannel.open( + taskLockFile.toPath(), + StandardOpenOption.CREATE, + StandardOpenOption.WRITE + ); + + log.info("Attempting to lock file[%s].", taskLockFile); + final long startLocking = System.currentTimeMillis(); + final long timeout = new DateTime(startLocking).plus(taskConfig.getDirectoryLockTimeout()).getMillis(); + while (taskLockFileLock == null && System.currentTimeMillis() < timeout) { + taskLockFileLock = taskLockChannel.tryLock(); + if (taskLockFileLock == null) { + Thread.sleep(100); + } + } + + if (taskLockFileLock == null) { + throw new ISE("Could not acquire lock file[%s] within %,dms.", taskLockFile, timeout - startLocking); + } else { + log.info("Acquired lock file[%s] in %,dms.", taskLockFile, System.currentTimeMillis() - startLocking); + } + } else { + throw new ISE("Already started!"); + } + } + } + catch (IOException e) { + throw Throwables.propagate(e); + } + // Spawn monitor thread to keep a watch on parent's stdin // If stdin reaches eof, the parent is gone, and we should shut down parentMonitorExec.submit( @@ -120,7 +166,8 @@ public void run() if (!task.isReady(taskActionClientFactory.create(task))) { throw new ISE("Task is not ready to run yet!", task.getId()); } - } catch (Exception e) { + } + catch (Exception e) { throw new ISE(e, "Failed to run isReady", task.getId()); } @@ -164,8 +211,18 @@ public void join() } @LifecycleStop - public void stop() + public void stop() throws Exception { parentMonitorExec.shutdown(); + + synchronized (this) { + if (taskLockFileLock != null) { + taskLockFileLock.release(); + } + + if (taskLockChannel != null) { + taskLockChannel.close(); + } + } } } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index ac8475a993b3..54fbddc16422 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -89,7 +89,7 @@ public void setUp() throws IOException EasyMock.replay(task); taskToolbox = new TaskToolboxFactory( - new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null), + new TaskConfig(temporaryFolder.newFile().toString(), null, null, 50000, null, null, null), mockTaskActionClientFactory, mockEmitter, mockSegmentPusher, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 67e0f81b8530..0c61311d8b58 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -19,12 +19,138 @@ package io.druid.indexing.common.task; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.util.Charsets; +import com.google.api.client.util.Sets; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.Granularity; +import com.metamx.common.ISE; +import com.metamx.common.guava.Sequences; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.LoggingEmitter; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.metrics.MonitorScheduler; +import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.MapCache; +import io.druid.concurrent.Execs; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.granularity.QueryGranularity; +import io.druid.indexing.common.SegmentLoaderFactory; +import io.druid.indexing.common.TaskStatus; +import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.TaskToolboxFactory; +import io.druid.indexing.common.TestUtils; +import io.druid.indexing.common.actions.LocalTaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionClientFactory; +import io.druid.indexing.common.actions.TaskActionToolbox; +import io.druid.indexing.common.config.TaskConfig; +import io.druid.indexing.common.config.TaskStorageConfig; +import io.druid.indexing.overlord.HeapMemoryTaskStorage; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.indexing.overlord.TaskLockbox; +import io.druid.indexing.overlord.TaskStorage; +import io.druid.indexing.test.TestDataSegmentAnnouncer; +import io.druid.indexing.test.TestDataSegmentKiller; +import io.druid.indexing.test.TestDataSegmentPusher; +import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator; +import io.druid.indexing.test.TestServerView; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.DefaultQueryRunnerFactoryConglomerate; +import io.druid.query.Druids; +import io.druid.query.IntervalChunkingQueryRunnerDecorator; +import io.druid.query.Query; +import io.druid.query.QueryRunner; +import io.druid.query.QueryRunnerFactory; +import io.druid.query.QueryRunnerFactoryConglomerate; +import io.druid.query.QueryToolChest; +import io.druid.query.QueryWatcher; +import io.druid.query.Result; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.query.timeseries.TimeseriesQuery; +import io.druid.query.timeseries.TimeseriesQueryEngine; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import io.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import io.druid.query.timeseries.TimeseriesResultValue; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeIOConfig; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.loading.SegmentLoaderConfig; +import io.druid.segment.loading.SegmentLoaderLocalCacheManager; +import io.druid.segment.loading.StorageLocationConfig; +import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; +import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.metrics.EventReceiverFirehoseRegister; +import io.druid.timeline.DataSegment; +import org.easymock.EasyMock; import org.joda.time.DateTime; +import org.joda.time.Period; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; public class RealtimeIndexTaskTest { + private static final Logger log = new Logger(RealtimeIndexTaskTest.class); + private static final DruidServerMetadata dummyServer = new DruidServerMetadata( + "dummy", + "dummy_host", + 0, + "historical", + "dummy_tier", + 0 + ); + private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); + private static final ServiceEmitter emitter = new ServiceEmitter( + "service", + "host", + new LoggingEmitter( + log, + LoggingEmitter.Level.ERROR, + jsonMapper + ) + ); + + @Rule + public final TemporaryFolder tempFolder = new TemporaryFolder(); + + private DateTime now; + private ListeningExecutorService taskExec; + + @Before + public void setUp() + { + EmittingLogger.registerEmitter(emitter); + emitter.start(); + taskExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("realtime-index-task-test-%d")); + now = new DateTime(); + } + + @After + public void tearDown() + { + taskExec.shutdownNow(); + } @Test public void testMakeTaskId() throws Exception @@ -34,4 +160,386 @@ public void testMakeTaskId() throws Exception RealtimeIndexTask.makeTaskId("test", 0, new DateTime("2015-01-02"), 0x76543210) ); } + + @Test(timeout = 10000L) + public void testBasics() throws Exception + { + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final RealtimeIndexTask task = makeRealtimeTask(null); + final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); + final ListenableFuture statusFuture = runTask(task, taskToolbox); + + // Wait for firehose to show up, it starts off null. + while (task.getFirehose() == null) { + Thread.sleep(50); + } + + final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = + (EventReceiverFirehoseFactory.EventReceiverFirehose) task.getFirehose(); + + firehose.addRows( + ImmutableList.of( + new MapBasedInputRow( + now, + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo") + ), + new MapBasedInputRow( + now, + ImmutableList.of("dim2"), + ImmutableMap.of("dim2", "bar") + ) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for publish. + while (mdc.getPublished().isEmpty()) { + Thread.sleep(50); + } + + // Do a query. + Assert.assertEquals(2, countEvents(task)); + + // Simulate handoff. + for (DataSegment segment : mdc.getPublished()) { + ((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment); + } + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); + } + + @Test(timeout = 10000L) + public void testRestore() throws Exception + { + final File directory = tempFolder.newFolder(); + final RealtimeIndexTask task1 = makeRealtimeTask(null); + + // First run: + { + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory); + final ListenableFuture statusFuture = runTask(task1, taskToolbox); + + // Wait for firehose to show up, it starts off null. + while (task1.getFirehose() == null) { + Thread.sleep(50); + } + + final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = + (EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose(); + + firehose.addRows( + ImmutableList.of( + new MapBasedInputRow( + now, + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo") + ) + ) + ); + + // Trigger graceful shutdown. + task1.stopGracefully(); + + // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); + + // Nothing should be published. + Assert.assertEquals(Sets.newHashSet(), mdc.getPublished()); + } + + // Second run: + { + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); + final TaskToolbox taskToolbox = makeToolbox(task2, mdc, directory); + final ListenableFuture statusFuture = runTask(task2, taskToolbox); + + // Wait for firehose to show up, it starts off null. + while (task2.getFirehose() == null) { + Thread.sleep(50); + } + + // Do a query, at this point the previous data should be loaded. + Assert.assertEquals(1, countEvents(task2)); + + final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = + (EventReceiverFirehoseFactory.EventReceiverFirehose) task2.getFirehose(); + + firehose.addRows( + ImmutableList.of( + new MapBasedInputRow( + now, + ImmutableList.of("dim2"), + ImmutableMap.of("dim2", "bar") + ) + ) + ); + + // Stop the firehose, this will drain out existing events. + firehose.close(); + + // Wait for publish. + while (mdc.getPublished().isEmpty()) { + Thread.sleep(50); + } + + // Do a query. + Assert.assertEquals(2, countEvents(task2)); + + // Simulate handoff. + for (DataSegment segment : mdc.getPublished()) { + ((TestServerView) taskToolbox.getNewSegmentServerView()).segmentAdded(dummyServer, segment); + } + + // Wait for the task to finish. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); + } + } + + @Test(timeout = 10000L) + public void testRestoreCorruptData() throws Exception + { + final File directory = tempFolder.newFolder(); + final RealtimeIndexTask task1 = makeRealtimeTask(null); + + // First run: + { + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory); + final ListenableFuture statusFuture = runTask(task1, taskToolbox); + + // Wait for firehose to show up, it starts off null. + while (task1.getFirehose() == null) { + Thread.sleep(50); + } + + final EventReceiverFirehoseFactory.EventReceiverFirehose firehose = + (EventReceiverFirehoseFactory.EventReceiverFirehose) task1.getFirehose(); + + firehose.addRows( + ImmutableList.of( + new MapBasedInputRow( + now, + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo") + ) + ) + ); + + // Trigger graceful shutdown. + task1.stopGracefully(); + + // Wait for the task to finish. The status doesn't really matter, but we'll check it anyway. + final TaskStatus taskStatus = statusFuture.get(); + Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); + + // Nothing should be published. + Assert.assertEquals(Sets.newHashSet(), mdc.getPublished()); + } + + // Corrupt the data: + final File smooshFile = new File( + String.format( + "%s/persistent/task/%s/work/persist/%s/%s_%s/0/00000.smoosh", + directory, + task1.getId(), + task1.getDataSource(), + Granularity.DAY.truncate(now), + Granularity.DAY.increment(Granularity.DAY.truncate(now)) + ) + ); + + Files.write(smooshFile.toPath(), "oops!".getBytes(Charsets.UTF_8)); + + // Second run: + { + final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); + final TaskToolbox taskToolbox = makeToolbox(task2, mdc, directory); + final ListenableFuture statusFuture = runTask(task2, taskToolbox); + + // Wait for the task to finish. + boolean caught = false; + try { + statusFuture.get(); + } + catch (Exception e) { + caught = true; + } + Assert.assertTrue("expected exception", caught); + } + } + + private ListenableFuture runTask(final Task task, final TaskToolbox toolbox) + { + return taskExec.submit( + new Callable() + { + @Override + public TaskStatus call() throws Exception + { + try { + if (task.isReady(toolbox.getTaskActionClient())) { + return task.run(toolbox); + } else { + throw new ISE("Task is not ready"); + } + } + catch (Exception e) { + log.warn(e, "Task failed"); + throw e; + } + } + } + ); + } + + private RealtimeIndexTask makeRealtimeTask(final String taskId) + { + ObjectMapper objectMapper = new DefaultObjectMapper(); + DataSchema dataSchema = new DataSchema( + "test_ds", + null, + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null), + objectMapper + ); + RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( + new EventReceiverFirehoseFactory( + "foo", + 100, + new NoopChatHandlerProvider(), + objectMapper, + null, + new EventReceiverFirehoseRegister() + ), + null, + null + ); + RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( + 1000, + new Period("P1Y"), + new Period("PT10M"), + null, + null, + null, + null, + null, + null + ); + return new RealtimeIndexTask( + taskId, + null, + new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig), + null + ); + } + + private TaskToolbox makeToolbox(final Task task, final IndexerMetadataStorageCoordinator mdc, final File directory) + { + final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); + final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, null, null); + final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); + final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( + taskLockbox, + mdc, + emitter + ); + final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( + taskStorage, + taskActionToolbox + ); + final QueryRunnerFactoryConglomerate conglomerate = new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap., QueryRunnerFactory>of( + TimeseriesQuery.class, + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest( + new IntervalChunkingQueryRunnerDecorator(null, null, null) + { + @Override + public QueryRunner decorate( + QueryRunner delegate, QueryToolChest> toolChest + ) + { + return delegate; + } + } + ), + new TimeseriesQueryEngine(), + new QueryWatcher() + { + @Override + public void registerQuery(Query query, ListenableFuture future) + { + // do nothing + } + } + ) + ) + ); + final TestUtils testUtils = new TestUtils(); + final TaskToolboxFactory toolboxFactory = new TaskToolboxFactory( + taskConfig, + taskActionClientFactory, + emitter, + new TestDataSegmentPusher(), + new TestDataSegmentKiller(), + null, // DataSegmentMover + null, // DataSegmentArchiver + new TestDataSegmentAnnouncer(), + new TestServerView(), + conglomerate, + MoreExecutors.sameThreadExecutor(), // queryExecutorService + EasyMock.createMock(MonitorScheduler.class), + new SegmentLoaderFactory( + new SegmentLoaderLocalCacheManager( + null, + new SegmentLoaderConfig() + { + @Override + public List getLocations() + { + return Lists.newArrayList(); + } + }, testUtils.getTestObjectMapper() + ) + ), + testUtils.getTestObjectMapper(), + testUtils.getTestIndexMerger(), + testUtils.getTestIndexIO(), + MapCache.create(1024), + new CacheConfig() + ); + + taskLockbox.add(task); + return toolboxFactory.build(task); + } + + public long countEvents(final Task task) throws Exception + { + // Do a query. + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("test_ds") + .aggregators( + ImmutableList.of( + new LongSumAggregatorFactory("rows", "rows") + ) + ).granularity(QueryGranularity.ALL) + .intervals("2000/3000") + .build(); + + ArrayList> results = Sequences.toList( + task.getQueryRunner(query).run(query, ImmutableMap.of()), + Lists.>newArrayList() + ); + return results.get(0).getValue().getLongMetric("rows"); + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index c53805e53f22..6d7b88b69ec1 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -192,7 +192,7 @@ public void deleteSegments(Set segments) ); final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( - new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null), + new TaskConfig(tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), tac, newMockEmitter(), new DataSegmentPusher() diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 2f9e9dbf6902..c2ee4a84e02d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -294,7 +294,7 @@ public RetType submit(TaskAction taskAction) throws IOExcepti } }; final TaskToolboxFactory taskToolboxFactory = new TaskToolboxFactory( - new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null), + new TaskConfig(testCase.tmpDir.getAbsolutePath(), null, null, 50000, null, null, null), new TaskActionClientFactory() { @Override diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 7071935fb6f8..c8c4d7f26fd2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -32,7 +32,6 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; -import com.google.common.collect.Sets; import com.metamx.common.Granularity; import com.metamx.common.ISE; import com.metamx.common.guava.Comparators; @@ -72,8 +71,8 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.overlord.config.TaskQueueConfig; +import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator; import io.druid.jackson.DefaultObjectMapper; -import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; import io.druid.metadata.SQLMetadataStorageActionHandlerFactory; import io.druid.metadata.TestDerbyConnector; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -183,6 +182,7 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private final String taskStorageType; + private ObjectMapper mapper; private TaskStorageQueryAdapter tsqa = null; private File tmpDir = null; @@ -190,7 +190,7 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private TaskLockbox tl = null; private TaskQueue tq = null; private TaskRunner tr = null; - private MockIndexerMetadataStorageCoordinator mdc = null; + private TestIndexerMetadataStorageCoordinator mdc = null; private TaskActionClientFactory tac = null; private TaskToolboxFactory tb = null; private IndexSpec indexSpec; @@ -205,9 +205,18 @@ public int compare(DataSegment dataSegment, DataSegment dataSegment2) private TestDerbyConnector testDerbyConnector; private List segmentCallbacks = new ArrayList<>(); - private static MockIndexerMetadataStorageCoordinator newMockMDC() + private static TestIndexerMetadataStorageCoordinator newMockMDC() { - return new MockIndexerMetadataStorageCoordinator(); + return new TestIndexerMetadataStorageCoordinator() + { + @Override + public Set announceHistoricalSegments(Set segments) + { + Set retVal = super.announceHistoricalSegments(segments); + publishCountDown.countDown(); + return retVal; + } + }; } private static ServiceEmitter newMockEmitter() @@ -373,7 +382,11 @@ public void setUp() throws Exception ts = new MetadataTaskStorage( testDerbyConnector, new TaskStorageConfig(null), - new SQLMetadataStorageActionHandlerFactory(testDerbyConnector, derbyConnectorRule.metadataTablesConfigSupplier().get(), mapper) + new SQLMetadataStorageActionHandlerFactory( + testDerbyConnector, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + mapper + ) ); } else { throw new RuntimeException(String.format("Unknown task storage type [%s]", taskStorageType)); @@ -408,13 +421,15 @@ public DataSegment push(File file, DataSegment segment) throws IOException ); } - private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) { + private void setUpAndStartTaskQueue(DataSegmentPusher dataSegmentPusher) + { + final TaskConfig taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, null, null); tsqa = new TaskStorageQueryAdapter(ts); tl = new TaskLockbox(ts); mdc = newMockMDC(); tac = new LocalTaskActionClientFactory(ts, new TaskActionToolbox(tl, mdc, newMockEmitter())); tb = new TaskToolboxFactory( - new TaskConfig(tmpDir.toString(), null, null, 50000, null), + taskConfig, tac, newMockEmitter(), dataSegmentPusher, @@ -491,7 +506,7 @@ public List getLocations() MapCache.create(0), FireDepartmentTest.NO_CACHE_CONFIG ); - tr = new ThreadPoolTaskRunner(tb, null); + tr = new ThreadPoolTaskRunner(tb, taskConfig, emitter); tq = new TaskQueue(tqc, ts, tr, tac, tl, emitter); tq.start(); } @@ -821,7 +836,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception Assert.assertEquals("segments nuked", 0, mdc.getNuked().size()); } - @Test (timeout = 4000L) + @Test(timeout = 4000L) public void testRealtimeIndexTask() throws Exception { monitorScheduler.addMonitor(EasyMock.anyObject(Monitor.class)); @@ -870,7 +885,7 @@ public void testRealtimeIndexTask() throws Exception EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate); } - @Test (timeout = 4000L) + @Test(timeout = 4000L) public void testRealtimeIndexTaskFailure() throws Exception { setUpAndStartTaskQueue( @@ -1012,7 +1027,8 @@ private TaskStatus runTask(final Task task) throws Exception return retVal; } - private RealtimeIndexTask giveMeARealtimeIndexTask() { + private RealtimeIndexTask giveMeARealtimeIndexTask() + { String taskId = String.format("rt_task_%s", System.currentTimeMillis()); DataSchema dataSchema = new DataSchema( "test_ds", @@ -1023,7 +1039,8 @@ private RealtimeIndexTask giveMeARealtimeIndexTask() { ); RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( new MockFirehoseFactory(true), - null, // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class + null, + // PlumberSchool - Realtime Index Task always uses RealtimePlumber which is hardcoded in RealtimeIndexTask class null ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( @@ -1045,64 +1062,4 @@ private RealtimeIndexTask giveMeARealtimeIndexTask() { null ); } - - private static class MockIndexerMetadataStorageCoordinator extends IndexerSQLMetadataStorageCoordinator - { - final private Set published = Sets.newHashSet(); - final private Set nuked = Sets.newHashSet(); - - private List unusedSegments; - - private MockIndexerMetadataStorageCoordinator() - { - super(null, null, null); - unusedSegments = Lists.newArrayList(); - } - - @Override - public List getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException - { - return ImmutableList.of(); - } - - @Override - public List getUnusedSegmentsForInterval(String dataSource, Interval interval) - { - return unusedSegments; - } - - @Override - public Set announceHistoricalSegments(Set segments) - { - Set added = Sets.newHashSet(); - for (final DataSegment segment : segments) { - if (published.add(segment)) { - added.add(segment); - } - } - TaskLifecycleTest.publishCountDown.countDown(); - return ImmutableSet.copyOf(added); - } - - @Override - public void deleteSegments(Set segments) - { - nuked.addAll(segments); - } - - public Set getPublished() - { - return ImmutableSet.copyOf(published); - } - - public Set getNuked() - { - return ImmutableSet.copyOf(nuked); - } - - public void setUnusedSegments(List unusedSegments) - { - this.unusedSegments = unusedSegments; - } - } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index ec20523fe7bd..a28c332491f5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -25,6 +25,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.Pair; import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.guava.CloseQuietly; import com.metamx.emitter.EmittingLogger; @@ -294,6 +295,18 @@ public MockTaskRunner(CountDownLatch[] runLatches, CountDownLatch[] completionLa this.runningTasks = new ArrayList<>(); } + @Override + public List>> restore() + { + return ImmutableList.of(); + } + + @Override + public void stop() + { + // Do nothing + } + @Override public synchronized ListenableFuture run(final Task task) { diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java new file mode 100644 index 000000000000..be5ef6df9ea2 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java @@ -0,0 +1,66 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.test; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import io.druid.server.coordination.DataSegmentAnnouncer; +import io.druid.timeline.DataSegment; + +import java.io.IOException; +import java.util.Set; + +public class TestDataSegmentAnnouncer implements DataSegmentAnnouncer +{ + public Set announcedSegments = Sets.newConcurrentHashSet(); + + @Override + public void announceSegment(DataSegment segment) throws IOException + { + announcedSegments.add(segment); + } + + @Override + public void unannounceSegment(DataSegment segment) throws IOException + { + announcedSegments.remove(segment); + } + + @Override + public void announceSegments(Iterable segments) throws IOException + { + for (DataSegment segment : segments) { + announcedSegments.add(segment); + } + } + + @Override + public void unannounceSegments(Iterable segments) throws IOException + { + for (DataSegment segment : segments) { + announcedSegments.remove(segment); + } + } + + public Set getAnnouncedSegments() + { + return ImmutableSet.copyOf(announcedSegments); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentKiller.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentKiller.java new file mode 100644 index 000000000000..ab5971a0c07a --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentKiller.java @@ -0,0 +1,44 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.test; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import io.druid.segment.loading.DataSegmentKiller; +import io.druid.segment.loading.SegmentLoadingException; +import io.druid.timeline.DataSegment; + +import java.util.Set; + +public class TestDataSegmentKiller implements DataSegmentKiller +{ + private final Set killedSegments = Sets.newConcurrentHashSet(); + + @Override + public void kill(DataSegment segment) throws SegmentLoadingException + { + killedSegments.add(segment); + } + + public Set getKilledSegments() + { + return ImmutableSet.copyOf(killedSegments); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java new file mode 100644 index 000000000000..b9bda3b6884c --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentPusher.java @@ -0,0 +1,52 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.test; + +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; +import io.druid.segment.loading.DataSegmentPusher; +import io.druid.timeline.DataSegment; + +import java.io.File; +import java.io.IOException; +import java.util.Set; + +public class TestDataSegmentPusher implements DataSegmentPusher +{ + private final Set pushedSegments = Sets.newConcurrentHashSet(); + + @Override + public String getPathForHadoop(String dataSource) + { + throw new UnsupportedOperationException(); + } + + @Override + public DataSegment push(File file, DataSegment segment) throws IOException + { + pushedSegments.add(segment); + return segment; + } + + public Set getPushedSegments() + { + return ImmutableSet.copyOf(pushedSegments); + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java new file mode 100644 index 000000000000..5f5940abc899 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -0,0 +1,113 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; +import io.druid.timeline.DataSegment; +import org.joda.time.Interval; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator +{ + final private Set published = Sets.newConcurrentHashSet(); + final private Set nuked = Sets.newConcurrentHashSet(); + final private List unusedSegments; + + public TestIndexerMetadataStorageCoordinator() + { + unusedSegments = Lists.newArrayList(); + } + + @Override + public List getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException + { + return ImmutableList.of(); + } + + @Override + public List getUnusedSegmentsForInterval(String dataSource, Interval interval) + { + synchronized (unusedSegments) { + return ImmutableList.copyOf(unusedSegments); + } + } + + @Override + public Set announceHistoricalSegments(Set segments) + { + Set added = Sets.newHashSet(); + for (final DataSegment segment : segments) { + if (published.add(segment)) { + added.add(segment); + } + } + return ImmutableSet.copyOf(added); + } + + @Override + public SegmentIdentifier allocatePendingSegment( + String dataSource, + String sequenceName, + String previousSegmentId, + Interval interval, + String maxVersion + ) throws IOException + { + throw new UnsupportedOperationException(); + } + + @Override + public void deleteSegments(Set segments) + { + nuked.addAll(segments); + } + + @Override + public void updateSegmentMetadata(Set segments) throws IOException + { + throw new UnsupportedOperationException(); + } + + public Set getPublished() + { + return ImmutableSet.copyOf(published); + } + + public Set getNuked() + { + return ImmutableSet.copyOf(nuked); + } + + public void setUnusedSegments(List newUnusedSegments) + { + synchronized (unusedSegments) { + unusedSegments.clear(); + unusedSegments.addAll(newUnusedSegments); + } + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java new file mode 100644 index 000000000000..b9185547e096 --- /dev/null +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java @@ -0,0 +1,116 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.test; + +import com.google.api.client.util.Lists; +import com.google.common.base.Predicate; +import com.google.common.collect.Maps; +import com.metamx.common.Pair; +import io.druid.client.FilteredServerView; +import io.druid.client.ServerView; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.timeline.DataSegment; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; + +public class TestServerView implements FilteredServerView, ServerView.SegmentCallback +{ + final ConcurrentMap, Executor>> callbacks = Maps.newConcurrentMap(); + + @Override + public void registerSegmentCallback( + final Executor exec, + final ServerView.SegmentCallback callback, + final Predicate filter + ) + { + callbacks.put(callback, Pair.of(filter, exec)); + } + + @Override + public ServerView.CallbackAction segmentAdded( + final DruidServerMetadata server, + final DataSegment segment + ) + { + for (final Map.Entry, Executor>> entry : callbacks.entrySet()) { + if (entry.getValue().lhs.apply(segment)) { + entry.getValue().rhs.execute( + new Runnable() + { + @Override + public void run() + { + entry.getKey().segmentAdded(server, segment); + } + } + ); + } + } + + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentRemoved( + final DruidServerMetadata server, + final DataSegment segment + ) + { + for (final Map.Entry, Executor>> entry : callbacks.entrySet()) { + if (entry.getValue().lhs.apply(segment)) { + entry.getValue().rhs.execute( + new Runnable() + { + @Override + public void run() + { + entry.getKey().segmentRemoved(server, segment); + } + } + ); + } + } + + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction segmentViewInitialized() + { + for (final Map.Entry, Executor>> entry : callbacks.entrySet()) { + entry.getValue().rhs.execute( + new Runnable() + { + @Override + public void run() + { + entry.getKey().segmentViewInitialized(); + } + } + ); + } + + return ServerView.CallbackAction.CONTINUE; + } +} diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index aa691c757fc2..944b3d5625cb 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -41,6 +41,7 @@ import io.druid.segment.loading.StorageLocationConfig; import io.druid.server.initialization.IndexerZkConfig; import io.druid.server.initialization.ZkPathsConfig; +import io.druid.server.metrics.NoopServiceEmitter; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; @@ -133,13 +134,14 @@ public String getBase() private WorkerTaskMonitor createTaskMonitor() { + final TaskConfig taskConfig = new TaskConfig(Files.createTempDir().toString(), null, null, 0, null, null, null); return new WorkerTaskMonitor( jsonMapper, cf, workerCuratorCoordinator, new ThreadPoolTaskRunner( new TaskToolboxFactory( - new TaskConfig(Files.createTempDir().toString(), null, null, 0, null), + taskConfig, null, null, null, null, null, null, null, null, null, null, null, new SegmentLoaderFactory( new SegmentLoaderLocalCacheManager( null, @@ -160,7 +162,8 @@ public List getLocations() null, null ), - null + taskConfig, + new NoopServiceEmitter() ), new WorkerConfig().setCapacity(1) ); diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java index 7d1934f47c68..7753019b45d3 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java @@ -177,17 +177,7 @@ public Response addAll( } try { - for (final InputRow row : rows) { - boolean added = false; - while (!closed && !added) { - added = buffer.offer(row, 500, TimeUnit.MILLISECONDS); - } - - if (!added) { - throw new IllegalStateException("Cannot add events to closed firehose!"); - } - } - + addRows(rows); return Response.ok( objectMapper.writeValueAsString(ImmutableMap.of("eventCount", events.size())), contentType @@ -267,11 +257,29 @@ public int getCapacity() @Override public void close() throws IOException { - log.info("Firehose closing."); - closed = true; - eventReceiverFirehoseRegister.unregister(serviceName); - if (chatHandlerProvider.isPresent()) { - chatHandlerProvider.get().unregister(serviceName); + if (!closed) { + log.info("Firehose closing."); + closed = true; + + eventReceiverFirehoseRegister.unregister(serviceName); + if (chatHandlerProvider.isPresent()) { + chatHandlerProvider.get().unregister(serviceName); + } + } + } + + // public for tests + public void addRows(Iterable rows) throws InterruptedException + { + for (final InputRow row : rows) { + boolean added = false; + while (!closed && !added) { + added = buffer.offer(row, 500, TimeUnit.MILLISECONDS); + } + + if (!added) { + throw new IllegalStateException("Cannot add events to closed firehose!"); + } } } }