From 9726ba56a0cac3a2afd53ab9df9153d7eac13868 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 25 May 2022 20:33:16 +0100 Subject: [PATCH 01/11] Set the depth of join events returned from `/make_join` Signed-off-by: Sean Quah --- internal/federation/handle.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/federation/handle.go b/internal/federation/handle.go index 4133e79e..fd4d0dbd 100644 --- a/internal/federation/handle.go +++ b/internal/federation/handle.go @@ -45,6 +45,7 @@ func MakeJoinRequestsHandler(s *Server, w http.ResponseWriter, req *http.Request Type: "m.room.member", StateKey: &userID, PrevEvents: []string{room.Timeline[len(room.Timeline)-1].EventID()}, + Depth: room.Timeline[len(room.Timeline)-1].Depth() + 1, } err := builder.SetContent(map[string]interface{}{"membership": gomatrixserverlib.Join}) if err != nil { From 6729788709dbd75966745401f31bda1a76b46ae3 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 25 May 2022 20:33:56 +0100 Subject: [PATCH 02/11] Include other joined homeservers in `servers_in_room` `/send_join` response Signed-off-by: Sean Quah --- internal/federation/handle.go | 6 +++--- internal/federation/server_room.go | 27 +++++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/internal/federation/handle.go b/internal/federation/handle.go index fd4d0dbd..457a44a3 100644 --- a/internal/federation/handle.go +++ b/internal/federation/handle.go @@ -129,12 +129,12 @@ func SendJoinRequestsHandler(s *Server, w http.ResponseWriter, req *http.Request authEvents := room.AuthChainForEvents(stateEvents) + // get servers in room *before* the join event + serversInRoom := room.ServersInRoom() + // insert the join event into the room state room.AddEvent(event) - // servers in room: just us. TODO(faster_joins): this may not be correct - serversInRoom := []string{s.serverName} - // return state and auth chain b, err := json.Marshal(gomatrixserverlib.RespSendJoin{ Origin: gomatrixserverlib.ServerName(s.serverName), diff --git a/internal/federation/server_room.go b/internal/federation/server_room.go index ae906278..cff95a2c 100644 --- a/internal/federation/server_room.go +++ b/internal/federation/server_room.go @@ -134,6 +134,33 @@ func (r *ServerRoom) MustHaveMembershipForUser(t *testing.T, userID, wantMembers } } +// ServersInRoom gets all servers currently joined to the room +func (r *ServerRoom) ServersInRoom() (servers []string) { + serverSet := make(map[string]struct{}) + + for _, ev := range r.State { + if ev.Type() != "m.room.member" { + continue + } + membership, err := ev.Membership() + if err != nil || membership != "join" { + continue + } + _, server, err := gomatrixserverlib.SplitID('@', *ev.StateKey()) + if err != nil { + continue + } + + serverSet[string(server)] = struct{}{} + } + + for server := range serverSet { + servers = append(servers, server) + } + + return +} + func initialPowerLevelsContent(roomCreator string) (c gomatrixserverlib.PowerLevelContent) { c.Defaults() c.Events = map[string]int64{ From cadb31d0e7c57e0af57c74589395a247ebf968d5 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 25 May 2022 20:35:12 +0100 Subject: [PATCH 03/11] Add `Server.Room(roomID)` function Signed-off-by: Sean Quah --- internal/federation/server.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/federation/server.go b/internal/federation/server.go index b9759fe1..6e06a1a3 100644 --- a/internal/federation/server.go +++ b/internal/federation/server.go @@ -119,6 +119,11 @@ func NewServer(t *testing.T, deployment *docker.Deployment, opts ...func(*Server return srv } +// Room returns the given room +func (s *Server) Room(roomID string) *ServerRoom { + return s.rooms[roomID] +} + // Return the server name of this federation server. Only valid AFTER calling Listen() - doing so // before will produce an error. // From 8a928a2beb2937669b5716376fe14d5222e4d431 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 25 May 2022 20:35:34 +0100 Subject: [PATCH 04/11] Update `AuthChainForEvents` to return all auth events recursively Signed-off-by: Sean Quah --- internal/federation/server_room.go | 32 +++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/internal/federation/server_room.go b/internal/federation/server_room.go index cff95a2c..779d85be 100644 --- a/internal/federation/server_room.go +++ b/internal/federation/server_room.go @@ -97,24 +97,38 @@ func (r *ServerRoom) AuthChain() (chain []*gomatrixserverlib.Event) { return r.AuthChainForEvents(r.AllCurrentState()) } -// AuthChainForEvents returns all auth events for all events in the given state TODO: recursively +// AuthChainForEvents returns all auth events for all events in the given state func (r *ServerRoom) AuthChainForEvents(events []*gomatrixserverlib.Event) (chain []*gomatrixserverlib.Event) { chainMap := make(map[string]bool) - // get all the auth event IDs - for _, ev := range events { + + // build a map of all events in the room + // Timeline and State contain different sets of events, so check them both. + eventsByID := map[string]*gomatrixserverlib.Event{} + for _, ev := range r.Timeline { + eventsByID[ev.EventID()] = ev + } + for _, ev := range r.State { + eventsByID[ev.EventID()] = ev + } + + // a queue of events whose auth events are to be included in the auth chain + queue := []*gomatrixserverlib.Event{} + queue = append(queue, events...) + + // get all the auth events recursively + // we extend the "queue" as we go along + for i := 0; i < len(queue); i++ { + ev := queue[i] for _, evID := range ev.AuthEventIDs() { if chainMap[evID] { continue } chainMap[evID] = true + chain = append(chain, eventsByID[evID]) + queue = append(queue, eventsByID[evID]) } } - // find them in the timeline - for _, tev := range r.Timeline { - if chainMap[tev.EventID()] { - chain = append(chain, tev) - } - } + return } From e64fcc0b915648dcbd4059fa895bfbfcc5ea4725 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 25 May 2022 20:36:17 +0100 Subject: [PATCH 05/11] Test re-syncing state from other servers listed in `/send_join` reply Test that a homeserver can re-sync state from other homeservers in a partial-state room if the server it joined off is not working. Signed-off-by: Sean Quah --- ...federation_room_join_partial_state_test.go | 98 +++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 45ec6a74..9bd98d15 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -182,6 +182,104 @@ func TestPartialStateJoin(t *testing.T) { }) } }) + + // test that a partial-state join can fall back to other homeservers when re-syncing + // partial state. + t.Run("PartialStateJoinSyncsUsingOtherHomeservers", func(t *testing.T) { + // set up 3 homeservers: hs1, hs2 and complement + deployment := Deploy(t, b.BlueprintFederationTwoLocalOneRemote) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + charlie := deployment.Client(t, "hs2", "@charlie:hs2") + + // create a public room + roomID := alice.CreateRoom(t, map[string]interface{}{ + "preset": "public_chat", + }) + + // create the complement homeserver + server := federation.NewServer(t, deployment, + federation.HandleKeyRequests(), + federation.HandlePartialStateMakeSendJoinRequests(), + federation.HandleEventRequests(), + federation.HandleTransactionRequests( + func(e *gomatrixserverlib.Event) { + t.Fatalf("Received unexpected PDU: %s", string(e.JSON())) + }, + // hs1 may send us presence when alice syncs + func(e gomatrixserverlib.EDU) {}, + ), + ) + cancelListener := server.Listen() + defer cancelListener() + + // join complement to the public room + server.MustJoinRoom(t, deployment, "hs1", roomID, server.UserID("bob")) + + // we expect a /state_ids request from hs2 after it joins the room + // we will respond to the request with garbage + requestReceivedWaiter := NewWaiter() + sendResponseWaiter := NewWaiter() + server.Mux().Handle( + fmt.Sprintf("/_matrix/federation/v1/state_ids/%s", roomID), + http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + queryParams := req.URL.Query() + t.Logf("Incoming state_ids request for event %s in room %s", queryParams["event_id"], roomID) + requestReceivedWaiter.Finish() + sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state request") + t.Logf("Replying to /state_ids request with invalid response") + + w.WriteHeader(200) + + if _, err := w.Write([]byte("{}")); err != nil { + t.Errorf("Error writing to request: %v", err) + } + }), + ).Methods("GET") + + // join charlie on hs2 to the room, via the complement homeserver + charlie.JoinRoom(t, roomID, []string{server.ServerName()}) + + // and let hs1 know that charlie has joined, + // otherwise hs1 will refuse /state_ids requests + room := server.Room(roomID) + member_event := room.CurrentState("m.room.member", charlie.UserID).JSON() + server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{member_event}, nil) + alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(charlie.UserID, roomID)) + + // wait until hs2 starts syncing state + requestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") + + syncResponseChan := make(chan gjson.Result) + defer close(syncResponseChan) + go func() { + response, _ := charlie.MustSync(t, client.SyncReq{}) + syncResponseChan <- response + }() + + // the client-side requests should still be waiting + select { + case <-syncResponseChan: + t.Fatalf("hs2 sync completed before state resync complete") + default: + } + + // reply to hs2 with a bogus /state_ids response + sendResponseWaiter.Finish() + + // charlie's /sync request should now complete, with the new room + var syncRes gjson.Result + select { + case <-time.After(1 * time.Second): + t.Fatalf("hs2 /sync request request did not complete") + case syncRes = <-syncResponseChan: + } + + roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(roomID)) + if !roomRes.Exists() { + t.Fatalf("hs2 /sync completed without join to new room\n") + } + }) } // buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq From f005c62aef96e7ebc5db4c48d212095154e7bae8 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 13:21:57 +0100 Subject: [PATCH 06/11] Revert "Add `Server.Room(roomID)` function" This reverts commit cadb31d0e7c57e0af57c74589395a247ebf968d5. --- internal/federation/server.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/federation/server.go b/internal/federation/server.go index 6e06a1a3..b9759fe1 100644 --- a/internal/federation/server.go +++ b/internal/federation/server.go @@ -119,11 +119,6 @@ func NewServer(t *testing.T, deployment *docker.Deployment, opts ...func(*Server return srv } -// Room returns the given room -func (s *Server) Room(roomID string) *ServerRoom { - return s.rooms[roomID] -} - // Return the server name of this federation server. Only valid AFTER calling Listen() - doing so // before will produce an error. // From e66dd5abdfacb207b8b47806c5df298bfab41d26 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 13:27:50 +0100 Subject: [PATCH 07/11] Use the `ServerRoom` returned by `MustJoinRoom` --- tests/federation_room_join_partial_state_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 9bd98d15..1e70e368 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -214,7 +214,7 @@ func TestPartialStateJoin(t *testing.T) { defer cancelListener() // join complement to the public room - server.MustJoinRoom(t, deployment, "hs1", roomID, server.UserID("bob")) + room := server.MustJoinRoom(t, deployment, "hs1", roomID, server.UserID("bob")) // we expect a /state_ids request from hs2 after it joins the room // we will respond to the request with garbage @@ -242,7 +242,6 @@ func TestPartialStateJoin(t *testing.T) { // and let hs1 know that charlie has joined, // otherwise hs1 will refuse /state_ids requests - room := server.Room(roomID) member_event := room.CurrentState("m.room.member", charlie.UserID).JSON() server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{member_event}, nil) alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(charlie.UserID, roomID)) From 2a2e70b36e532ae9b7e846b07d78c6d62335aefb Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 13:28:23 +0100 Subject: [PATCH 08/11] Rename `Waiter`s, following the naming in `partialStateJoinResult` --- tests/federation_room_join_partial_state_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 1e70e368..b3089fdb 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -218,15 +218,15 @@ func TestPartialStateJoin(t *testing.T) { // we expect a /state_ids request from hs2 after it joins the room // we will respond to the request with garbage - requestReceivedWaiter := NewWaiter() - sendResponseWaiter := NewWaiter() + fedStateIdsRequestReceivedWaiter := NewWaiter() + fedStateIdsSendResponseWaiter := NewWaiter() server.Mux().Handle( fmt.Sprintf("/_matrix/federation/v1/state_ids/%s", roomID), http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { queryParams := req.URL.Query() t.Logf("Incoming state_ids request for event %s in room %s", queryParams["event_id"], roomID) - requestReceivedWaiter.Finish() - sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state request") + fedStateIdsRequestReceivedWaiter.Finish() + fedStateIdsSendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state request") t.Logf("Replying to /state_ids request with invalid response") w.WriteHeader(200) @@ -247,7 +247,7 @@ func TestPartialStateJoin(t *testing.T) { alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(charlie.UserID, roomID)) // wait until hs2 starts syncing state - requestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") + fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") syncResponseChan := make(chan gjson.Result) defer close(syncResponseChan) @@ -264,7 +264,7 @@ func TestPartialStateJoin(t *testing.T) { } // reply to hs2 with a bogus /state_ids response - sendResponseWaiter.Finish() + fedStateIdsSendResponseWaiter.Finish() // charlie's /sync request should now complete, with the new room var syncRes gjson.Result From 2c7a938e8072910761c8b21ad45bb976a6bcd456 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 13:40:25 +0100 Subject: [PATCH 09/11] Revert "Set the depth of join events returned from `/make_join`" This reverts commit 9726ba56a0cac3a2afd53ab9df9153d7eac13868. --- internal/federation/handle.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/federation/handle.go b/internal/federation/handle.go index 457a44a3..e00affb1 100644 --- a/internal/federation/handle.go +++ b/internal/federation/handle.go @@ -45,7 +45,6 @@ func MakeJoinRequestsHandler(s *Server, w http.ResponseWriter, req *http.Request Type: "m.room.member", StateKey: &userID, PrevEvents: []string{room.Timeline[len(room.Timeline)-1].EventID()}, - Depth: room.Timeline[len(room.Timeline)-1].Depth() + 1, } err := builder.SetContent(map[string]interface{}{"membership": gomatrixserverlib.Join}) if err != nil { From 7e104f4beaba572b7b1269e05207fa772cc11c4e Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Tue, 31 May 2022 12:46:41 +0100 Subject: [PATCH 10/11] Update tests/federation_room_join_partial_state_test.go Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- tests/federation_room_join_partial_state_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index b3089fdb..89ccfc2a 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -207,7 +207,7 @@ func TestPartialStateJoin(t *testing.T) { t.Fatalf("Received unexpected PDU: %s", string(e.JSON())) }, // hs1 may send us presence when alice syncs - func(e gomatrixserverlib.EDU) {}, + nil, ), ) cancelListener := server.Listen() From 2fdc71eae72c243615df3136008fc00bce4bf3cd Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 31 May 2022 13:04:52 +0100 Subject: [PATCH 11/11] Address PR feedback Rename bob to david and drop the timeout message. --- tests/federation_room_join_partial_state_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 89ccfc2a..3a4d6909 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -214,7 +214,7 @@ func TestPartialStateJoin(t *testing.T) { defer cancelListener() // join complement to the public room - room := server.MustJoinRoom(t, deployment, "hs1", roomID, server.UserID("bob")) + room := server.MustJoinRoom(t, deployment, "hs1", roomID, server.UserID("david")) // we expect a /state_ids request from hs2 after it joins the room // we will respond to the request with garbage @@ -226,7 +226,7 @@ func TestPartialStateJoin(t *testing.T) { queryParams := req.URL.Query() t.Logf("Incoming state_ids request for event %s in room %s", queryParams["event_id"], roomID) fedStateIdsRequestReceivedWaiter.Finish() - fedStateIdsSendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state request") + fedStateIdsSendResponseWaiter.Wait(t, 60*time.Second) t.Logf("Replying to /state_ids request with invalid response") w.WriteHeader(200)