Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,33 @@
import org.apache.druid.java.util.common.ISE;

import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

public class KubernetesWorkItem extends TaskRunnerWorkItem
{
private final Task task;
private final KubernetesPeonLifecycle kubernetesPeonLifecycle;

private final AtomicBoolean isShutdown = new AtomicBoolean(false);

public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture, KubernetesPeonLifecycle kubernetesPeonLifecycle)
{
super(task.getId(), statusFuture);
this.task = task;
this.kubernetesPeonLifecycle = kubernetesPeonLifecycle;
}

protected synchronized void shutdown()
/**
* Shuts down this work item. Subsequent calls to this method return immediately.
*/
protected void shutdown()
{
this.kubernetesPeonLifecycle.startWatchingLogs();
this.kubernetesPeonLifecycle.shutdown();
if (isShutdown.compareAndSet(false, true)) {
synchronized (this) {
this.kubernetesPeonLifecycle.startWatchingLogs();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we even need to be calling startWatchingLogs() here? Kind of off topic for the PR. But part of the reason shutdown is slow and can even hang, is because of the LogWatch being unhealthy. #18444 was implemented to put a time limit on log persist for saving off task logs. In that code path we are actually writing the logs out somewhere. I guess I don't see how the call here is useful at all if we are trying to just shut the task down?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree, the only apparent effect of calling startWatchingLogs() is to initialize KubernetesPeonLifecycle.logWatch field which would be initialized anyway via join() -> finally -> saveLogs() -> doSaveLogs().

We can explore that in a separate PR.

this.kubernetesPeonLifecycle.shutdown();
}
}
}

protected boolean isPending()
Expand Down