docs/content/configuration/indexing-service.md (2 additions, 0 deletions)

@@ -279,6 +279,8 @@ Additional peon configs include:
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for index-generating tasks.|50000|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
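
For example, a middleManager's runtime.properties could override both defaults; the values below are illustrative, not recommendations:

```properties
# Allow restorable tasks up to ten minutes to persist state and exit on restart.
druid.indexer.task.gracefulShutdownTimeout=PT10M
# Wait up to fifteen minutes for zombie peons to exit before giving up on their replacements.
druid.indexer.task.directoryLockTimeout=PT15M
```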

If `druid.indexer.runner.separateIngestionEndpoint` is set to true, then the following configurations are available for the ingestion server at the peon:


TaskToolboxFactory.java

@@ -109,7 +109,7 @@ public TaskToolboxFactory(

public TaskToolbox build(Task task)
{
-    final File taskWorkDir = new File(new File(config.getBaseTaskDir(), task.getId()), "work");
+    final File taskWorkDir = config.getTaskWorkDir(task.getId());

return new TaskToolbox(
config,

TaskConfig.java

@@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.joda.time.Period;

import java.io.File;
import java.util.List;
@@ -30,6 +31,10 @@ public class TaskConfig
"org.apache.hadoop:hadoop-client:2.3.0"
);

private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");

private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");

@JsonProperty
private final String baseDir;

@@ -45,13 +50,21 @@ public class TaskConfig
@JsonProperty
private final List<String> defaultHadoopCoordinates;

@JsonProperty
private final Period gracefulShutdownTimeout;

@JsonProperty
private final Period directoryLockTimeout;

@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@JsonProperty("baseTaskDir") String baseTaskDir,
@JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
@JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary,
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates,
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout
)
{
this.baseDir = baseDir == null ? "/tmp" : baseDir;
@@ -61,6 +74,12 @@ public TaskConfig(
this.defaultHadoopCoordinates = defaultHadoopCoordinates == null
? DEFAULT_DEFAULT_HADOOP_COORDINATES
: defaultHadoopCoordinates;
this.gracefulShutdownTimeout = gracefulShutdownTimeout == null
? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT
: gracefulShutdownTimeout;
this.directoryLockTimeout = directoryLockTimeout == null
? DEFAULT_DIRECTORY_LOCK_TIMEOUT
: directoryLockTimeout;
}

@JsonProperty
@@ -75,6 +94,21 @@ public File getBaseTaskDir()
return baseTaskDir;
}

public File getTaskDir(String taskId)
{
return new File(baseTaskDir, taskId);
}

public File getTaskWorkDir(String taskId)
{
return new File(getTaskDir(taskId), "work");
}

public File getTaskLockFile(String taskId)
{
return new File(getTaskDir(taskId), "lock");
}
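
// Illustrative layout under baseTaskDir for a task with id "index_foo" (task id assumed):
//   {baseTaskDir}/index_foo       <- getTaskDir("index_foo")
//   {baseTaskDir}/index_foo/work  <- getTaskWorkDir("index_foo"): restorable working state
//   {baseTaskDir}/index_foo/lock  <- getTaskLockFile("index_foo"): lock tied to druid.indexer.task.directoryLockTimeout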

@JsonProperty
public String getHadoopWorkingPath()
{
@@ -93,6 +127,18 @@ public List<String> getDefaultHadoopCoordinates()
return defaultHadoopCoordinates;
}

@JsonProperty
public Period getGracefulShutdownTimeout()
{
return gracefulShutdownTimeout;
}

@JsonProperty
public Period getDirectoryLockTimeout()
{
return directoryLockTimeout;
}

private String defaultDir(String configParameter, final String defaultVal)
{
if (configParameter == null) {

@@ -128,6 +128,19 @@ public String getClasspathPrefix()
return null;
}

@Override
public boolean canRestore()
{
return false;
}

@Override
public void stopGracefully()
{
// Should not be called when canRestore = false.
Contributor: Do you want to explicitly check for the condition and print this message when canRestore is false?

Contributor: Not sure we can do anything useful here even if canRestore() returned true; any implementation has to override both of them together.

throw new UnsupportedOperationException("Cannot stop gracefully");
}

@Override
public String toString()
{

RealtimeIndexTask.java

@@ -29,6 +29,7 @@
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
@@ -48,6 +49,9 @@
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
@@ -62,6 +66,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class RealtimeIndexTask extends AbstractTask
{
@@ -104,6 +109,12 @@ private static String makeDatasource(FireDepartment fireDepartment)
@JsonIgnore
private volatile Plumber plumber = null;

@JsonIgnore
private volatile Firehose firehose = null;

@JsonIgnore
private volatile boolean stopped = false;

@JsonIgnore
private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null;

@@ -285,8 +296,6 @@ public String getVersion(final Interval interval)

this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());

-    // Delay firehose connection to avoid claiming input resources while the plumber is starting up.
-    Firehose firehose = null;
Supplier<Committer> committerSupplier = null;

try {
@@ -295,12 +304,14 @@ public String getVersion(final Interval interval)
// Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);

-      // Set up firehose
-      firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());
+      // Delay firehose connection to avoid claiming input resources while the plumber is starting up.
+      final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory();
+      final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory);
+      firehose = firehoseFactory.connect(spec.getDataSchema().getParser());
committerSupplier = Committers.supplierFromFirehose(firehose);

// Time to read data!
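// Note on the loop condition below: when the firehose is drainable by closing, stopGracefully()
// closes it and this loop keeps reading until hasMore() returns false; otherwise the loop exits
// as soon as "stopped" is set, relying on commit() to recover undrained data after restart.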
-      while (firehose.hasMore()) {
+      while ((!stopped || firehoseDrainableByClosing) && firehose.hasMore()) {
final InputRow inputRow;

try {
@@ -337,8 +348,38 @@ public String getVersion(final Interval interval)
finally {
if (normalExit) {
try {
-          plumber.persist(committerSupplier.get());
-          plumber.finishJob();
if (!stopped) {
// Hand off all pending data
log.info("Persisting and handing off pending data.");
plumber.persist(committerSupplier.get());
plumber.finishJob();
} else {
log.info("Persisting pending data without handoff, in preparation for restart.");
final Committer committer = committerSupplier.get();
Member: Here the firehose is already closed while doing shutdown. Is it safe to call commit on a closed firehose?

Member: It seems this might not work for KafkaEightFirehoseFactory either, since the underlying connector is already closed?

Contributor Author: @nishantmonu51 that's a good point, this does not really work very well with the kafka firehose. But then again, RealtimeIndexTasks never did work well with kafka…

Do you think it's worth it to rework things such that this works well for kafka and the event receiver? I think to do that we would want this behavior:

  • If you're using EventReceiverFirehose, stopGracefully causes the servlet to stop accepting new data, and the task will drain existing data, then stop.
  • If you're using the Kafka firehose, stopGracefully causes the task to simply stop reading data, then persist/commit, then stop.

This could probably be accomplished somehow…

Member: I agree things don't work very well with the kafka firehose at present; until now we also didn't have a concept of graceful restart. If we are going to provide that functionality, I think we should also look into how we can make it work with our current firehoses. Also, to others in the community who might have written their own custom firehoses, a call to commit after a firehose has been shut down may be unexpected and might result in weird errors.

Both behaviours for EventReceiverFirehose and KafkaFirehose seem good, and they point to needing a firehose API where, instead of completely shutting the firehose down, we ask it to stop reading any further events, ingest any events that may be sitting in buffers, persist and call commit on the firehose, then shut it down and release any resources it holds.

Contributor: I also think some people do use the kafka firehose (with either partitioning or replication) and it works in those specific cases; with this change, that will break.

Contributor Author: @nishantmonu51 @himanshug thinking of doing this by just having special behavior triggered by an instanceof check for EventReceiverFirehose. There doesn't seem to be a nicer way to do it with the current firehose interface. Basically, the ERF would get closed and drained; all other firehoses we would simply stop reading immediately. For those, we would rely on commit() being an effective way to get back undrained data.

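// Wrap the committer so we can block until its commit runnable has actually executed, ensuring
// on-disk state is fully persisted and committed before the process exits.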
final CountDownLatch persistLatch = new CountDownLatch(1);
plumber.persist(
new Committer()
{
@Override
public Object getMetadata()
{
return committer.getMetadata();
}

@Override
public void run()
{
try {
committer.run();
}
finally {
persistLatch.countDown();
}
}
}
);
persistLatch.await();
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to finish realtime task").emit();
@@ -352,15 +393,67 @@ public String getVersion(final Interval interval)
}
}

log.info("Job done!");
return TaskStatus.success(getId());
}

@Override
public boolean canRestore()
{
return true;
}

@Override
public void stopGracefully()
{
try {
synchronized (this) {
if (!stopped) {
stopped = true;
log.info("Gracefully stopping.");
if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
firehose.close();
} else {
log.debug("Cannot drain firehose[%s] by closing, so skipping closing.", firehose);
}
}
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

/**
* Public for tests.
*/
@JsonIgnore
public Firehose getFirehose()
{
return firehose;
}

@JsonProperty("spec")
public FireDepartment getRealtimeIngestionSchema()
{
return spec;
}

/**
* Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than
* abruptly stopping.
* <p/>
* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this.
*/
private static boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
{
return firehoseFactory instanceof EventReceiverFirehoseFactory
|| (firehoseFactory instanceof TimedShutoffFirehoseFactory
&& isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).getDelegateFactory()))
|| (firehoseFactory instanceof ClippedFirehoseFactory
&& isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate()));
}

public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;

Task.java

@@ -62,26 +62,30 @@ public interface Task
{
/**
* Returns ID of this task. Must be unique across all tasks ever created.
*
* @return task ID
*/
public String getId();

/**
* Returns group ID of this task. Tasks with the same group ID can share locks. If tasks do not need to share locks,
* a common convention is to set group ID equal to task ID.
*
* @return task group ID
*/
public String getGroupId();

/**
* Returns a {@link io.druid.indexing.common.task.TaskResource} for this task. Task resources define specific
* worker requirements a task may require.
*
* @return {@link io.druid.indexing.common.task.TaskResource} for this task
*/
public TaskResource getTaskResource();

/**
* Returns a descriptive label for this task type. Used for metrics emission and logging.
*
* @return task type label
*/
public String getType();
@@ -90,7 +94,7 @@
* Get the nodeType for if/when this task publishes on zookeeper.
*
* @return the nodeType to use when publishing the server to zookeeper. null if the task doesn't expect to
* publish to zookeeper.
*/
public String getNodeType();

@@ -102,7 +106,9 @@
/**
* Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
* should return null.
*
* @param <T> query result type
*
* @return query runners for this task
*/
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
@@ -117,7 +123,7 @@
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
* coordinator. If this method throws an exception, the task should be considered a failure.
* <p/>
* This method must be idempotent, as it may be run multiple times per task.
*
* @param taskActionClient action client for this task (not the full toolbox)
@@ -128,6 +134,20 @@
*/
public boolean isReady(TaskActionClient taskActionClient) throws Exception;

/**
* Returns whether or not this task can restore its progress from its on-disk working directory. Restorable tasks
* may be started with a non-empty working directory. Tasks that exit uncleanly may still have a chance to attempt
* restores, meaning that restorable tasks should be able to deal with potentially partially written on-disk state.
*/
public boolean canRestore();
Contributor: From a general resource perspective, would it be reasonable for a resource management system to assign persistent resources to this task if canRestore is true? What sort of rules are there about how long those persistent resources should be kept in the absence of a task using them?

Contributor Author: By 'persistent resources' do you mean a persistent working directory, so that if the machine running the task fails, it can be restored on a different machine with the original persistent working directory?

If so… that would be awesome.

Contributor: That could be done with EBS/EFS or NFS or some other type of strategy, yes.

Also imagine a world where some cluster resource manager like Mesos is holding a persistent resource for the runner so it can restart if needed. So there needs to be a clear expectation about how long Mesos (or the framework on Mesos) is expected to reserve that persistent resource before considering it lost.


/**
* Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
* {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
* extreme prejudice.
*/
public void stopGracefully();

/**
* Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
* holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the task
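
To make the canRestore()/stopGracefully() contract concrete, here is a minimal sketch of a restorable task. It is illustrative only: the class name, type label, and work loop are invented, and it assumes AbstractTask exposes an (id, dataSource) constructor.

```java
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;

// Hypothetical example, not part of this PR.
public class ExampleRestorableTask extends AbstractTask
{
  private volatile boolean stopped = false;

  public ExampleRestorableTask(String id, String dataSource)
  {
    super(id, dataSource);
  }

  @Override
  public String getType()
  {
    return "example_restorable";
  }

  @Override
  public boolean isReady(TaskActionClient taskActionClient)
  {
    return true;
  }

  @Override
  public boolean canRestore()
  {
    // Overridden together with stopGracefully(), per the review discussion above.
    return true;
  }

  @Override
  public void stopGracefully()
  {
    // Ask run() to exit promptly; progress already persisted under the task work
    // directory lets a replacement peon resume after restart.
    stopped = true;
  }

  @Override
  public TaskStatus run(TaskToolbox toolbox) throws Exception
  {
    // Restorable tasks must tolerate partially written state in the work dir,
    // since an unclean exit may still be followed by a restore attempt.
    while (!stopped) {
      // ... do a bounded unit of work, persisting restorable state as we go ...
      Thread.sleep(100);
    }
    return TaskStatus.success(getId());
  }
}
```

A task along these lines would exit within druid.indexer.task.gracefulShutdownTimeout on middleManager restart and be relaunched against the same working directory.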