Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,18 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
@Override
public ListenableFuture<TaskStatus> run(Task task)
{
return tasks.computeIfAbsent(
task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))
).getResult();
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))))
.getResult();
}
}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
return tasks.computeIfAbsent(
task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))
).getResult();
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
.getResult();
}
}

private TaskStatus runTask(Task task)
Expand All @@ -159,21 +161,23 @@ private TaskStatus joinTask(Task task)
@VisibleForTesting
protected TaskStatus doTask(Task task, boolean run)
{
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);
try {
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);

KubernetesWorkItem workItem = tasks.get(task.getId());
synchronized (tasks) {
KubernetesWorkItem workItem = tasks.get(task.getId());

if (workItem == null) {
throw new ISE("Task [%s] disappeared", task.getId());
}
if (workItem == null) {
throw new ISE("Task [%s] disappeared", task.getId());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we maybe just return a failed TaskStatus here instead of throwing an exception? The exception thrown by this method may or may not be handled by the calling code, but there is no point depending on that if we already know the reason for the task failure.

We should do the same thing in the catch block too.

But this doesn't need to be done as a part of this PR, just wanted to call it out.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looked at what other TaskRunners do so we can have consistent behavior (throw error or return task failure).
In ThreadingTaskRunner, it seems the code is similar to what we have here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, we can revisit this later.

}

if (workItem.isShutdownRequested()) {
throw new ISE("Task [%s] has been shut down", task.getId());
}
if (workItem.isShutdownRequested()) {
throw new ISE("Task [%s] has been shut down", task.getId());
}

workItem.setKubernetesPeonLifecycle(peonLifecycle);
workItem.setKubernetesPeonLifecycle(peonLifecycle);
}

try {
TaskStatus taskStatus;
if (run) {
taskStatus = peonLifecycle.run(
Expand All @@ -191,14 +195,14 @@ protected TaskStatus doTask(Task task, boolean run)

return taskStatus;
}

catch (Exception e) {
log.error(e, "Task [%s] execution caught an exception", task.getId());
throw new RuntimeException(e);
}

finally {
tasks.remove(task.getId());
synchronized (tasks) {
tasks.remove(task.getId());
}
}
}

Expand Down Expand Up @@ -269,17 +273,17 @@ public Optional<InputStream> streamTaskReports(String taskid) throws IOException
@Override
public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
{
List<Pair<Task, ListenableFuture<TaskStatus>>> tasks = new ArrayList<>();
List<Pair<Task, ListenableFuture<TaskStatus>>> restoredTasks = new ArrayList<>();
for (Job job : client.getPeonJobs()) {
try {
Task task = adapter.toTask(job);
tasks.add(Pair.of(task, joinAsync(task)));
restoredTasks.add(Pair.of(task, joinAsync(task)));
}
catch (IOException e) {
log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName());
}
}
return tasks;
return restoredTasks;
}

@Override
Expand Down Expand Up @@ -319,7 +323,6 @@ public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
return Lists.newArrayList(tasks.values());
}


@Override
public Optional<ScalingStats> getScalingStats()
{
Expand Down Expand Up @@ -386,18 +389,18 @@ public void registerListener(TaskRunnerListener listener, Executor executor)
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
return tasks.values()
.stream()
.filter(KubernetesWorkItem::isRunning)
.collect(Collectors.toList());
.stream()
.filter(KubernetesWorkItem::isRunning)
.collect(Collectors.toList());
}

@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
return tasks.values()
.stream()
.filter(KubernetesWorkItem::isPending)
.collect(Collectors.toList());
.stream()
.filter(KubernetesWorkItem::isPending)
.collect(Collectors.toList());
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
Expand Down Expand Up @@ -237,17 +236,17 @@ public void test_join_whenExceptionThrown_throwsRuntimeException()
}

@Test
public void test_doTask_withoutWorkItem_throwsISE()
public void test_doTask_withoutWorkItem_throwsRuntimeException()
{
Assert.assertThrows(
"Task [id] disappeared",
ISE.class,
RuntimeException.class,
() -> runner.doTask(task, true)
);
}

@Test
public void test_doTask_whenShutdownRequested_throwsISE()
public void test_doTask_whenShutdownRequested_throwsRuntimeException()
{
KubernetesWorkItem workItem = new KubernetesWorkItem(task, null);
workItem.shutdown();
Expand All @@ -256,7 +255,7 @@ public void test_doTask_whenShutdownRequested_throwsISE()

Assert.assertThrows(
"Task [id] has been shut down",
ISE.class,
RuntimeException.class,
() -> runner.doTask(task, true)
);
}
Expand Down