
Multi-threads support for processing diff queues in Kubernetes Executor #26639

Closed

Dinghang wants to merge 5 commits into apache:main from Dinghang:multi_threads_queue_process_in_k8s_executor

Conversation

@Dinghang commented Sep 23, 2022:

The workflow team at Pinterest has always used the Kubernetes Executor. We previously ran into scheduler performance issues because:

  • The Kubernetes Executor is on the critical path of the scheduler heartbeat.
  • All the items in the queues within the Kubernetes Executor are processed sequentially, which is inefficient.
  • In particular, processing task_queue calls the k8s API to create pods, which adds further delay.

Therefore, we implemented multi-thread support for processing the different queues in the Kubernetes Executor. Based on our observations, it can:

  • reduce the time cost from minutes to seconds when syncing several hundred items in one round.

Once this approach is agreed on, I can start adding unit tests as well.
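
A condensed sketch of the idea (illustrative names, simplified from the actual change, which splits each queue's pending items across a configurable number of threads, as in the diff discussed below):

from threading import Thread


def multi_threads_queue_process(queue_size, max_threads, process_method):
    # split the pending items evenly across max_threads workers,
    # instead of processing the whole backlog in one thread
    quotient, remainder = divmod(queue_size, max_threads)
    threads = []
    for i in range(max_threads):
        sub_batch_size = quotient + 1 if i < remainder else quotient
        t = Thread(target=process_method, args=[sub_batch_size])
        threads.append(t)
        t.start()
    for t in threads:
        t.join()


def process_method(n):
    print(f"processing {n} items from this queue")  # stand-in for the real per-item work


multi_threads_queue_process(queue_size=300, max_threads=8, process_method=process_method)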

boring-cyborg bot added the provider:cncf-kubernetes and area:Scheduler labels Sep 23, 2022
boring-cyborg bot commented Sep 23, 2022:

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst)
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature, add useful documentation (in docstrings or in the docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow the ASF Code of Conduct for all communication, including (but not limited to) comments on Pull Requests, the Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@Dinghang (Author):

this resource_version is not needed here since it's not actually used. The only place that uses it is the watcher process, but that's a different process and the info is not communicated to it, so it's safe to delete.

@Acehaidrey (Contributor):

@jedcunningham @kaxil @potiuk hey team, my team member Dinghang is doing his first PR; he's from Pinterest and built our backend integration with Kubernetes.
We'd like some help reviewing this PR, as it's a nice enhancement with low overhead.

@Acehaidrey (Contributor):

Hi @jedcunningham @kaxil @potiuk @dstandish or anyone else. This PR has been open for some time and we'd like some help getting it merged. Are there any concerns, or anything needed to move the conversation forward? Dinghang has worked hard to make the feature foolproof and has tested it out in multiple ways, so we want to see if there are any more asks.

Thank you, and looking forward to hearing back today, team!

@potiuk force-pushed the multi_threads_queue_process_in_k8s_executor branch from dd48d81 to 1f5cd1a on November 11, 2022 00:19
@potiuk (Member) commented Nov 11, 2022:

I rebased it, but it seems it needs a rebase and re-push from the author. Sorry for not looking at it before; I've been a little swamped and had some private errands, and now I want to release a new providers' wave. But if you rebase, @Dinghang, I promise to take a look soon enough and ping others, as this one touches core internals and the K8s executor, so it needs more than one pair of eyes.

@Dinghang (Author) commented Nov 11, 2022:

> I rebased it, but it seems it needs a rebase and re-push from the author. Sorry for not looking at it before; I've been a little swamped and had some private errands, and now I want to release a new providers' wave. But if you rebase, @Dinghang, I promise to take a look soon enough and ping others, as this one touches core internals and the K8s executor, so it needs more than one pair of eyes.

Hi @potiuk, I just rebased it. Thanks for reviewing this! Take your time.

@Acehaidrey (Contributor):

Hi @potiuk, any luck getting some eyes on this?

@potiuk (Member) commented Nov 17, 2022:

A few comments:

  • first of all, configuration values should come from config.yml; this is the source of truth for all configuration variables. They are used to generate documentation and in a few other places (pre-commits will update them where needed).

  • I do not think we need that many new config values. Generally, the fewer the better, and I see no reason why we would want to set them differently for different queues.

  • we generally need to add unit tests (but you already mentioned that you will)

Generally, it looks good; I see it as a nice optimization. @dstandish and @ashb would need to take a look at this, as this is a rather crucial part.

I am a bit surprised, though, that we have not done something like this before; maybe there was a good reason why all the k8s pod creation happened sequentially. It seems like a cool optimization. Threads do not always speed things up as much as we think (due to the GIL), but if we are using the k8s API to create pods and wait for them, then the latency introduced by sequential processing quite likely builds up. I also wonder how often it happens that there are many entries to process in quick succession.
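
A minimal timing sketch (simulating the API latency with sleep) of why threads still help here despite the GIL:

import time
from concurrent.futures import ThreadPoolExecutor


def fake_api_call(_):
    time.sleep(0.1)  # stands in for a blocking k8s API request


start = time.monotonic()
for i in range(20):
    fake_api_call(i)
print(f"sequential: {time.monotonic() - start:.2f}s")  # ~2.0s

start = time.monotonic()
with ThreadPoolExecutor(max_workers=10) as pool:
    list(pool.map(fake_api_call, range(20)))
print(f"threaded:   {time.monotonic() - start:.2f}s")  # ~0.2s

The GIL is released during blocking I/O, so the waits overlap; only CPU-bound work serializes.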

WDYT @ashb @dstandish ?

@Dinghang (Author):

Hi @potiuk, thanks for the review!

  • I will update the config within config.yml.
  • About the different configs, your suggestion makes sense; I can change them to use only one config for all the queues.
  • Unit tests will be added along with the above.

Would also like to get some input from @ashb @dstandish before I make further changes. Thanks!

@Acehaidrey (Contributor):

@ashb @dstandish can we please get some eyes on here? I'm sorry to tag; I'm sure you're all busy, but we also have some reasons at our company to want this as a case study.

@dstandish (Contributor):

Hi @Dinghang, sorry for the delay. I think the idea looks great in principle and we should proceed with this. I'm no threading expert, but it seems that using ThreadPoolExecutor could be better here than managing the threads individually. And I think Jarek's point about making sure all the config additions are really required is a good one.

Comment on lines +95 to +103
threads = []
quotient, remainder = divmod(batch_size, max_threads)
for i in range(max_threads):
    sub_batch_size = quotient + 1 if i < remainder else quotient
    t = Thread(target=process_method, args=[sub_batch_size])
    threads.append(t)
    t.start()
for t in threads:
    t.join()
@dstandish (Contributor):

This seems like it's more complicated than necessary.

I just played around with ThreadPoolExecutor and it seems it can be a bit simpler. E.g., here's a simple example:

import time
from concurrent.futures import ThreadPoolExecutor


def print_and_sleep(x):
    print(x)
    time.sleep(0.25)

max_threads = 4
with ThreadPoolExecutor(max_threads) as pool:
    [pool.submit(print_and_sleep, x) for x in range(123)]

@dstandish (Contributor) commented Nov 30, 2022:

how does the error handling work here, compared with the status quo?

@dstandish (Contributor):

ok, i guess we can do something like this to see what failed and handle it appropriately:

import time
from concurrent.futures import ThreadPoolExecutor


def print_and_sleep(x):
    print(x)
    if x % 50 == 0:
        raise ValueError("hi")
    time.sleep(0.25)


max_threads = 4
with ThreadPoolExecutor(max_threads) as pool:
    futures = [(x, pool.submit(print_and_sleep, x)) for x in range(123)]

failures = []
for arg, f in futures:
    print(arg)
    if f.exception():
        failures.append((arg, f))

for arg, f in failures:
    print(f"Func print_and_sleep failed for arg {arg} with exception {f.exception()}")

@Dinghang (Author):

Hi @dstandish, thanks for the reply. The idea is the same. IIRC, I was using ThreadPoolExecutor at the very beginning and then ran into some issues in production with heavier workloads; unfortunately, I did not record the issue. But with the current implementation there have been no issues, and it gives us more flexibility to make updates, so I would like to stick with it.

@dstandish (Contributor):

if you want to do this manually, at least i would separate the batch construction logic from the thread creation logic. e.g. you could do this:

from threading import Thread  # process_method comes from the surrounding executor code


def get_batches(
    queue_size=100,
    batch_size=10,
    max_threads=3,
):
    batch_size = min(batch_size or queue_size, queue_size)
    max_threads = min(max_threads, queue_size)
    quotient, remainder = divmod(batch_size, max_threads)
    for i in range(max_threads):
        sub_batch_size = quotient + 1 if i < remainder else quotient
        yield sub_batch_size


threads = []
for num in get_batches():
    t = Thread(target=process_method, args=[num])
    threads.append(t)
    t.start()

for t in threads:
    t.join()

or something

it's not obvious what this batch definition logic is supposed to do

@dstandish (Contributor):

(and splitting means you can test that part of it pretty easily)

@dstandish (Contributor):

better yet...

why have fixed batches? why not just launch the desired number of threads, and let each thread consume from the queue until it is empty? no need to manage batches, right?

@dstandish (Contributor):

oh i see... worker_pods_creation_batch_size was already there... it limits the number of pods to create each loop

@dstandish (Contributor) commented Dec 24, 2022:

but you see the issue i'm thinking of here? since you are defining fixed sizes for each thread, one thread may finish more quickly than the others, so it could do more work while the others are taking longer per item.

so it would be more efficient if each thread just drew from the same queue.

BUT the complication here is that you need to limit the total number of tasks processed because of that setting. i think one way to handle that is to create a new queue object for each batch, with all the items taken from task_queue that you need to process in the batch. but maybe this is not really gonna matter too much, and we needn't let perfect be the enemy of good.

the other thing i'll mention is... k8s has support for async requests... not asyncio, but threads... it manages its own thread pool. you use it by adding async_req=True to the request; the thread count is configured in the client. but refactoring to this would be much more involved, so i'm not suggesting it, just want to mention it.
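
For reference, a minimal sketch of that async_req pattern with the Python kubernetes client (the namespace and pool size are just illustrative):

from kubernetes import client, config

config.load_kube_config()  # or load_incluster_config() when running inside a pod
# pool_threads sets the client's internal thread pool used for async_req calls
api = client.CoreV1Api(client.ApiClient(pool_threads=4))

# async_req=True returns immediately with a future-like handle
future = api.list_namespaced_pod("default", async_req=True)
pods = future.get()  # block until the request completes
print(len(pods.items))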

anyway, i'll keep watching this one closely. i have been a bit busy over the last month with holidays, illness and my personal projects, but i'm excited that y'all are looking to get more involved, and i'm committed to helping you see this one through and to encouraging any others coming in the future. thanks.

@Dinghang (Author):

Hi @dstandish, thanks for the info. Most likely I would like to keep the current implementation. About "one thread may finish more quickly than the others": this could happen, but it's not a problem, since what we are focusing on improving is the extreme case with hundreds or thousands of items in the queue. Overall, the threads make it more efficient.

multi_threads_queue_process(
    queue_size=self.watcher_queue.qsize(),
    queue_type='watcher',
    process_method=self.process_watcher_queue,
@dstandish (Contributor):

what is the point of multithreading process_watcher_queue?

it does not appear that it makes any network calls.

@Dinghang (Author):

Hi @dstandish, I'm not sure what you mean by network calls here. process_watcher_queue is just a method to process items in the watcher queue.

@dstandish (Contributor) commented Jan 11, 2023:

@Dinghang @Acehaidrey still working on this? would love to get it merged

@Dinghang (Author):

Hi @dstandish, sorry for the late reply. I was AFK due to some personal issues and just got back. I will be addressing the comments accordingly and will ping you again once it's ready.

multi_threads_queue_process(
    queue_size=self.result_queue.qsize(),
    queue_type='result',
    process_method=self.process_result_queue,
@hterik (Contributor):

I don't think process_result_queue is thread-safe. It goes into self._change_state, where it mutates self.running and self.event_buffer.

@Dinghang (Author):

Hi @hterik, that's a good point! Both self.running and self.event_buffer only perform atomic operations, though. AFAIK, they are thread-safe.

@hterik (Contributor):

Today they might only use the [] and in operators, which in isolation are thread-safe, but tomorrow someone might add a for-loop or other side effects, giving you a "changed size during iteration" error. It is a lot of complexity to keep in mind if this can be modified from several threads at the same time.

I'm not 100% sure, but I believe this piece could potentially break:

for ti_key in list(self.event_buffer.keys()):
    if ti_key.dag_id in dag_ids:
        cleared_events[ti_key] = self.event_buffer.pop(ti_key)

if some thread is modifying event_buffer while another thread is iterating the loop, because it takes a snapshot of the keys before starting to pop them off.

My suggestion is to isolate the multithreaded operations to more pure functions, then return their results on a queue which is consumed in the same main thread that handles it today. Otherwise, this could benefit from more documentation on which member properties must be protected by locks, or what types of operations are allowed in each function.
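
A minimal sketch of that suggestion (fetch_state and the buffer contents are illustrative names): worker threads do only side-effect-free work and push results onto a thread-safe queue, and the main thread alone mutates shared state.

import queue
from concurrent.futures import ThreadPoolExecutor

results = queue.Queue()


def fetch_state(item):
    # side-effect-free work (e.g. a blocking API call); no shared-state mutation here
    results.put((item, f"state-for-{item}"))


with ThreadPoolExecutor(max_workers=4) as pool:
    for item in range(10):
        pool.submit(fetch_state, item)
# the with-block waits for all submitted work to finish

# only the main thread touches shared structures like event_buffer
event_buffer = {}
while not results.empty():
    key, state = results.get()
    event_buffer[key] = state
print(event_buffer)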

@Dinghang (Author):

Hi @hterik, I'll think about it: either the way you mentioned above or a lock. But whichever way we go, we need to verify it in our prod env for a while first, so some delay in the PR's readiness is expected.

@hterik (Contributor) commented Dec 20, 2022:

Just checking: is performance still an issue after #26778 (2.4.2)? It improved scheduler blockage from the KubernetesExecutor significantly for us, without the increased complexity of multithreading.

I have another suggestion in #28071 that will do a similar type of avoidance during task adoption.

@Dinghang (Author):

> Just checking: is performance still an issue after #26778 (2.4.2)? It improved scheduler blockage from the KubernetesExecutor significantly for us, without the increased complexity of multithreading.
>
> I have another suggestion in #28071 that will do a similar type of avoidance during task adoption.

Hi @hterik, the change you shared can improve performance. However, it might still not resolve the performance issue when there are really heavy workloads; multithreading can help in that case.

@Acehaidrey (Contributor):

Hi @hterik, are you able to take a look at Dinghang's comments today? We actually have internal goals at Pinterest around committing back, to see if we can dedicate some engineering time to these efforts, and we want to make sure we can get this in once all conditions/concerns are met.

@Dinghang (Author):

Hi team, I'm going to spend some time addressing the comments this week. Expect a reply next week.

while True:
    """processing result queue"""
    self.last_resource_version = defaultdict(lambda: "0")
    multi_threads_queue_process(
A contributor commented:

multi_threads_queue_process will start and stop multiple threads on each sync() tick. I'm a bit out of touch with the real-world overhead of this, but I've always been taught that starting a thread comes with a lot of overhead.

Could one use a fixed ThreadPool instead? This would also make the batching and queueing logic a lot easier, as dstandish suggested above.
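
A minimal sketch of that idea (QueueProcessor, sync and process_item are illustrative names): create one long-lived pool at startup and reuse it on every sync tick, instead of spawning threads per tick.

from concurrent.futures import ThreadPoolExecutor, wait


class QueueProcessor:
    def __init__(self, max_threads=4):
        # created once, reused across sync() ticks; no per-tick thread startup cost
        self.pool = ThreadPoolExecutor(max_workers=max_threads)

    def sync(self, items):
        futures = [self.pool.submit(self.process_item, item) for item in items]
        wait(futures)  # block until this tick's batch is done

    def process_item(self, item):
        print(f"processing {item}")  # stand-in for the real per-item work


processor = QueueProcessor()
processor.sync(range(10))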

@dimberman (Contributor):

Hi @Dinghang, has there been any progress on this? Seems like a really cool feature :)

@potiuk (Member) commented Feb 20, 2023:

@Dinghang :) ?

github-actions bot commented Apr 9, 2023:

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

github-actions bot added the stale label (per the .github/workflows/stale.yml policy) Apr 9, 2023
github-actions bot closed this Apr 15, 2023