Skip to content

Conversation

@potiuk
Copy link
Member

@potiuk potiuk commented Jul 31, 2023

In RedisPubSubSensor subscription has been done in constructor, which was pretty wrong - for example it means that when scheduler parses the sensor, it involves subscribing to the messages and commmunication with redis DB.

This PR moves subscription to "poke()" method, which is executed on worker instead.


^ Add meaningful description above

Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@potiuk
Copy link
Member Author

potiuk commented Jul 31, 2023

revealed after #32978

Copy link
Contributor

@ferruzzi ferruzzi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks reasonable, assuming that failing CI is just a bit of flake.

@potiuk potiuk force-pushed the move-redis-subscribe-to-poke-in-redis-sensor branch from 29fbbb5 to e1bd4d8 Compare July 31, 2023 21:58
@potiuk
Copy link
Member Author

potiuk commented Jul 31, 2023

Actually the integration test needed to be fixed too @ferruzzi -> the problem was that the way it was constructed, the test actually depended on first subscribing in init(), then publishing and then poke(). I had to update it a bit and make sure that we do not re-subscribe if we already run poke() in the same active sensor task. Now it should be fine.

@ferruzzi
Copy link
Contributor

Sounds good, I'll take another look.

@ferruzzi
Copy link
Contributor

Interesting. It's a shame subscribe() didn't return a value you could have used instead of creating an new field, but oh well.

re-approved; CI is green

In RedisPubSubSensor subscription has been done in constructor,
which was pretty wrong - for example it means that when scheduler
parses the sensor, it involves subscribing to the messages and
commmunication with redis DB.

This PR moves subscription to "poke()" method, which is executed
on worker instead.
@potiuk potiuk force-pushed the move-redis-subscribe-to-poke-in-redis-sensor branch from e1bd4d8 to 1f2b72b Compare July 31, 2023 22:51
@ferruzzi
Copy link
Contributor

Not that my approval means much at this point, but Approved? Again?

@potiuk potiuk merged commit 17a3dd4 into apache:main Aug 1, 2023
@potiuk potiuk deleted the move-redis-subscribe-to-poke-in-redis-sensor branch August 1, 2023 00:06
potiuk added a commit to potiuk/airflow that referenced this pull request Aug 5, 2023
The fix in apache#32984 moved redis hook to be initialized in a
cached property but "subscription" call has been moved to the
poke - which would cause multiple subscribe calls in regular mode
of the sensor (i.e. not poke-reschedule mode).

This one fixes it by subscribing only once - when the cached
property gets initialized.

Fixes: apache#33138
potiuk added a commit that referenced this pull request Aug 5, 2023
The fix in #32984 moved redis hook to be initialized in a
cached property but "subscription" call has been moved to the
poke - which would cause multiple subscribe calls in regular mode
of the sensor (i.e. not poke-reschedule mode).

This one fixes it by subscribing only once - when the cached
property gets initialized.

Fixes: #33138
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants