From 001a280973d56c09487d4d9c6cb09b89448661a9 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 10 Aug 2022 16:45:02 +0100 Subject: [PATCH 1/6] Add SyncStateHas* check functions --- internal/client/client.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/internal/client/client.go b/internal/client/client.go index cae7cac6..6fe17efb 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -682,6 +682,34 @@ func SyncTimelineHasEventID(roomID string, eventID string) SyncCheckOpt { }) } +// Check that the state for `roomID` has an event which passes the check function. +func SyncStateHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt { + return func(clientUserID string, topLevelSyncJSON gjson.Result) error { + err := loopArray( + topLevelSyncJSON, "rooms.join."+GjsonEscape(roomID)+".state.events", check, + ) + if err == nil { + return nil + } + return fmt.Errorf("SyncStateHas(%s): %s", roomID, err) + } +} + +// Check that the state for `roomID` has an event which matches the event ID. +func SyncStateHasEventID(roomID string, eventID string) SyncCheckOpt { + return SyncStateHas(roomID, func(ev gjson.Result) bool { + return ev.Get("event_id").Str == eventID + }) +} + +// Check that the state for `roomID` has an event which matches the state key. +func SyncStateHasStateKey(roomID string, eventType string, stateKey string) SyncCheckOpt { + return SyncStateHas(roomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == eventType && + ev.Get("state_key").Str == stateKey + }) +} + // Checks that `userID` gets invited to `roomID`. // // This checks different parts of the /sync response depending on the client making the request. From fd8f216436e3fee4f55c4af1c28bb8c9e7524971 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 10 Aug 2022 17:42:53 +0100 Subject: [PATCH 2/6] Add getSyncToken helper function --- tests/federation_room_join_partial_state_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 4cef4a46..0a85bc9c 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -65,6 +65,17 @@ func TestPartialStateJoin(t *testing.T) { return serverRoom } + // getSyncToken gets the latest sync token + getSyncToken := func(t *testing.T, alice *client.CSAPI) string { + _, syncToken := alice.MustSync(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + TimeoutMillis: "0", + }, + ) + return syncToken + } + // test that a regular /sync request made during a partial-state /send_join // request blocks until the state is correctly synced. t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) { From dc6349b05f64ab164087fb6c4b54d62883b908c4 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 10 Aug 2022 16:51:11 +0100 Subject: [PATCH 3/6] Add lazy-loading `/sync` tests for faster room joins Test that lazy-loading `/sync`s do not block when receiving events from remote senders. Also test that lazy-loading `/sync`s return the appropriate memberships for event senders. --- ...federation_room_join_partial_state_test.go | 149 ++++++++++++++++++ 1 file changed, 149 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 0a85bc9c..2c8412a2 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -337,6 +337,155 @@ func TestPartialStateJoin(t *testing.T) { testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC) }) + // initial sync must return memberships of event senders even when they aren't present in the + // partial room state. + t.Run("Lazy-loading initial sync includes remote memberships during partial state join", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + // the HS will make an /event_auth request for the event + federation.HandleEventAuthRequests()(server) + + // derek sends a message into the room. + event := psjResult.CreateMessageEvent(t, "derek", nil) + t.Logf("Derek created event with ID %s", event.EventID()) + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) + + // wait for the homeserver to persist the event. + awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, event.EventID()) + + // do a lazy-loading initial sync. + syncRes, _ := alice.MustSync(t, + client.SyncReq{ + Since: "", + Filter: buildLazyLoadingSyncFilter(nil), + }, + ) + + err := client.SyncStateHasStateKey(serverRoom.RoomID, "m.room.member", event.Sender())(alice.UserID, syncRes) + if err != nil { + t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event.Sender(), err) + } + }) + + // gappy sync must return memberships of event senders even when they aren't present in the + // partial room state. + t.Run("Lazy-loading gappy sync includes remote memberships during partial state join", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + syncToken := getSyncToken(t, alice) + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + syncToken = alice.MustSyncUntil(t, + client.SyncReq{ + Since: syncToken, + Filter: buildLazyLoadingSyncFilter(nil), + }, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + + // the HS will make an /event_auth request for the event + federation.HandleEventAuthRequests()(server) + + // derek sends two messages into the room. + event1 := psjResult.CreateMessageEvent(t, "derek", nil) + event2 := psjResult.CreateMessageEvent(t, "derek", nil) + t.Logf("Derek created event 1 with ID %s", event1.EventID()) + t.Logf("Derek created event 2 with ID %s", event2.EventID()) + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event1.JSON(), event2.JSON()}, nil) + + // wait for the homeserver to persist the event. + awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, event2.EventID()) + + // do a gappy sync which only picks up the second message. + syncRes, _ := alice.MustSync(t, + client.SyncReq{ + Since: syncToken, + Filter: buildLazyLoadingSyncFilter(map[string]interface{}{ + "limit": 1, + }), + }, + ) + + err := client.SyncTimelineHasEventID(serverRoom.RoomID, event1.EventID())(alice.UserID, syncRes) + if err == nil { + t.Errorf("gappy /sync returned the first event unexpectedly") + } + + err = client.SyncTimelineHasEventID(serverRoom.RoomID, event2.EventID())(alice.UserID, syncRes) + if err != nil { + t.Errorf("Did not find event 2 in lazy-loading /sync response: %s", err) + } + + err = client.SyncStateHasStateKey(serverRoom.RoomID, "m.room.member", event2.Sender())(alice.UserID, syncRes) + if err != nil { + t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event2.Sender(), err) + } + }) + + // incremental sync must return memberships of event senders even when they aren't present in + // the partial room state. + t.Run("Lazy-loading incremental sync includes remote memberships during partial state join", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + syncToken := getSyncToken(t, alice) + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + syncToken = alice.MustSyncUntil(t, + client.SyncReq{ + Since: syncToken, + Filter: buildLazyLoadingSyncFilter(nil), + }, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + + // the HS will make an /event_auth request for the event + federation.HandleEventAuthRequests()(server) + + // derek sends a message into the room. + event := psjResult.CreateMessageEvent(t, "derek", nil) + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) + t.Logf("Derek created event with ID %s", event.EventID()) + + // wait for the homeserver to persist the event. + awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, event.EventID()) + + // do an incremental sync. + syncRes, _ := alice.MustSync(t, + client.SyncReq{ + Since: syncToken, + Filter: buildLazyLoadingSyncFilter(nil), + }, + ) + + err := client.SyncStateHasStateKey(serverRoom.RoomID, "m.room.member", event.Sender())(alice.UserID, syncRes) + if err != nil { + t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event.Sender(), err) + } + }) + // a request to (client-side) /members?at= should block until the (federation) /state request completes // TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path t.Run("MembersRequestBlocksDuringPartialStateJoin", func(t *testing.T) { From fd1916ff0737eb34b3141e145a8ca9ec2b7578bc Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 17 Aug 2022 15:33:29 +0100 Subject: [PATCH 4/6] fixup: Remove SyncStateHasStateKey and usage of SyncTimelineHasEventID --- internal/client/client.go | 20 ++++--------------- ...federation_room_join_partial_state_test.go | 20 ++++++++++++++----- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 6fe17efb..1e118afb 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -682,7 +682,10 @@ func SyncTimelineHasEventID(roomID string, eventID string) SyncCheckOpt { }) } -// Check that the state for `roomID` has an event which passes the check function. +// Check that the state section for `roomID` has an event which passes the check function. +// Note that the state section of a sync response only contains the state at the start of the +// timeline and will not necessarily contain the entire state of the room, depending on the type of +// sync. func SyncStateHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt { return func(clientUserID string, topLevelSyncJSON gjson.Result) error { err := loopArray( @@ -695,21 +698,6 @@ func SyncStateHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt { } } -// Check that the state for `roomID` has an event which matches the event ID. -func SyncStateHasEventID(roomID string, eventID string) SyncCheckOpt { - return SyncStateHas(roomID, func(ev gjson.Result) bool { - return ev.Get("event_id").Str == eventID - }) -} - -// Check that the state for `roomID` has an event which matches the state key. -func SyncStateHasStateKey(roomID string, eventType string, stateKey string) SyncCheckOpt { - return SyncStateHas(roomID, func(ev gjson.Result) bool { - return ev.Get("type").Str == eventType && - ev.Get("state_key").Str == stateKey - }) -} - // Checks that `userID` gets invited to `roomID`. // // This checks different parts of the /sync response depending on the client making the request. diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 2c8412a2..7f86beda 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -370,7 +370,9 @@ func TestPartialStateJoin(t *testing.T) { }, ) - err := client.SyncStateHasStateKey(serverRoom.RoomID, "m.room.member", event.Sender())(alice.UserID, syncRes) + err := client.SyncStateHas(serverRoom.RoomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == event.Sender() + })(alice.UserID, syncRes) if err != nil { t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event.Sender(), err) } @@ -422,17 +424,23 @@ func TestPartialStateJoin(t *testing.T) { }, ) - err := client.SyncTimelineHasEventID(serverRoom.RoomID, event1.EventID())(alice.UserID, syncRes) + err := client.SyncTimelineHas(serverRoom.RoomID, func(ev gjson.Result) bool { + return ev.Get("event_id").Str == event1.EventID() + })(alice.UserID, syncRes) if err == nil { t.Errorf("gappy /sync returned the first event unexpectedly") } - err = client.SyncTimelineHasEventID(serverRoom.RoomID, event2.EventID())(alice.UserID, syncRes) + err = client.SyncTimelineHas(serverRoom.RoomID, func(ev gjson.Result) bool { + return ev.Get("event_id").Str == event2.EventID() + })(alice.UserID, syncRes) if err != nil { t.Errorf("Did not find event 2 in lazy-loading /sync response: %s", err) } - err = client.SyncStateHasStateKey(serverRoom.RoomID, "m.room.member", event2.Sender())(alice.UserID, syncRes) + err = client.SyncStateHas(serverRoom.RoomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == event2.Sender() + })(alice.UserID, syncRes) if err != nil { t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event2.Sender(), err) } @@ -480,7 +488,9 @@ func TestPartialStateJoin(t *testing.T) { }, ) - err := client.SyncStateHasStateKey(serverRoom.RoomID, "m.room.member", event.Sender())(alice.UserID, syncRes) + err := client.SyncStateHas(serverRoom.RoomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == event.Sender() + })(alice.UserID, syncRes) if err != nil { t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event.Sender(), err) } From 6a5f481ab2032f45d6b7b68ea7180f8c2fae2408 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 17 Aug 2022 15:54:40 +0100 Subject: [PATCH 5/6] Test for timeline.limited flag in gappy sync --- tests/federation_room_join_partial_state_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 8adb62c9..b4e16310 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -424,6 +424,10 @@ func TestPartialStateJoin(t *testing.T) { }, ) + if !syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID) + ".timeline.limited").Bool() { + t.Errorf("/sync response was not gappy") + } + err := client.SyncTimelineHas(serverRoom.RoomID, func(ev gjson.Result) bool { return ev.Get("event_id").Str == event1.EventID() })(alice.UserID, syncRes) From 98c358bd45c23d7cd52317947f1c1f3eb6a7e601 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 2 Sep 2022 13:35:14 +0100 Subject: [PATCH 6/6] Update comment to match #440 --- internal/client/client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index f5ef362b..6800abae 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -683,9 +683,9 @@ func SyncTimelineHasEventID(roomID string, eventID string) SyncCheckOpt { } // Check that the state section for `roomID` has an event which passes the check function. -// Note that the state section of a sync response only contains the state at the start of the -// timeline and will not necessarily contain the entire state of the room, depending on the type of -// sync. +// Note that the state section of a sync response only contains the change in state up to the start +// of the timeline and will not contain the entire state of the room for incremental or +// `lazy_load_members` syncs. func SyncStateHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt { return func(clientUserID string, topLevelSyncJSON gjson.Result) error { err := loopArray(