Retry Experimental Federation Speedup#9908
Conversation
richvdh
left a comment
There was a problem hiding this comment.
looks good. a couple of suggestions.
| events_by_room.setdefault(event.room_id, []).append(event) | ||
|
|
||
| await make_deferred_yieldable( | ||
| # concurrently process room events, to allow parallelization through db queries |
There was a problem hiding this comment.
| # concurrently process room events, to allow parallelization through db queries | |
| # concurrently process rooms, to allow parallelization through db queries |
| nested_events_and_dests: List[ | ||
| List[Tuple[EventBase, Collection[str]]] | ||
| ] = await make_deferred_yieldable( | ||
| defer.gatherResults( | ||
| [ | ||
| run_in_background(handle_room_events, evs) | ||
| run_in_background( | ||
| get_federatable_events_and_destinations, evs | ||
| ) | ||
| for evs in events_by_room.values() | ||
| ], | ||
| consumeErrors=True, | ||
| ) | ||
| ) |
There was a problem hiding this comment.
there's a concurrently_execute in synapse.util.async_helpers which takes care of the logcontext boilerplate whilst also applying a limit to the number of concurrent operations (which would probably be a good thing here). I think you may as well use it while you're changing things.
| events_by_room: Dict[str, List[EventBase]] = {} | ||
| for event in events: | ||
| events_by_room.setdefault(event.room_id, []).append(event) | ||
|
|
||
| await make_deferred_yieldable( | ||
| # concurrently process room events, to allow parallelization through db queries | ||
| nested_events_and_dests: List[ | ||
| List[Tuple[EventBase, Collection[str]]] | ||
| ] = await make_deferred_yieldable( | ||
| defer.gatherResults( | ||
| [ | ||
| run_in_background(handle_room_events, evs) | ||
| run_in_background( | ||
| get_federatable_events_and_destinations, evs | ||
| ) | ||
| for evs in events_by_room.values() | ||
| ], | ||
| consumeErrors=True, | ||
| ) | ||
| ) | ||
|
|
||
| # flatten list | ||
| events_and_dests = list( | ||
| itertools.chain.from_iterable(nested_events_and_dests) | ||
| ) |
There was a problem hiding this comment.
I think it would be worth moving all this into a function with input events and output events_and_dests, so as to limit the scope of the intermediate variables events_by_room and nested_events_and_dests.
|
Hi @ShadowJonathan, are you able to continue working on this? 🙂 |
|
I need to rebase and think about what these changes exactly entail, but i'll put it on draft until i got more time for it (i dont expect much in the upcoming months) I still want to work on it, though, just not right now |
|
Putting this in closed state after further discussion, ill reopen it once i have more time |
Fixes #9863
Re-introduces #9702, but adds parallelisation per #9863 (comment)
Pull Request Checklist
EventStoretoEventWorkerStore.".code blocks.Signed-off-by: Jonathan de Jong <jonathan@automatia.nl>