Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.indexing.common;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

public class TaskLocation
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this just use Worker?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really semantically a worker- one worker is going to have many tasks at many taskLocations (all at different ports from the parent worker).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then can they be DruidNodes?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess chatPort makes that a no?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible to simply propagate DruidNode and have chatPort discoverable from the node data exposed at DruidNode?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about just using HostAndPort? (I'm trying to minimize the number of items that are added to the code)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw, in #2242 I am making DruidServerMetadata the source of truth for Druid server's metadata. I think it's reasonable to make DruidServerMetadata to contain a minimum set of metadata(e.g., host, port, name, etc), and then let Worker, TaskLocation, DruidServer extend it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hostAndPort doesn't have jackson annotations. We could register a jackson module for it, I suppose (or maybe it's already part of the GuavaModule?). I am also ok with replacing this with the stuff from #2242 when that pr is ready.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like HostAndPort serde is included in the GuavaModule, although it uses the ToStringSerializer, which will be kind of a pain for people to deserialize that aren't linking guava in their app. so I am leaning towards keeping TaskLocation as a thing and potentially changing it after #2242.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok

{
private static final TaskLocation UNKNOWN = new TaskLocation(null, -1);

private final String host;
private final int port;

public static TaskLocation create(String host, int port)
{
return new TaskLocation(host, port);
}

public static TaskLocation unknown()
{
return TaskLocation.UNKNOWN;
}

@JsonCreator
public TaskLocation(
@JsonProperty("host") String host,
@JsonProperty("port") int port
)
{
this.host = host;
this.port = port;
}

@JsonProperty
public String getHost()
{
return host;
}

@JsonProperty
public int getPort()
{
return port;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskLocation that = (TaskLocation) o;
return port == that.port &&
Objects.equals(host, that.host);
}

@Override
public int hashCode()
{
return Objects.hash(host, port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.guice.annotations.Self;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.task.Task;
Expand Down Expand Up @@ -81,6 +82,8 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -99,6 +102,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
private final ListeningExecutorService exec;
private final ObjectMapper jsonMapper;
private final PortFinder portFinder;
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();

// Writes must be synchronized. This is only a ConcurrentMap so "informational" reads can occur without waiting.
private final Map<String, ForkingTaskRunnerWorkItem> tasks = Maps.newConcurrentMap();
Expand Down Expand Up @@ -171,6 +175,19 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
return retVal;
}

public void registerListener(TaskRunnerListener listener, Executor executor)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we ever need to unregister a listener also ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't have a need for that, IMO no point adding a method until someone needs it

{
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);

synchronized (tasks) {
for (ForkingTaskRunnerWorkItem item : tasks.values()) {
TaskRunnerUtils.notifyLocationChanged(ImmutableList.of(listenerPair), item.getTaskId(), item.getLocation());
}

listeners.add(listenerPair);
}
}

@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
Expand All @@ -191,6 +208,7 @@ public TaskStatus call()
final File attemptDir = new File(taskDir, attemptUUID);

final ProcessHolder processHolder;
final String childHost = node.getHost();
final int childPort;
final int childChatHandlerPort;

Expand All @@ -203,6 +221,8 @@ public TaskStatus call()
childChatHandlerPort = -1;
}

final TaskLocation taskLocation = TaskLocation.create(childHost, childPort);

try {
final Closer closer = Closer.create();
try {
Expand Down Expand Up @@ -235,7 +255,6 @@ public TaskStatus call()
}

final List<String> command = Lists.newArrayList();
final String childHost = node.getHost();
final String taskClasspath;
if (task.getClasspathPrefix() != null && !task.getClasspathPrefix().isEmpty()) {
taskClasspath = Joiner.on(File.pathSeparator).join(
Expand All @@ -250,7 +269,10 @@ public TaskStatus call()
command.add("-cp");
command.add(taskClasspath);

Iterables.addAll(command, new QuotableWhiteSpaceSplitter(config.getJavaOpts(), jsonMapper));
Iterables.addAll(
command,
new QuotableWhiteSpaceSplitter(config.getJavaOpts(), jsonMapper)
);

// Override task specific javaOpts
Object taskJavaOpts = task.getContextValue(
Expand Down Expand Up @@ -333,8 +355,8 @@ public TaskStatus call()
* Users are highly suggested to be set in druid.indexer.runner.javaOpts
* See io.druid.concurrent.TaskThreadPriority#getThreadPriorityFromTaskPriority(int)
* for more information
command.add("-XX:+UseThreadPriorities");
command.add("-XX:ThreadPriorityPolicy=42");
command.add("-XX:+UseThreadPriorities");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our IDEs needs to decide how this should go :) I think every RTR patch switches which way it wants this.

command.add("-XX:ThreadPriorityPolicy=42");
*/

if (config.isSeparateIngestionEndpoint()) {
Expand Down Expand Up @@ -370,13 +392,16 @@ public TaskStatus call()
taskWorkItem.processHolder = new ProcessHolder(
new ProcessBuilder(ImmutableList.copyOf(command)).redirectErrorStream(true).start(),
logFile,
childPort
taskLocation.getHost(),
taskLocation.getPort()
);

processHolder = taskWorkItem.processHolder;
processHolder.registerWithCloser(closer);
}

TaskRunnerUtils.notifyLocationChanged(listeners, task.getId(), taskLocation);

log.info("Logging task %s output to: %s", task.getId(), logFile);
boolean runFailed = true;

Expand Down Expand Up @@ -673,18 +698,30 @@ public Task getTask()
{
return task;
}

@Override
public TaskLocation getLocation()
{
if (processHolder == null) {
return TaskLocation.unknown();
} else {
return TaskLocation.create(processHolder.host, processHolder.port);
}
}
}

private static class ProcessHolder
{
private final Process process;
private final File logFile;
private final String host;
private final int port;

private ProcessHolder(Process process, File logFile, int port)
private ProcessHolder(Process process, File logFile, String host, int port)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

going along with prior comment, can this take a HostAndPort argument instead of a host and port separate?

{
this.process = process;
this.logFile = logFile;
this.host = host;
this.port = port;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.curator.CuratorUtils;
import io.druid.curator.cache.PathChildrenCacheFactory;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.autoscaling.ResourceManagementStrategy;
Expand Down Expand Up @@ -91,7 +92,9 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -145,6 +148,8 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
// Workers that have been marked as lazy. these workers are not running any tasks and can be terminated safely by the scaling policy.
private final ConcurrentMap<String, ZkWorker> lazyWorkers = new ConcurrentHashMap<>();

// task runner listeners
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();

private final Object statusLock = new Object();

Expand Down Expand Up @@ -327,6 +332,24 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
return ImmutableList.of();
}

@Override
public void registerListener(TaskRunnerListener listener, Executor executor)
{
final Pair<TaskRunnerListener, Executor> listenerPair = Pair.of(listener, executor);

synchronized (statusLock) {
for (Map.Entry<String, RemoteTaskRunnerWorkItem> entry : runningTasks.entrySet()) {
TaskRunnerUtils.notifyLocationChanged(
ImmutableList.of(listenerPair),
entry.getKey(),
entry.getValue().getLocation()
);
}

listeners.add(listenerPair);
}
}

@Override
public Collection<Worker> getWorkers()
{
Expand Down Expand Up @@ -517,7 +540,7 @@ private URL makeWorkerURL(Worker worker, String path)
private RemoteTaskRunnerWorkItem addPendingTask(final Task task)
{
log.info("Added pending task %s", task.getId());
final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(task.getId(), null);
final RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(task.getId(), null, null);
pendingTaskPayloads.put(task.getId(), task);
pendingTasks.put(task.getId(), taskRunnerWorkItem);
runPendingTasks();
Expand Down Expand Up @@ -706,7 +729,7 @@ private boolean announceTask(
return false;
}

RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker());
RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
runningTasks.put(task.getId(), newWorkItem);
log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());

Expand Down Expand Up @@ -783,15 +806,16 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th
case CHILD_ADDED:
case CHILD_UPDATED:
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskStatus taskStatus = jsonMapper.readValue(
event.getData().getData(), TaskStatus.class
final TaskAnnouncement announcement = jsonMapper.readValue(
event.getData().getData(), TaskAnnouncement.class
);

log.info(
"Worker[%s] wrote %s status for task: %s",
"Worker[%s] wrote %s status for task [%s] on [%s]",
zkWorker.getWorker().getHost(),
taskStatus.getStatusCode(),
taskId
announcement.getTaskStatus().getStatusCode(),
taskId,
announcement.getTaskLocation()
);

// Synchronizing state with ZK
Expand All @@ -803,7 +827,8 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th
} else {
final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
taskId,
zkWorker.getWorker()
zkWorker.getWorker(),
TaskLocation.unknown()
);
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,
Expand All @@ -821,8 +846,13 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th
}
}

if (taskStatus.isComplete()) {
taskComplete(taskRunnerWorkItem, zkWorker, taskStatus);
if (!announcement.getTaskLocation().equals(taskRunnerWorkItem.getLocation())) {
taskRunnerWorkItem.setLocation(announcement.getTaskLocation());
TaskRunnerUtils.notifyLocationChanged(listeners, taskId, announcement.getTaskLocation());
}

if (announcement.getTaskStatus().isComplete()) {
taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
runPendingTasks();
}
break;
Expand Down
Loading