Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,16 @@ public Optional<InputStream> streamTaskLog(String taskid, long offset)
@Override
public ListenableFuture<TaskStatus> run(Task task)
{
synchronized (tasks) {
tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task))));
return tasks.get(task.getId()).getResult();
}
return tasks.computeIfAbsent(
task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> runTask(task)))
).getResult();
}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
{
synchronized (tasks) {
tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))));
return tasks.get(task.getId()).getResult();
}
return tasks.computeIfAbsent(
task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))
).getResult();
}

private TaskStatus runTask(Task task)
Expand All @@ -163,20 +161,18 @@ protected TaskStatus doTask(Task task, boolean run)
{
KubernetesPeonLifecycle peonLifecycle = peonLifecycleFactory.build(task);

synchronized (tasks) {
KubernetesWorkItem workItem = tasks.get(task.getId());

if (workItem == null) {
throw new ISE("Task [%s] disappeared", task.getId());
}
KubernetesWorkItem workItem = tasks.get(task.getId());

if (workItem.isShutdownRequested()) {
throw new ISE("Task [%s] has been shut down", task.getId());
}
if (workItem == null) {
throw new ISE("Task [%s] disappeared", task.getId());
}

workItem.setKubernetesPeonLifecycle(peonLifecycle);
if (workItem.isShutdownRequested()) {
throw new ISE("Task [%s] has been shut down", task.getId());
}

workItem.setKubernetesPeonLifecycle(peonLifecycle);

try {
TaskStatus taskStatus;
if (run) {
Expand All @@ -202,9 +198,7 @@ protected TaskStatus doTask(Task task, boolean run)
}

finally {
synchronized (tasks) {
tasks.remove(task.getId());
}
tasks.remove(task.getId());
}
}

Expand Down Expand Up @@ -322,9 +316,7 @@ public Map<String, Long> getTotalTaskSlotCount()
@Override
public Collection<? extends TaskRunnerWorkItem> getKnownTasks()
{
synchronized (tasks) {
return Lists.newArrayList(tasks.values());
}
return Lists.newArrayList(tasks.values());
}


Expand Down Expand Up @@ -393,23 +385,19 @@ public void registerListener(TaskRunnerListener listener, Executor executor)
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
synchronized (tasks) {
return tasks.values()
.stream()
.filter(KubernetesWorkItem::isRunning)
.collect(Collectors.toList());
}
return tasks.values()
.stream()
.filter(KubernetesWorkItem::isRunning)
.collect(Collectors.toList());
}

@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
synchronized (tasks) {
return tasks.values()
.stream()
.filter(KubernetesWorkItem::isPending)
.collect(Collectors.toList());
}
return tasks.values()
.stream()
.filter(KubernetesWorkItem::isPending)
.collect(Collectors.toList());
}

@Nullable
Expand Down