Skip to content

Conversation

@rskew
Copy link
Contributor

@rskew rskew commented Sep 8, 2025

When an exception is thrown in the frame generator, the stream lock was not being released.

This PR adds a test showing the problem, and the fix.

The test should pass, but reverting the changes to pipeline.py will cause the test to fail.

@jonochang
Copy link
Contributor

@rskew , when I was looking into this, i suspected that the try / catch around the frame_generator on

needed a finally to release the lock.

I noticed your PR looks to release in the destroy stream. Did you consider releasing the lock in _create_frames_generator funciton?

@rskew rskew changed the title Jc/fix stream lock bug Fix deadlock when an error occurs in the frame_generator Sep 8, 2025
@rskew rskew force-pushed the jc/fix-stream-lock-bug branch from 28b0a23 to e5e722f Compare September 8, 2025 23:23
@rskew
Copy link
Contributor Author

rskew commented Sep 8, 2025

@jonochang The lock is released in _process_stream_event, which is called here:

stream.set_state(self.pipeline._process_stream_event(

which when stream_event is ERROR, directly calls destroy_stream here, also releasing the lock:

if stream.lock._in_use:
stream.lock.release()
self.destroy_stream(get_stream_id(), use_thread_local=False)

The bug was in destroy_stream which did not release the lock that it takes when use_thread_local is False

@rskew
Copy link
Contributor Author

rskew commented Sep 8, 2025

@jonochang Although note that in our testing using https://github.com/silverpond/aiko_services we saw slightly different behaviour due to also using the commits from #42 However the bug is the same

rskew and others added 20 commits September 9, 2025 16:26
Example graph:
   __________
  /   \      \
 A     B ---- C --->
  \___/______/
has syntax in a pipeline definition:
  "graph": [
    "(A B (A.a_out_1: b_in_1 A.a_out_2: b_in_2) C (A.a_out_1: c_in_1 B.b_out_1: c_in_2 A.a_out_2: c_in_3))"
  ],
Note that output names must be fully-qualified, e.g. "B.b_out_1" instead
of "b_out_1". This is due to the graph traversal not yet handling edges
defined between B and C in the example graph, only between A and B, and
between A and C.
PipelineImpl posts to the listening response_queue and/or
response_topic when the stream is destroyed.

A stream creator might pass a queue_response or topic
response when calling create_stream(),
so it can be notified as frames are processed.
If a stream exits due to an error in process_frame then these
listeners will be notified, but if the stream exits without
error then the listeners will not previously be notified.
destroy_stream() in an error condition

Currently when an error is raised, _process_stream_event() will call
destroy_stream() directly so that the stream is immediately terminated and
cleaned up.
However, _process_stream_event() releases stream.lock before calling
destroy_stream(), allowing another thread to update stream.state before
destroy_stream() can stop and clean-up the stream, meaning that
stream.state cannot be used to signal that an error condition has occurred.
…es' start_stream() method to False, making the use of create_frames() the default
Copy link
Owner

@geekscape geekscape left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your fix.
I've made this change (just the fix) and pushed to master.

Since that change broke two of the unit tests, I've made a further change, which has also been pushed to master.

I have not yet included the unit test associated with the PR#45 fix, because it sounded like there is still a problem with that test to be resolved.

raise RuntimeError("Simulated frame generator exception - this should cause unreleased lock!")

def process_frame(self, stream, **kwargs) -> Tuple[aiko.StreamEvent, dict]:
self.logger.warning(f"Processin frame {stream.frame_id}")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Processin" --> "Processing"

@rskew
Copy link
Contributor Author

rskew commented Oct 9, 2025

Thanks @geekscape
The test is a bit clunky, but it does show that the lock is released. I'll leave it to you to update the test to your preferred code style and then integrate it or not.

@jonochang jonochang force-pushed the jc/fix-stream-lock-bug branch from 7d54176 to 239f6cb Compare October 15, 2025 23:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants