k8s-based-ingestion: Wait for task lifecycles to enter RUNNING state before returning from KubernetesTaskRunner.start#17446

Merged
georgew5656 merged 5 commits into apache:master from georgew5656:waitForStart
Nov 8, 2024
Conversation

@georgew5656
Contributor

Description

It's possible for the KubernetesTaskRunner.start method to return before the threads in the exec pool (running KubernetesPeonLifecycle.join) have finished gathering information about Kubernetes jobs. This is a problem because other services on the overlord (such as supervisors) expect the KubernetesTaskRunner to have information about tasks once start has returned (for example, each task's location).

This diff adds a wait in the start() method for all discovered tasks to enter the RUNNING state. This state indicates that the KubernetesTaskRunner knows everything it needs to know about a task.
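A minimal sketch of that waiting behavior, using java.util.concurrent.CompletableFuture as a stand-in for the Guava SettableFuture the runner actually uses (the method and variable names here are illustrative, not the PR's exact code):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class StartWaitSketch
{
  // One "started" future per discovered task; each completes once the
  // corresponding lifecycle has reached RUNNING (or failed to get there).
  static void waitForTasksToStart(List<CompletableFuture<Boolean>> startedFutures, long timeoutMs)
  {
    CompletableFuture<Void> all =
        CompletableFuture.allOf(startedFutures.toArray(new CompletableFuture[0]));
    try {
      all.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
    catch (TimeoutException e) {
      // Give up waiting rather than block the overlord's start() forever.
    }
    catch (Exception e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args)
  {
    CompletableFuture<Boolean> started = CompletableFuture.completedFuture(true);
    waitForTasksToStart(List.of(started), 100L);
    System.out.println("started=" + started.getNow(false));
  }
}
```

The key property is that start() blocks only up to a timeout, so a task stuck short of RUNNING cannot wedge the overlord's startup.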

Release note

Bugfix that helps mitigate unexpected behavior when running k8s based ingestion during overlord restarts.

Key changed/added classes in this PR
  • KubernetesTaskRunner
  • KubernetesWorkItem
  • KubernetesPeonLifecycle

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

* @throws IllegalStateException
*/
protected synchronized TaskStatus join(long timeout) throws IllegalStateException
protected synchronized TaskStatus join(long timeout, SettableFuture<Boolean> taskActiveStatusFuture) throws IllegalStateException
Contributor


This is an odd pattern; it's more typical for a method to return ListenableFuture<T>. If the caller needs to chain that onto a SettableFuture<T>, the caller should attach a callback that sets the settable future.

Please also add javadoc explaining what the returned future means.
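The pattern suggested here, sketched with java.util.concurrent.CompletableFuture standing in for Guava's ListenableFuture/SettableFuture (the actual code uses Guava; these names are illustrative): the method returns a future it owns, and the caller bridges the result onto its own settable future with a callback instead of passing the settable future in as an argument.

```java
import java.util.concurrent.CompletableFuture;

public class FutureBridgeSketch
{
  // join() owns and returns the future instead of mutating one it was handed.
  static CompletableFuture<Boolean> join(long timeout)
  {
    // Lifecycle work elided; completes true once the task is joined.
    return CompletableFuture.completedFuture(true);
  }

  public static void main(String[] args)
  {
    // The caller keeps its own settable future and bridges onto it.
    CompletableFuture<Boolean> taskActiveStatus = new CompletableFuture<>();
    join(5000L).whenComplete((ok, err) -> {
      if (err != null) {
        taskActiveStatus.completeExceptionally(err);
      } else {
        taskActiveStatus.complete(ok);
      }
    });
    System.out.println("active=" + taskActiveStatus.join());
  }
}
```

This keeps ownership one-directional: the callee never needs to know who is listening for the result.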

Contributor Author


i guess it would make more sense to maybe split this up into a joinUntilStart method that calls back into the wait method, i can rewrite it a bit

Contributor Author


hmm doing it this way kinda requires the entire lifecycle logic to be rewritten, i think i have a simpler semi-hacky way of accomplishing the same thing.

Contributor Author


i added a function that just returns a future containing a boolean with info on whether the task started running successfully. in order to get this to work i made KubernetesPeonLifecycle a regular constructor argument for KubernetesWorkItem (instead of setting it with setKubernetesPeonLifecycle afterwards). this seems fine to me because KubernetesPeonLifecycle is just a simple POJO until you call run or join on it; i don't really see why it needs to be instantiated in doTask instead of run/joinAsync.

@georgew5656 georgew5656 requested a review from gianm November 7, 2024 00:16
throw e;
}
finally {
if (!taskStartedSuccessfullyFuture.isDone()) {
Contributor


taskStartedSuccessfullyFuture is not set to true in run as far as I can tell. is that intentional?

Contributor Author


run always calls into join (line 140) so it'll get set there

Contributor


ah, I see, OK.

}
catch (Exception e) {
final long numInitialized =
tasks.values().stream().filter(item -> item.getPeonLifeycle().getTaskStartedSuccessfullyFuture().isDone()).count();
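A sketch of the stricter count discussed below — filtering for futures that are both done and evaluated to true — with CompletableFuture standing in for the Guava future (the stream shape mirrors the excerpt above; names are illustrative):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class StartedCountSketch
{
  static long countStartedSuccessfully(List<CompletableFuture<Boolean>> futures)
  {
    return futures.stream()
                  // done, not failed, and resolved to true
                  .filter(f -> f.isDone() && !f.isCompletedExceptionally() && f.getNow(false))
                  .count();
  }

  public static void main(String[] args)
  {
    List<CompletableFuture<Boolean>> futures = List.of(
        CompletableFuture.completedFuture(true),   // joined successfully
        CompletableFuture.completedFuture(false),  // failed to join
        new CompletableFuture<>()                  // still joining
    );
    System.out.println("started=" + countStartedSuccessfully(futures));
  }
}
```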
Contributor


This is just looking for how many of the futures finished. Should it be looking for ones that finish and eval to true? I am wondering what the boolean is for.

Contributor


I don't see anything that recovers from failed join, so I suppose there's three cases?

  • future is done and true — the task was joined
  • future is done and false — error joining the task, i suppose it's never going to be joined? at this point should we mark it failed?
  • future is not yet done — the task is still being joined async

Contributor Author

@georgew5656 georgew5656 Nov 7, 2024


yeah i think looking for ones that eval to true makes sense, updated this.

on the second comment, the main logic of the task lifecycle is performed by the exec thread.
return tasks.computeIfAbsent(
    task.getId(),
    k -> new KubernetesWorkItem(
        task,
        exec.submit(() -> joinTask(task)),
        peonLifecycleFactory.build(task, this::emitTaskStateMetrics)
    )
);

so if the task fails to join the KubernetesWorkItem.getResult() future will complete as failed and the taskQueue will mark the task as failed and shut it down, i don't believe we need to do this in the start() method too.

for the new "did task start successfully" future, i updated the logic so the catch blocks in run/join catch any errors during run/join and mark the future as failed if the task didn't start properly. also added a test for that scenario.
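The error handling described here might look roughly like the following (a sketch only: CompletableFuture stands in for the Guava SettableFuture, and the watchJob hook is a hypothetical stand-in for the real k8s job-watching logic):

```java
import java.util.concurrent.CompletableFuture;

public class LifecycleJoinSketch
{
  final CompletableFuture<Boolean> taskStartedSuccessfully = new CompletableFuture<>();

  void join(Runnable watchJob)
  {
    try {
      watchJob.run();                        // wait for the peon pod to come up
      taskStartedSuccessfully.complete(true);
    }
    catch (RuntimeException e) {
      // Mark the future failed so start() stops waiting on this task.
      taskStartedSuccessfully.complete(false);
      throw e;
    }
    finally {
      // Safety net: never leave callers blocked on an unset future.
      if (!taskStartedSuccessfully.isDone()) {
        taskStartedSuccessfully.complete(false);
      }
    }
  }

  public static void main(String[] args)
  {
    LifecycleJoinSketch ok = new LifecycleJoinSketch();
    ok.join(() -> {});
    System.out.println("ok=" + ok.taskStartedSuccessfully.join());

    LifecycleJoinSketch bad = new LifecycleJoinSketch();
    try {
      bad.join(() -> { throw new RuntimeException("pod failed"); });
    }
    catch (RuntimeException ignored) {
    }
    System.out.println("bad=" + bad.taskStartedSuccessfully.join());
  }
}
```

Either way — normal completion, exception, or the finally-block safety net — the future is guaranteed to resolve, so start() never waits indefinitely on it.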

Contributor


Ok, thanks. I missed that there was a different future that did have a callback for failure.

@georgew5656 georgew5656 requested a review from gianm November 7, 2024 17:40
@georgew5656
Contributor Author

i don't think the failing unit test is related so i'm going to go ahead and merge this

@georgew5656 georgew5656 merged commit 5764183 into apache:master Nov 8, 2024
jtuglu1 pushed a commit to jtuglu1/druid that referenced this pull request Nov 20, 2024
…before returning from KubernetesTaskRunner.start (apache#17446)

* Add a wait on start() for task lifecycle to go into running

* handle exceptions

* Fix logging messages

* Don't pass in the settable future as a arg

* add some unit tests
@adarshsanjeev adarshsanjeev added this to the 32.0.0 milestone Jan 16, 2025