Add CliIndexer process type and initial task runner implementation #8107
himanshug merged 29 commits into apache:master
Conversation
clintropolis left a comment

Had a first pass, also pulled the branch and fired one of these things up, and (after setting druid.service) it worked for me! 🤘
synchronized (lock) {
  try {
    // When stopping, the task status should not be communicated to the overlord, so the listener and exec
thanks for adding comment 👍
@Override
public void removeAppenderatorForTask(String taskId)
{
can you add a // nothing to remove or something so this doesn't look so sad?
binder.bind(DruidNode.class).annotatedWith(RemoteChatHandler.class).to(Key.get(DruidNode.class, Self.class));
if (useSeparatePort) {
  // bind a modified DruidNode that will be used by the Jetty server installed below
  binder.bind(DruidNode.class)
This ends up being unfortunate in that it requires that druid.service be set, since the 'default' value is not available on the binding at the point where this runs; however, I'm not sure how to better fix it 😢
Would it be possible to internally use a QoS filter with Jetty to leave some room for chat handler traffic, instead of relying on a separate server with a separate pool? I don't think this would require bubbling up to the user configs, just something we compute based on the top-level configs (though it might require some minimum).
I think it would simplify things both conceptually and for the implementation, and would make this no longer an issue.
I redid this to use QoS filters; the configured number of connections is doubled (one set for chat handler requests, the other for non-chat-handler requests), with 2 reserved connections for lookups.
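For readers unfamiliar with the mechanism, here is a minimal sketch of attaching a Jetty QoSFilter to a path so that part of a shared thread/connection pool is capped. The path, class name, and parameter value below are illustrative assumptions, not taken from this PR:

```java
import java.util.EnumSet;
import javax.servlet.DispatcherType;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlets.QoSFilter;

public class ChatHandlerQosSketch
{
  // Illustrative path only; the real chat handler paths are defined elsewhere in Druid.
  private static final String CHAT_HANDLER_PATH = "/druid/worker/v1/chat/*";

  public static void addQosFilter(ServletContextHandler handler, int maxChatHandlerRequests)
  {
    FilterHolder holder = new FilterHolder(QoSFilter.class);
    // Requests beyond this limit are suspended rather than served immediately, so chat handler
    // traffic cannot exhaust the capacity shared with other endpoints.
    holder.setInitParameter("maxRequests", String.valueOf(maxChatHandlerRequests));
    handler.addFilter(holder, CHAT_HANDLER_PATH, EnumSet.of(DispatcherType.REQUEST));
  }
}
```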
 */
@Command(
    name = "indexer",
    description = "Runs an Indexer. Description TBD."
Don't forget to add a description
    .addBinding("local")
    .to(LocalTaskActionClientFactory.class)
    .in(LazySingleton.class);
// all of these bindings are so that we can run the peon in local mode
This comment isn't quite accurate; it should probably refer to the indexer. Additionally, there are a lot of bindings getting wired up that are shared between this and the peon, with this entire block looking like a straight copy and paste. Is it possible to restructure things so this can be shared between them?
Moved some of the shared binding blocks into methods on CliPeon/CliMiddleManager for now
{
  private Appenderator realtimeAppenderator;

  public TestAppenderatorsManager()
Does this constructor need to exist?
Removed unnecessary constructor
toolbox.getDataSegmentServerAnnouncer().unannounce();
toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
if (appenderatorsManager.shouldTaskMakeNodeAnnouncements()) {
Should we just make the AppenderatorsManager handle task announcements entirely, and here just call methods on it signaling whether to do so or not? I guess it's sort of awkward with the way things currently are, since the announcers are fetched from the toolbox.
I had done that initially, but decided to just use a boolean since the CliIndexer implementation would've done nothing for those methods
 */
public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
{
  private static final Logger log = new Logger(UnifiedIndexerAppenderatorsManager.class);
This log isn't used; also, I think we are supposed to make static variables all uppercase.
Removed unused logger
private static final Logger log = new Logger(UnifiedIndexerAppenderatorsManager.class);

private final ConcurrentHashMap<String, SinkQuerySegmentWalker> datasourceSegmentWalkers = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Appenderator> taskAppenderatorMap = new ConcurrentHashMap<>();
Stuff is only put and removed from here, but never read?
It's something that would've been expanded in follow-on PRs, but I removed it for now.
@ManageLifecycle
public class UnifiedIndexerLifecycleHandler
{
  private static final EmittingLogger log = new EmittingLogger(UnifiedIndexerLifecycleHandler.class);
I think static variables are supposed to be all caps
Changed to all caps
Can't be null here since taskWorkItem.shutdown is already checked above. I guess this should be checked first before checking taskWorkItem.shutdown?
Nice catch, I fixed this in ForkingTaskRunner as well
import java.util.concurrent.Executor;

/**
 * Base class for {@link ForkingTaskRunner} and {@link ThreadingTaskRunner} which support task restoration.
}

@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
Any reason getRunningTasks, getPendingTasks, and getRunnerTaskState are not abstract instead of default explodey? BaseRestorableTaskRunner doesn't have any other abstract methods currently; it seems like

@Override
public abstract Collection<TaskRunnerWorkItem> getRunningTasks();

@Override
public abstract Collection<TaskRunnerWorkItem> getPendingTasks();

@Nullable
@Override
public abstract RunnerTaskState getRunnerTaskState(String taskId);

would be appropriate.
 * Only one shutdown per-task can be running at a given time, and there is one control thread per task,
 * thus the pool has 2 * worker slots.
 *
 * Note that separate task logs are not supported, all task log entries will be written to the Indexer process log
It does seem like it would be very tricky to retain having individual task logs that get pushed to deep storage, involving some sort of log4j wizardry unless I'm missing something clever.
import java.util.concurrent.ExecutorService;

/**
 * This interface defines entities that create and manage potentially multiple Appenderators.
nit: could you add javadoc links to things like Appenderator and other types? I find them handy for easily navigating to things in intellij when I'm reading about how stuff works
import java.util.concurrent.ExecutorService;

public class PeonAppenderatorsManager implements AppenderatorsManager
Could you add javadoc for this guy?
Added javadocs (also added javadoc links in AppenderatorsManager)
public static final int DEFAULT_GZIP_INFLATE_BUFFER_SIZE = 4096;

public ServerConfig(
Should this be done as @JsonCreator to get rid of the need for the empty constructor or is there something in this that doesn't come from json?
The empty constructor is still used by some tests; I added a comment about how the new non-empty constructor is used.
I'm surprised that serde works fine with two constructors and without either marked @JsonCreator ... how does ObjectMapper decide which constructor to use for deserialization?
I'm not sure on the exact details of what ObjectMapper does to deserialize with those constructors, but I added a new ServerConfigSerdeTest that checks this behavior
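For what it's worth, a minimal, hypothetical illustration of the Jackson behavior in question (class and field names are made up): when no constructor is annotated with @JsonCreator, ObjectMapper falls back to the zero-argument constructor and injects annotated fields/setters, and the multi-argument constructor is simply not considered for deserialization.

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TwoConstructorConfigExample
{
  @JsonProperty
  private int numThreads = 8;   // default retained unless the property is present in the JSON

  public TwoConstructorConfigExample()
  {
    // used by Jackson for deserialization
  }

  public TwoConstructorConfigExample(int numThreads)
  {
    // programmatic-use constructor; Jackson never calls this without @JsonCreator
    this.numThreads = numThreads;
  }

  public int getNumThreads()
  {
    return numThreads;
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    TwoConstructorConfigExample config =
        mapper.readValue("{\"numThreads\": 4}", TwoConstructorConfigExample.class);
    System.out.println(config.getNumThreads()); // prints 4
  }
}
```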
    binder,
    ListenerResource.BASE_PATH + "/" + LookupCoordinatorManager.LOOKUP_LISTEN_ANNOUNCE_KEY,
    2 // 1 for "normal" operation and 1 for "emergency" or other
    LOOKUP_LISTENER_QOS_MAX_REQUESTS // 1 for "normal" operation and 1 for "emergency" or other
Heh neat, is this documented anywhere I wonder?
It wasn't; I added a brief mention of it to the lookup docs.
The query serving processes have an internal API for managing lookups on the process and those are used by the Coordinator.
The Coordinator periodically checks if any of the processes need to load/drop lookups and updates them appropriately.

Please note that only 2 simultaneous lookup configuration propagation requests can be concurrently handled by a single query serving process, to prevent lookup handling from consuming too many server HTTP connections.
optional: I think this is a very dense sentence; it might be friendlier to break it up at the comma, maybe:
Please note that only 2 simultaneous lookup configuration propagation requests can be concurrently handled by a single query serving process. This is done to prevent lookup handling from consuming too many server HTTP connections.
Thanks for adding it to the docs though 👍
  }
}

// Save running tasks to a file, so they can potentially be restored on next startup. Suppresses exceptions that
I did some more testing with this on my laptop with a setup of 1 each of broker, router, coordinator, and overlord, and 2 indexers and historicals, doing some small-scale Kafka indexing testing to make sure realtime queries and handoff were functioning. Overall things are working nicely. I did run into an issue when trying to stop an indexer node: the supervisor on the overlord is then forever stuck in a loop, performing an action it can never complete because the indexer has stopped listening. The indexer eventually gives up after a 5-minute timeout and ungracefully stops, but the supervisor/overlord appears to remain stuck until either the indexer comes back on the same host/port or the overlord is restarted. This also jams up what the web UI displays as the task status, where the task of the stuck indexer remains in the 'running' state until the same condition (the indexer returning or the overlord being restarted) is met. This issue aside, I'm still +1 on this if you'd rather fix it in a follow-up PR, since this is currently an undocumented feature anyway.
I observed the Jetty server shutting down before task shutdown in a Peon too; that may be worth looking into as a separate issue. The hanging you observed was due to SeekableStreamIndexTask not doing any shutdown actions. I've updated the patch to have the stopGracefully() methods on the tasks interrupt their run method.
  }
);

DruidNodeDiscovery druidNodeDiscoveryIndexer = druidNodeDiscoveryProvider.getForNodeType(NodeType.INDEXER);
Instead of repeated code, you can change line 441 to:
DruidNodeDiscovery druidNodeDiscovery = druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY);
I changed this to use the suggested code.
It looks like WorkerNodeService.DISCOVERY_SERVICE_KEY was incorrectly mapped to NodeType.PEON in DruidNodeDiscoveryProvider previously; I also changed that to the MM and indexer types.
// Synchronizes start/stop of this object.
private final Object startStopLock = new Object();

private volatile boolean started = false;
There is a LifecycleLock utility class to manage the lifecycle instead of the startStopLock and started flag; that can be used here.
Adjusted this to use LifecycleLock
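A rough sketch of the LifecycleLock idiom used elsewhere in Druid is below; the method names reflect my reading of the utility class, so treat the details as assumptions rather than this PR's exact code:

```java
import java.util.concurrent.TimeUnit;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.java.util.common.ISE;

public class LifecycleLockSketch
{
  private final LifecycleLock lifecycleLock = new LifecycleLock();

  public void start()
  {
    if (!lifecycleLock.canStart()) {
      throw new ISE("can't start.");
    }
    try {
      // ... do the actual startup work here ...
      lifecycleLock.started();   // mark startup as successful
    }
    finally {
      lifecycleLock.exitStart(); // always release the start section
    }
  }

  public void stop()
  {
    if (!lifecycleLock.canStop()) {
      throw new ISE("can't stop.");
    }
    // ... do the actual shutdown work here ...
  }

  public void doSomething()
  {
    // callers can wait until start() has completed before using the object
    if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {
      throw new ISE("not started");
    }
  }
}
```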
 * It is used by UnifiedIndexerAppenderatorsManager which allows queries on data associated with multiple
 * Appenderators.
 */
AppenderatorImpl(
Can the other constructor use this constructor instead of repeating everything?
Adjusted the constructors so one calls the other; the sink timeline handling in the constructor I added was also changed to accommodate that.
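As a simplified, hypothetical illustration of the chaining (not the actual AppenderatorImpl signature), the narrower constructor delegates to the wider one via this(...) so the shared initialization lives in one place:

```java
public class ChainedConstructorExample
{
  private final String dataSource;
  private final boolean useSharedTimeline;

  public ChainedConstructorExample(String dataSource)
  {
    // delegate to the full constructor, supplying a default for the extra argument
    this(dataSource, false);
  }

  ChainedConstructorExample(String dataSource, boolean useSharedTimeline)
  {
    this.dataSource = dataSource;
    this.useSharedTimeline = useSharedTimeline;
  }
}
```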
/**
 * Base class for {@link ForkingTaskRunner} and {@link ThreadingTaskRunner} which support task rest
 * oration.
nit: can we keep "restoration" as single word
public abstract Collection<TaskRunnerWorkItem> getPendingTasks();

/**
 * The concrete implementation should override this, as the implemention depends on the specifics of the
nit: the comments on these 3 methods have no value, as those methods are already declared abstract. If we keep them, the comment should be about what is expected from implementors; otherwise, no comment.
Removed the comments
}

@Override
public void write(String taskId, Map<String, TaskReport> reports)
It is weird that an instance of this class will retain and manage a map of taskId -> file (because it inherits TaskReportFileWriter), but those would be ignored all the time.
TaskReportFileWriter probably can be trimmed down a lot more, declared abstract or as an interface, with a separate MultiFileTaskReportFileWriter used wherever TaskReportFileWriter is currently used.
I redid these classes; there's now a TaskReportFileWriter interface with three implementations:
SingleFileTaskReportFileWriter, MultipleFileTaskReportFileWriter, NoopTestTaskReportFileWriter
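A rough sketch of the resulting shape, with simplified, hypothetical names (the real classes work with Druid's TaskReport type and an ObjectMapper; see the PR for the actual interface):

```java
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// The writer contract: each runner-specific implementation decides where (or whether)
// a task's reports end up. "Object" stands in for Druid's TaskReport type here.
interface ReportFileWriterSketch
{
  void write(String taskId, Map<String, Object> reports);
}

// Peon-style: a single preconfigured file, since the process runs exactly one task.
class SingleFileWriterSketch implements ReportFileWriterSketch
{
  private final File reportsFile;

  SingleFileWriterSketch(File reportsFile)
  {
    this.reportsFile = reportsFile;
  }

  @Override
  public void write(String taskId, Map<String, Object> reports)
  {
    // serialize `reports` to reportsFile (omitted)
  }
}

// Indexer-style: many tasks share one process, so the writer tracks a file per taskId.
class MultipleFileWriterSketch implements ReportFileWriterSketch
{
  private final ConcurrentHashMap<String, File> reportFiles = new ConcurrentHashMap<>();

  void add(String taskId, File reportsFile)
  {
    reportFiles.put(taskId, reportsFile);
  }

  @Override
  public void write(String taskId, Map<String, Object> reports)
  {
    File reportsFile = reportFiles.get(taskId);
    // serialize `reports` to reportsFile if it is registered (omitted)
  }
}

// Test-style: ignores reports entirely.
class NoopWriterSketch implements ReportFileWriterSketch
{
  @Override
  public void write(String taskId, Map<String, Object> reports)
  {
    // intentionally a no-op
  }
}
```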
 * Only one shutdown per-task can be running at a given time, and there is one control thread per task,
 * thus the pool has 2 * worker slots.
 *
 * Note that separate task logs are not supported, all task log entries will be written to the Indexer process log
Is this a choice we are making intentionally, to keep the logs in the same place, or is no separation done because we haven't figured it out yet?
If we do end up keeping all logs in the same place, then it would be great if the taskId were included in the log lines printed by a specific task, so as to be able to get some sense of a task by just grepping for its taskId.
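One possible way to do that, if all tasks share the Indexer process log, is a log4j2 ThreadContext (MDC) entry combined with a %X{taskId} conversion pattern in the layout; this is just a sketch of the idea, not something the PR implements:

```java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.ThreadContext;

public class TaskIdLoggingSketch
{
  private static final Logger LOGGER = LogManager.getLogger(TaskIdLoggingSketch.class);

  public static void runWithTaskId(String taskId, Runnable taskBody)
  {
    // The value is per-thread, so this works as long as the task body runs on this thread;
    // a PatternLayout containing %X{taskId} would then stamp every line from the task.
    ThreadContext.put("taskId", taskId);
    try {
      LOGGER.info("Starting task");
      taskBody.run();
    }
    finally {
      // Clean up so the pooled thread doesn't carry the taskId into the next task.
      ThreadContext.remove("taskId");
    }
  }
}
```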
    task.getId(),
    TaskStatus.running(task.getId())
);
taskStatus = taskStatusFuture.get();
Not sure why we are running the task in a separate thread while this thread is waiting for the task to finish anyway?
If this is done for making calls to notifyXXX(), then they could be done right before calling task.run(toolbox) if the task were run in this thread itself?
It was partly for making the notify calls after the run started, and partly because I wanted to have a similar structure to ForkingTaskRunner, where the control context and the task code were separated by thread/process, but it's not really necessary to do that.
I changed the task setup and run() to execute using a thread in taskExecutor; the control thread pool is only used to run externally triggered task shutdown now (and there is only 1 control thread per task slot now).
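A simplified sketch of that structure, with illustrative names rather than the PR's actual fields: the task body runs on the task executor, and the control pool only services externally triggered shutdowns.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwoPoolRunnerSketch
{
  private final ExecutorService taskExecutor;     // one thread per task slot, runs the task bodies
  private final ExecutorService controlExecutor;  // one control thread per task slot, runs shutdowns

  public TwoPoolRunnerSketch(int workerCapacity)
  {
    this.taskExecutor = Executors.newFixedThreadPool(workerCapacity);
    this.controlExecutor = Executors.newFixedThreadPool(workerCapacity);
  }

  public Future<String> run(Callable<String> taskBody)
  {
    // any "task started" notifications would happen inside taskBody, around task.run()
    return taskExecutor.submit(taskBody);
  }

  public Future<?> shutdown(Runnable shutdownBody)
  {
    // externally triggered shutdown requests are the only work given to the control pool
    return controlExecutor.submit(shutdownBody);
  }
}
```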
);

try {
  final Closer closer = Closer.create();
Couldn't understand why we have this Closer object; nothing appears to be accumulated in it for closing.
Thanks, removed this; it was a relic from ForkingTaskRunner.
try {
  final Closer closer = Closer.create();
  try {
Do we really need two try blocks (lines 164 and 166)? Could they be folded into one?
Collapsed into a single try
if (status == null) {
  if (taskInfo.thread != null) {
    taskInfo.thread.interrupt();
Is it possible that the interrupt status of the thread is not cleared by this task if it wasn't blocked waiting on something? In that case, the same thread with its interrupt status set would pick up a new task?
Also, I see that you have updated some of the tasks to capture their own threads on run and interrupt them inside stopGracefully(..) ... one of these appears redundant, no?
For the first point, I added a call to Thread.interrupted() at the very end of the outermost finally in the task callable.
For the second point, the interrupt here is meant as a "last resort" if the graceful shutdown didn't complete for some reason within the configured timeout, the interrupt within the tasks should normally end the shutdown before this is called
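As a small illustration of the first point (not the PR's exact code): since the task threads come from a shared pool, the outermost finally clears any pending interrupt so the flag cannot leak into the next task scheduled on that thread.

```java
import java.util.concurrent.Callable;

public class InterruptClearingSketch implements Callable<String>
{
  @Override
  public String call()
  {
    try {
      // ... run the task; an external interrupt may arrive at any point ...
      return "SUCCESS";
    }
    finally {
      // Clear any pending interrupt so the pooled thread starts its next task with a clean flag.
      Thread.interrupted();
    }
  }
}
```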
Also, since we are interrupting the thread from outside, doing the same in the task implementations is not achieving anything extra and should probably be removed; that just introduces confusion with no benefit... or maybe I am still missing something :)
  return null;
} else {
  if (workItem.getThread() == null) {
    return RunnerTaskState.PENDING;
Can't it mean that the task is finished, since we set the thread to null when the task finishes?
I forget if I had the thread set to null after task finishes in earlier revisions, but currently the thread doesn't get nulled out after task completion
  taskStatus = task.run(toolbox);
}
catch (Throwable t) {
  LOGGER.info(t, "Exception caught while running the task.");

  return taskStatus;
}
catch (Throwable t) {
  LOGGER.info(t, "Exception caught during execution");
if (status == null) {
  if (taskInfo.thread != null) {
    taskInfo.thread.interrupt();
I might be wrong, but it appears to me that the whole business with the thread is unnecessary and you can achieve the same by calling ThreadingTaskRunnerWorkItem.shutdownFuture.cancel(true), and you wouldn't have to deal manually with interrupts. That would also remove the task from the executor if it hasn't started running already.
Sidenote: I see that you are trying to simulate a force stop of a task, which is impossible to do in this context. If a task is not cooperative, nothing is gonna work, unfortunately. In the long run, this taskRunner requires adding another method to the Task interface to tell the task to force stop itself.
I changed the interrupt to use future.cancel(true), and added a comment about how it's not possible to truly force a hard shutdown there (my intent in putting it there was for hypothetical cases where a task's stopGracefully takes too long or has a bug and doesn't result in a shutdown, but the task is still interruptible).
// Note that we can't truly force a hard termination of the task thread externally.
// In the future we may want to add a forceful shutdown method to the Task interface.
if (taskInfo.shutdownFuture != null) {
  taskInfo.shutdownFuture.cancel(true);
We need to cancel the status future, not the shutdownFuture.
Also, Future.get(..) throws TimeoutException instead of returning null, so this will not be called if the task didn't finish within the timeout.
Also, we can remove keeping the thread as "state".
Anyway, I thought it would be easier to write code :) Can you merge jon-wei#4, which will add a commit making the above changes?
Ah, my bad, cancelled the wrong future.
Thanks for the patch!
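Putting the pieces of this thread together, here is a rough sketch of the intended shutdown flow, with simplified, hypothetical names rather than the actual ThreadingTaskRunner code:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GracefulShutdownSketch
{
  public static void shutdownTask(Future<String> statusFuture, long gracefulTimeoutMs)
  {
    try {
      // Give the task a chance to stop gracefully; get() throws TimeoutException rather than
      // returning null when the timeout elapses.
      statusFuture.get(gracefulTimeoutMs, TimeUnit.MILLISECONDS);
    }
    catch (TimeoutException e) {
      // Last resort: cancel the status future, which interrupts the task thread if it is running,
      // or prevents the task from ever running if it hasn't started yet. This still cannot force
      // a truly uncooperative task to stop.
      statusFuture.cancel(true);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    catch (ExecutionException e) {
      // the task failed on its own; nothing further to do here
    }
  }
}
```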
ThreadingTaskRunner: cancel task run future not shutdownFuture and remove thread from workitem
@clintropolis @jihoonson @himanshug Thanks a lot for the review!
First part of implementation for #7900
Description
This PR adds the initial foundation for the work in #7900:
- AppenderatorsManager, which creates and manages Appenderators for tasks, has been added. The AppenderatorsManager for the new Indexer process uses a shared query execution pool and per-datasource segment walkers/timelines for the Appenderators it creates.
- druid.server.http.numThreads. The distinct connection pooling is handled through Jetty QoS filters.
- No additional memory management for the tasks is present in this PR; they will use whatever has been configured in their task specs with no additional external control.
The new process is not currently exposed in the docs, as this is an intermediate PR and the process is not ready for use yet.
This PR has: