Fix case where get_partial_current_state_deltas could return >100 rows#18960
Conversation
This fixes the case where every state delta in `current_state_delta_stream` has a count of 1, meaning `total` will be 100. Before this change, that would result in `clipped_stream_id = max_stream_id`, meaning we'd potentially pull out millions of rows.
reivilibre
left a comment
There was a problem hiding this comment.
nothing materially wrong with this, just some thoughts, what do you think?
Credit to @reivilibre for the idea.
Simplify the queries to just a single one.
|
Complement CI is failing due to a flaky test. Edit: And sytest appears to just be stalling... |
reivilibre
left a comment
There was a problem hiding this comment.
I mean.... it's weird if the tests don't catch this, probably a smell that we're missing a test
| # don't select toooo many. | ||
| sql = """ | ||
| SELECT stream_id, count(*) | ||
| SELECT stream_id, room_id, type, state_key, event_id, prev_event_id |
There was a problem hiding this comment.
afraid this doesn't work AFAICT, because stream_id is not unique, but we always want to process the entire state with the same stream_id (never process some of the rows but not others)
There was a problem hiding this comment.
So would this mean that if a group of state deltas with the same stream_id that was >100 items would cause this function to deadlock?
I suppose we should just return the whole group, even if it's larger than limit, in that case.
Edit: especially since the docstring states that if the list of returned state deltas is empty, then we are up to date. Potentially skipping over a lot of state deltas!
There was a problem hiding this comment.
I don't have memory of there being any deadlock here (in fact, given no concurrency primitives are involved — well except row locks? — it's hard to imagine this being on the table at all) but I think the problem was that a group with more than one row might not be covered within the LIMIT 100 batch, in which case we would advance past its stream_id but not have processed all the rows within that group.
Even a group of 2 state deltas would be enough to trigger this, if it was at the 99th position in the limit, if you see what I mean.
But you also make a good point that we can indeed have groups larger than 100 deltas, I think because of state resolution (both desirable resolutions and state resets?), essentially.
We now: 1. Group state deltas by `stream_id` and get their count 2. Count (in the DB) until we go over our limit. We then get the `stream_id` we could naively go up to, as well as the clamped `stream_id` which would keep us under our limit. The second query fetches rows up to and including the clamped `stream_id`. We also add a unit test that injects multiple state deltas with the same `stream_id`, which correctly failed when tested against the previous implementation in this PR.
…_get_partial-current_state_deltas_limit
Simplify the query and handle the case where a single state group is larger than the provided limit.
reivilibre
left a comment
There was a problem hiding this comment.
Looks correct, some small bits here and there but optional really
| # don't select toooo many. | ||
| sql = """ | ||
| SELECT stream_id, count(*) | ||
| SELECT stream_id, room_id, type, state_key, event_id, prev_event_id |
There was a problem hiding this comment.
I don't have memory of there being any deadlock here (in fact, given no concurrency primitives are involved — well except row locks? — it's hard to imagine this being on the table at all) but I think the problem was that a group with more than one row might not be covered within the LIMIT 100 batch, in which case we would advance past its stream_id but not have processed all the rows within that group.
Even a group of 2 state deltas would be enough to trigger this, if it was at the 99th position in the limit, if you see what I mean.
But you also make a good point that we can indeed have groups larger than 100 deltas, I think because of state resolution (both desirable resolutions and state resets?), essentially.
| included_rows = 0 | ||
| hit_limit = False | ||
| fetch_upto_stream_id = prev_stream_id | ||
|
|
||
| for stream_id, count in grouped_rows: | ||
| if included_rows + count <= limit: | ||
| included_rows += count | ||
| fetch_upto_stream_id = stream_id | ||
| else: | ||
| # Either we have already included some groups and adding | ||
| # this one would exceed the limit, or this is the first | ||
| # group and it alone exceeds the limit. | ||
| hit_limit = True | ||
| if included_rows == 0: | ||
| # Return the entire oversized group so that callers make | ||
| # progress (even though this may exceed `limit` rows). | ||
| fetch_upto_stream_id = stream_id | ||
| break |
There was a problem hiding this comment.
up to you, but you may or may not like this alternative statement of this logic:
| included_rows = 0 | |
| hit_limit = False | |
| fetch_upto_stream_id = prev_stream_id | |
| for stream_id, count in grouped_rows: | |
| if included_rows + count <= limit: | |
| included_rows += count | |
| fetch_upto_stream_id = stream_id | |
| else: | |
| # Either we have already included some groups and adding | |
| # this one would exceed the limit, or this is the first | |
| # group and it alone exceeds the limit. | |
| hit_limit = True | |
| if included_rows == 0: | |
| # Return the entire oversized group so that callers make | |
| # progress (even though this may exceed `limit` rows). | |
| fetch_upto_stream_id = stream_id | |
| break | |
| # Always retrieve the first group, at the bare minimum | |
| fetch_upto_stream_id, included_rows = grouped_rows[0] | |
| # Determine which other groups we can retrieve at the same time, | |
| # without blowing the budget | |
| for stream_id, count in grouped_rows[1:]: | |
| if included_rows + count > limit: | |
| break | |
| included_rows += count | |
| fetch_upto_stream_id = stream_id |
(then also remove the if block beneath, since we don't need it any longer)
(probably should put a type annotation on grouped_rows)
(I think hit_limit is now unnecessary and you can just use caught_up_with_stream = len(grouped_rows) < group_limit?)
Sorry if the indentation is wrong, why does this use a variable-width textbox? :D
There was a problem hiding this comment.
Ooo, yes, that's much cleaner. Thank you!
| def test_get_partial_current_state_deltas_does_not_deadlock(self) -> None: | ||
| """ | ||
| Tests that `get_partial_current_state_deltas` does not repeatedly return | ||
| zero entries due to the passed `limit` parameter being less than the | ||
| size of the next group of state deltas from the given `prev_stream_id`. |
There was a problem hiding this comment.
ah, I see, deadlock is not (or at least I don't think it is) the same as returning zero entries.
To me deadlock is when multiple 'parties' hold locks in such a way that all of them become dead whilst waiting on others to release a lock. Maybe easier illustrated with pseudocode:
process A {
lock(X);
// <---
lock(Y);
}
process B {
lock(Y);
// <---
lock(X);
}
If both processes make it to the commented arrow, neither can obtain the second lock, so both wait indefinitely and 'deadlock'.
(There is also 'livelock' where multiple 'parties' are spinning away, being active/alive whilst somehow conflicting with each other in a way that means they all fail to advance. I don't remember seeing this in real life yet, so I'm going to say it's more niche.)
There was a problem hiding this comment.
Good point on the terminology. Perhaps "an infinite loop" is most appropriate (aadd2d8)?
| # Call the function under test with a limit of 4. Without the limit, we would return | ||
| # 5 state deltas: | ||
| # | ||
| # C T T T T | ||
| # 1 2 3 4 5 | ||
| # | ||
| # C = m.room.create | ||
| # T = m.room.topic | ||
| # | ||
| # With the limit, we should return only the create event, as returning 4 | ||
| # state deltas would result in splitting a group: | ||
| # | ||
| # C T T T T | ||
| # 1 2 3 4 X |
There was a problem hiding this comment.
Copypaste that is stale for this example, I think :)
There was a problem hiding this comment.
Actually, this is relevant! But the description is a little off (they are m.room.name events, not topic events). Plus the commented out assertion below is definitely not supposed to be there. Looks like these tests were indeed still WIP.
Fixed up in 0712b31
Looks like this final test was still a bit wip. This commit finishes it up.
anoadragon453
left a comment
There was a problem hiding this comment.
Thanks for the review (and code suggestions) @reivilibre. Super helpful!
| # Call the function under test with a limit of 4. Without the limit, we would return | ||
| # 5 state deltas: | ||
| # | ||
| # C T T T T | ||
| # 1 2 3 4 5 | ||
| # | ||
| # C = m.room.create | ||
| # T = m.room.topic | ||
| # | ||
| # With the limit, we should return only the create event, as returning 4 | ||
| # state deltas would result in splitting a group: | ||
| # | ||
| # C T T T T | ||
| # 1 2 3 4 X |
There was a problem hiding this comment.
Actually, this is relevant! But the description is a little off (they are m.room.name events, not topic events). Plus the commented out assertion below is definitely not supposed to be there. Looks like these tests were indeed still WIP.
Fixed up in 0712b31
| def test_get_partial_current_state_deltas_does_not_deadlock(self) -> None: | ||
| """ | ||
| Tests that `get_partial_current_state_deltas` does not repeatedly return | ||
| zero entries due to the passed `limit` parameter being less than the | ||
| size of the next group of state deltas from the given `prev_stream_id`. |
There was a problem hiding this comment.
Good point on the terminology. Perhaps "an infinite loop" is most appropriate (aadd2d8)?
| included_rows = 0 | ||
| hit_limit = False | ||
| fetch_upto_stream_id = prev_stream_id | ||
|
|
||
| for stream_id, count in grouped_rows: | ||
| if included_rows + count <= limit: | ||
| included_rows += count | ||
| fetch_upto_stream_id = stream_id | ||
| else: | ||
| # Either we have already included some groups and adding | ||
| # this one would exceed the limit, or this is the first | ||
| # group and it alone exceeds the limit. | ||
| hit_limit = True | ||
| if included_rows == 0: | ||
| # Return the entire oversized group so that callers make | ||
| # progress (even though this may exceed `limit` rows). | ||
| fetch_upto_stream_id = stream_id | ||
| break |
There was a problem hiding this comment.
Ooo, yes, that's much cleaner. Thank you!
The team has decided to deprecate and stop publishing python wheels for MacOS. Synapse docker images will continue to work on MacOS, as will building Synapse from source (though note this requires a Rust compiler). Admins using the unstable [MSC2666](matrix-org/matrix-spec-proposals#2666) endpoint (`/_matrix/client/unstable/uk.half-shot.msc2666/user/mutual_rooms`), please check [the relevant section in the upgrade notes](https://github.com/element-hq/synapse/blob/develop/docs/upgrade.md#upgrading-to-v11440) as this release contains changes that disable that endpoint by default. No significant changes since 1.144.0rc1. Admins using the unstable [MSC2666](matrix-org/matrix-spec-proposals#2666) endpoint (`/_matrix/client/unstable/uk.half-shot.msc2666/user/mutual_rooms`), please check [the relevant section in the upgrade notes](https://github.com/element-hq/synapse/blob/develop/docs/upgrade.md#upgrading-to-v11440) as this release contains changes that disable that endpoint by default. - Add experimentatal implememntation of [MSC4380](matrix-org/matrix-spec-proposals#4380) (invite blocking). ([\#19203](#19203)) - Allow restarting delayed event timeouts on workers. ([\#19207](#19207)) - Fix a bug in the database function for fetching state deltas that could result in unnecessarily long query times. ([\#18960](#18960)) - Fix v12 rooms when running with `use_frozen_dicts: True`. ([\#19235](#19235)) - Fix bug where invalid `canonical_alias` content would return 500 instead of 400. ([\#19240](#19240)) - Fix bug where `Duration` was logged incorrectly. ([\#19267](#19267)) - Document in the `--config-path` help how multiple files are merged - by merging them shallowly. ([\#19243](#19243)) - Stop building release wheels for MacOS. ([\#19225](#19225)) - Improve event filtering for Simplified Sliding Sync. ([\#17782](#17782)) - Export `SYNAPSE_SUPPORTED_COMPLEMENT_TEST_PACKAGES` environment variable from `scripts-dev/complement.sh`. ([\#19208](#19208)) - Refactor `scripts-dev/complement.sh` logic to avoid `exit` to facilitate being able to source it from other scripts (composable). ([\#19209](#19209)) - Expire sliding sync connections that are too old or have too much pending data. ([\#19211](#19211)) - Require an experimental feature flag to be enabled in order for the unstable [MSC2666](matrix-org/matrix-spec-proposals#2666) endpoint (`/_matrix/client/unstable/uk.half-shot.msc2666/user/mutual_rooms`) to be available. ([\#19219](#19219)) - Prevent changelog check CI running on @dependabot's PRs even when a human has modified the branch. ([\#19220](#19220)) - Auto-fix trailing spaces in multi-line strings and comments when running the lint script. ([\#19221](#19221)) - Move towards using a dedicated `Duration` type. ([\#19223](#19223), [\#19229](#19229)) - Improve robustness of the SQL schema linting in CI. ([\#19224](#19224)) - Add log to determine whether clients are using `/messages` as expected. ([\#19226](#19226)) - Simplify README and add ESS Getting started section. ([\#19228](#19228), [\#19259](#19259)) - Add a unit test for ensuring associated refresh tokens are erased when a device is deleted. ([\#19230](#19230)) - Prompt user to consider adding future deprecations to the changelog in release script. ([\#19239](#19239)) - Fix check of the Rust compiled code being outdated when using source checkout and `.egg-info`. ([\#19251](#19251)) - Stop building macos wheels in CI pipeline. ([\#19263](#19263)) * Bump Swatinem/rust-cache from 2.8.1 to 2.8.2. ([\#19244](#19244)) * Bump actions/checkout from 5.0.0 to 6.0.0. ([\#19213](#19213)) * Bump actions/setup-go from 6.0.0 to 6.1.0. ([\#19214](#19214)) * Bump actions/setup-python from 6.0.0 to 6.1.0. ([\#19245](#19245)) * Bump attrs from 25.3.0 to 25.4.0. ([\#19215](#19215)) * Bump docker/metadata-action from 5.9.0 to 5.10.0. ([\#19246](#19246)) * Bump http from 1.3.1 to 1.4.0. ([\#19249](#19249)) * Bump pydantic from 2.12.4 to 2.12.5. ([\#19250](#19250)) * Bump pyopenssl from 25.1.0 to 25.3.0. ([\#19248](#19248)) * Bump rpds-py from 0.28.0 to 0.29.0. ([\#19216](#19216)) * Bump rpds-py from 0.29.0 to 0.30.0. ([\#19247](#19247)) * Bump sentry-sdk from 2.44.0 to 2.46.0. ([\#19218](#19218)) * Bump types-bleach from 6.2.0.20250809 to 6.3.0.20251115. ([\#19217](#19217)) * Bump types-jsonschema from 4.25.1.20250822 to 4.25.1.20251009. ([\#19252](#19252))
get_partial_current_state_deltasis supposed to return max 100 rows. We relied on this assumption in #18926, forgoing batching of state deltas. It turns out thatget_partial_current_state_deltascan potentially return millions of rows, and we saw this in production on element.io.This PR fixes the case where every state delta in
current_state_delta_streamhas a count of 1, meaningtotalwill be 100. Before this change, that would result inclipped_stream_id = max_stream_id, meaning we'd potentially pull out millions of rows.I haven't written a regression test for this and I'm not sure how to easily seedI was able to write a unit test without writing SQL directly.current_state_delta_streamwith the correct data. I suppose we could create a room and change the topic >100 times? Alternatively we could write directly to the table, but such a test would need to be updated if the schema changes.Pull Request Checklist
EventStoretoEventWorkerStore.".code blocks.