
Conversation

@uranusjr
Member

This breaks the loops sending callbacks to the multiprocessing pipe into batches of 100, and calls DagFileProcessorAgent.heartbeat() after each batch (which calls Pipe.recv()) to consume the pipe. This prevents the pipe from becoming full, which would make Pipe.send() block and deadlock the process.

Pipe.send() is called in two code paths, which (interestingly) correspond exactly to the two py-spy traces available in #7935.

The way I do this is pretty naive, but it represents the direction in which I think the issue should be resolved. I don’t really understand what the database calls in _do_scheduling and _process_executor_events do, and therefore have no idea whether it’s OK to call self.processor_agent.heartbeat() interleaved with those database calls (previously heartbeat() was only called after all those database calls were done).
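
For illustration, this is roughly the shape of the change (a minimal sketch, not the exact diff; the callback_requests list here stands in for whatever collection is actually being iterated):

CALLBACK_SEND_BATCH_SIZE = 100

for i, request in enumerate(callback_requests, start=1):
    self.processor_agent.send_callback_to_execute(request)
    if i % CALLBACK_SEND_BATCH_SIZE == 0:
        # heartbeat() calls Pipe.recv() on the agent's end, consuming the
        # pipe so it doesn't fill up and Pipe.send() above can't block.
        self.processor_agent.heartbeat()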

Resolves #7935? There’s actually another, separate issue described in it regarding the Redis worker being deadlocked. But that is no longer an issue according to @ashb (#7935 (comment)), and indeed all reports on that one are against 1.10.x, so I’m putting it off (and honestly I’m not sure how that one should be handled; it may need to involve some Redis internals).


Read the Pull Request Guidelines for more information.
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.

This breaks the loops sending callbacks to the multiprocessing pipe into
batches of 100, and calls DagFileProcessorAgent.heartbeat() to consume
the pipe between batches. This prevents the pipe from becoming full,
which would make Pipe.send() block and deadlock the process.
boring-cyborg bot added the area:Scheduler (including HA (high availability) scheduler) label on Mar 26, 2021

self.processor_agent.send_callback_to_execute(request)

if i % CALLBACK_SEND_BATCH_SIZE == 0:
Member

Hmmm, I don't think this will help actually.

This block/line 1253 doesn't get called that often, so I suspect what is happening is that the processor_manager side of the pipe is full: trying to send even a single byte might block until the other end reads, but the other end can't read because it is also trying to write.

And trying to heartbeat every time before writing would be slow!

🤔

Member

One option could be for send_callback_to_execute to store requests in an internal queue in the ProcessorAgent, and only send them inside heartbeat after first reading?
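
Sketched very roughly (the attribute names below are made up for illustration, not the real DagFileProcessorAgent internals), that idea could look like:

class DagFileProcessorAgent:                       # sketch only
    def __init__(self, parent_signal_conn):
        self._parent_signal_conn = parent_signal_conn
        self._callback_queue = []                  # buffered callback requests

    def send_callback_to_execute(self, request):
        # No pipe write here; just buffer the request.
        self._callback_queue.append(request)

    def heartbeat(self):
        # First drain everything the manager side has written, so the pipe
        # buffer has room...
        while self._parent_signal_conn.poll():
            self._parent_signal_conn.recv()        # message handling omitted
        # ...then flush the buffered callbacks.
        while self._callback_queue:
            self._parent_signal_conn.send(self._callback_queue.pop(0))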

Member

(SLA callback requests also have this same problem.)

Member

It may be easier to change the other side: https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L698 is the only place that side sends anything.

Member

Another option:

>>> import multiprocessing, os, fcntl
>>> p1, p2 = multiprocessing.Pipe()
>>> fcntl.fcntl(p1.fileno(), fcntl.F_SETFL, os.O_NONBLOCK | fcntl.fcntl(p1.fileno(), fcntl.F_GETFL))
>>> p1.send('a' * 1024000)  # Simulate something that would block
BlockingIOError: [Errno 11] Resource temporarily unavailable

We could set the socket in DagFileProcessorManager (the receiving side) to non-blocking; that way, if the send would fail, we could catch it, queue it there, and then go and poll again?
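
A rough sketch of what that could look like (illustrative helper functions, not the actual dag_processing.py code):

import fcntl
import os

def make_nonblocking(conn):
    # conn is the manager's end of the multiprocessing.Pipe()
    flags = fcntl.fcntl(conn.fileno(), fcntl.F_GETFL)
    fcntl.fcntl(conn.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)

def try_send(conn, obj, pending):
    # Attempt the send; if the pipe buffer is full, keep the object around
    # so the poll loop can retry it later instead of blocking forever.
    try:
        conn.send(obj)
    except BlockingIOError:
        pending.append(obj)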

Member Author

What do you mean by “queue it there, and then go and poll again”? Making the child connection non-blocking sounds great (I wondered about that briefly but couldn’t figure out how), but the polling loop seems to have quite a few side effects before that line, so we can’t just put the thing back and try again later. Do you mean to add a queue (list) on DagFileProcessorManager to store DagParsingStat instances when they fail to send, and add a block at the beginning of the poll loop to check and resend them first?
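
If that is the idea, a minimal sketch might be (again illustrative, not the real poll loop):

def flush_pending_stats(conn, pending, new_stat):
    # Called once per poll-loop iteration: retry stats that failed to send
    # earlier, then try to send this iteration's DagParsingStat.
    pending.append(new_stat)
    while pending:
        try:
            conn.send(pending[0])
        except BlockingIOError:
            return                      # pipe still full; retry next iteration
        pending.pop(0)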

@uranusjr closed this Apr 1, 2021
@uranusjr deleted the multiprocessing-pipe-batch branch May 28, 2021 06:58