Skip to content

Conversation

@rskew
Copy link
Contributor

@rskew rskew commented Jan 21, 2025

A stream creator might pass a queue_response or topic response when calling create_stream(), so it can be notified as frames are processed.
If a stream exits in error, the stream creator will be notified, but if the stream exits without error, the stream creator has no way of knowing that the stream has ended.
This change posts to the queue and/or topic when the stream is destroyed with no error condition.

Implements #38

@rskew rskew marked this pull request as draft February 4, 2025 00:22
@rskew rskew force-pushed the rs/notify-stream-creater-of-stream-stop branch from 160121f to 5d87b88 Compare February 18, 2025 01:49
@rskew rskew force-pushed the rs/notify-stream-creater-of-stream-stop branch 2 times, most recently from 1ebe042 to e4e807d Compare March 24, 2025 04:49
@rskew rskew force-pushed the rs/notify-stream-creater-of-stream-stop branch from e4e807d to e99a7b6 Compare June 30, 2025 16:43
element_name, stream, stream_event, diagnostic,
in_destroy_stream=True))
if stream.state == StreamState.ERROR:
break
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This break prevents stop_stream being called on downstream elements if an upstream stop_stream raises

break
destroy_stream_state = StreamState.ERROR
elif destroy_stream_state == StreamState.ERROR:
stream.state = StreamState.ERROR
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feed the error state back into the next stop_stream call, so that element knows the stream has stopped in error and can cleanup/finalise appropriately

stream.lock.release()
self.destroy_stream(get_stream_id(), use_thread_local=False)
stream.state = StreamState.ERROR
self.destroy_stream(get_stream_id(), use_thread_local=False, diagnostic=diagnostic)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing diagnostic here allows the stop-stream message put on the response_queueindestroy_streamto include the diagnostic message fromstart_stream`

@rskew rskew marked this pull request as ready for review July 1, 2025 03:03
PipelineImpl posts to the listening response_queue and/or
response_topic when the stream is destroyed.

A stream creator might pass a queue_response or topic
response when calling create_stream(),
so it can be notified as frames are processed.
If a stream exits due to an error in process_frame then these
listeners will be notified, but if the stream exits without
error then the listeners will not previously be notified.
@rskew rskew force-pushed the rs/notify-stream-creater-of-stream-stop branch from e99a7b6 to a64da07 Compare October 6, 2025 07:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant