Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 47 additions & 9 deletions common/src/main/java/io/druid/concurrent/Execs.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
Expand All @@ -37,24 +39,51 @@
*/
public class Execs
{
public static ExecutorService singleThreaded(String nameFormat)

public static ExecutorService singleThreaded(@NotNull String nameFormat)
{
return Executors.newSingleThreadExecutor(makeThreadFactory(nameFormat));
return singleThreaded(nameFormat, null);
}

public static ExecutorService multiThreaded(int threads, String nameFormat)
public static ExecutorService singleThreaded(@NotNull String nameFormat, @Nullable Integer priority)
{
return Executors.newFixedThreadPool(threads, makeThreadFactory(nameFormat));
return Executors.newSingleThreadExecutor(makeThreadFactory(nameFormat, priority));
}

public static ScheduledExecutorService scheduledSingleThreaded(String nameFormat)
public static ExecutorService multiThreaded(int threads, @NotNull String nameFormat)
{
return Executors.newSingleThreadScheduledExecutor(makeThreadFactory(nameFormat));
return multiThreaded(threads, nameFormat, null);
}

public static ThreadFactory makeThreadFactory(String nameFormat)
public static ExecutorService multiThreaded(int threads, @NotNull String nameFormat, @Nullable Integer priority)
{
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(nameFormat).build();
return Executors.newFixedThreadPool(threads, makeThreadFactory(nameFormat, priority));
}

public static ScheduledExecutorService scheduledSingleThreaded(@NotNull String nameFormat)
{
return scheduledSingleThreaded(nameFormat, null);
}

public static ScheduledExecutorService scheduledSingleThreaded(@NotNull String nameFormat, @Nullable Integer priority)
{
return Executors.newSingleThreadScheduledExecutor(makeThreadFactory(nameFormat, priority));
}

public static ThreadFactory makeThreadFactory(@NotNull String nameFormat)
{
return makeThreadFactory(nameFormat, null);
}

public static ThreadFactory makeThreadFactory(@NotNull String nameFormat, @Nullable Integer priority)
{
final ThreadFactoryBuilder builder = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(nameFormat);
if (priority != null) {
builder.setPriority(priority);
}
return builder.build();
}

/**
Expand All @@ -64,6 +93,15 @@ public static ThreadFactory makeThreadFactory(String nameFormat)
* @return ExecutorService which blocks accepting new tasks when the capacity reached
*/
public static ExecutorService newBlockingSingleThreaded(final String nameFormat, final int capacity)
{
return newBlockingSingleThreaded(nameFormat, capacity, null);
}

public static ExecutorService newBlockingSingleThreaded(
final String nameFormat,
final int capacity,
final Integer priority
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this used?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

)
{
final BlockingQueue<Runnable> queue;
if (capacity > 0) {
Expand All @@ -72,7 +110,7 @@ public static ExecutorService newBlockingSingleThreaded(final String nameFormat,
queue = new SynchronousQueue<>();
}
return new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat),
1, 1, 0L, TimeUnit.MILLISECONDS, queue, makeThreadFactory(nameFormat, priority),
new RejectedExecutionHandler()
{
@Override
Expand Down
68 changes: 68 additions & 0 deletions common/src/main/java/io/druid/concurrent/TaskThreadPriority.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.concurrent;

public class TaskThreadPriority
{
// The task context key to grab the task priority from
public static final String CONTEXT_KEY = "backgroundThreadPriority";
// NOTE: Setting negative nice values on linux systems (threadPriority > Thread.NORM_PRIORITY) requires running
// as *ROOT*. This is, in general, not advisable.
// In order to have these priorities honored on linux systems, the JVM must be launched with the following options:
//
// -XX:+UseThreadPriorities -XX:ThreadPriorityPolicy=42
//
// +UseThreadPriorities usually only enables setting thread priorities if run as root... but
// ThreadPriorityPolicy is "supposed" to be either 0 or 1, but there is a "bug"/feature in
// the common JVMs. Whereby if UseThreadPriorities is set, and the ThreadPriorityPolicy!=1
// the check for "root" is skipped. This works fine as long as you are LOWERING the
// threadPriority of tasks (which we are). If you modify the code to allow higher priorities
// things will crash and burn at runtime. It is advisable to set it to 42 so that relevant searches can be found on
// the flag
//
// Not setting these JVM options disables thread priorities on linux systems
//
// See : http://www.akshaal.info/2008/04/javas-thread-priorities-in-linux.html for an explanation
// See : http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/b0c7e7f1bbbe/src/os/linux/vm/os_linux.cpp#l3933 for
// the bug in action
// See: https://docs.oracle.com/cd/E15289_01/doc.40/e15062/optionxx.htm#BABGBFHF for the options documentation

/**
* Return the thread-factory friendly priorities from the task priority
*
* @param taskPriority The priority for the task. >0 means high. 0 means inherit from current thread, <0 means low.
*
* @return The thread priority to use in a thread factory, or null if no priority is to be set
*/
public static Integer getThreadPriorityFromTaskPriority(final int taskPriority)
{
if (taskPriority == 0) {
return null;
}
int finalPriority = taskPriority + Thread.NORM_PRIORITY;
if (taskPriority > Thread.MAX_PRIORITY) {
return Thread.MAX_PRIORITY;
}
if (finalPriority < Thread.MIN_PRIORITY) {
return Thread.MIN_PRIORITY;
}
return finalPriority;
}
}
4 changes: 4 additions & 0 deletions docs/content/ingestion/realtime-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|maxPendingPersists|Integer|Maximum number of persists that can be pending, but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0; meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a [sharded fashion](#sharding).|no (default == 'NoneShardSpec'|
|buildV9Directly|Boolean|Whether to build v9 index directly instead of building v8 index and convert it to v9 format|no (default = false)|
|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)|
|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default = 0; inherit and do not override)|

Before enabling thread priority settings, users are highly encouraged to read the [original pull request](https://github.com/druid-io/druid/pull/984) and other documentation about proper use of `-XX:+UseThreadPriorities`.

#### Rejection Policy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public String toString()
* Start helper methods
*
* @param objects objects to join
*
* @return string of joined objects
*/
public static String joinId(Object... objects)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ static RealtimeTuningConfig convertTuningConfig(
null,
shardSpec,
indexSpec,
buildV9Directly
buildV9Directly,
0,
0
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,14 @@ public TaskStatus call()

command.add(String.format("-Ddruid.host=%s", childHost));
command.add(String.format("-Ddruid.port=%d", childPort));
/**
* These are not enabled per default to allow the user to either set or not set them
* Users are highly suggested to be set in druid.indexer.runner.javaOpts
* See io.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
* for more information
command.add("-XX:+UseThreadPriorities");
command.add("-XX:ThreadPriorityPolicy=42");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a link to the documentation explaining this parameter

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is some weird undocumented behavior in the vm that we rely on, I would prefer we simply put a note in the documentation about this and let users set the parameter themselves in the forking task runner vm parameter.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.oracle.com/cd/E15289_01/doc.40/e15062/optionxx.htm#BABGBFHF

Without setting the command line flags the behavior is disabled at the JVM level

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • for linux

*/

if (config.isSeparateIngestionEndpoint()) {
command.add(String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.concurrent.Execs;
import io.druid.concurrent.TaskThreadPriority;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.TaskToolboxFactory;
Expand All @@ -52,8 +53,11 @@

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;

Expand All @@ -66,7 +70,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker

private final TaskToolboxFactory toolboxFactory;
private final TaskConfig taskConfig;
private final ListeningExecutorService exec;
private final ConcurrentMap<Integer, ListeningExecutorService> exec = new ConcurrentHashMap<>();
private final Set<ThreadPoolTaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<>();
private final ServiceEmitter emitter;

Expand All @@ -79,7 +83,6 @@ public ThreadPoolTaskRunner(
{
this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
this.taskConfig = taskConfig;
this.exec = MoreExecutors.listeningDecorator(Execs.singleThreaded("task-runner-%d"));
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
}

Expand All @@ -89,10 +92,27 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
return ImmutableList.of();
}

private static ListeningExecutorService buildExecutorService(int priority)
{
return MoreExecutors.listeningDecorator(
Execs.singleThreaded(
"task-runner-%d-priority-" + priority,
TaskThreadPriority.getThreadPriorityFromTaskPriority(priority)
)
);
}

@LifecycleStop
public void stop()
{
exec.shutdown();
for (Map.Entry<Integer, ListeningExecutorService> entry : exec.entrySet()) {
try {
entry.getValue().shutdown();
}
catch (SecurityException ex) {
log.wtf(ex, "I can't control my own threads!");
}
}

for (ThreadPoolTaskRunnerWorkItem item : runningItems) {
final Task task = item.getTask();
Expand Down Expand Up @@ -145,14 +165,44 @@ public void stop()
}

// Ok, now interrupt everything.
exec.shutdownNow();
for (Map.Entry<Integer, ListeningExecutorService> entry : exec.entrySet()) {
try {
entry.getValue().shutdownNow();
}
catch (SecurityException ex) {
log.wtf(ex, "I can't control my own threads!");
}
}
}

@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
final TaskToolbox toolbox = toolboxFactory.build(task);
final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ThreadPoolTaskRunnerCallable(task, toolbox));
final Object taskPriorityObj = task.getContextValue(TaskThreadPriority.CONTEXT_KEY);
int taskPriority = 0;
if(taskPriorityObj != null){
if(taskPriorityObj instanceof Number) {
taskPriority = ((Number) taskPriorityObj).intValue();
} else if(taskPriorityObj instanceof String) {
try {
taskPriority = Integer.parseInt(taskPriorityObj.toString());
}
catch (NumberFormatException e) {
log.error(e, "Error parsing task priority [%s] for task [%s]", taskPriorityObj, task.getId());
}
}
}
// Ensure an executor for that priority exists
if (!exec.containsKey(taskPriority)) {
final ListeningExecutorService executorService = buildExecutorService(taskPriority);
if (exec.putIfAbsent(taskPriority, executorService) != null) {
// favor prior service
executorService.shutdownNow();
}
}
final ListenableFuture<TaskStatus> statusFuture = exec.get(taskPriority)
.submit(new ThreadPoolTaskRunnerCallable(task, toolbox));
final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem = new ThreadPoolTaskRunnerWorkItem(task, statusFuture);
runningItems.add(taskRunnerWorkItem);
Futures.addCallback(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId)
null,
null,
null,
buildV9Directly
buildV9Directly,
0, 0
);
return new RealtimeIndexTask(
taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ public Plumber findPlumber(
1,
new NoneShardSpec(),
indexSpec,
null
null,
0,
0
)
),
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1091,7 +1091,8 @@ private RealtimeIndexTask giveMeARealtimeIndexTask()
null,
null,
null,
null
null,
0, 0
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
return new RealtimeIndexTask(
Expand Down
Loading