diff --git a/internal/client/client.go b/internal/client/client.go index cacab9dc..6800abae 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -682,6 +682,22 @@ func SyncTimelineHasEventID(roomID string, eventID string) SyncCheckOpt { }) } +// Check that the state section for `roomID` has an event which passes the check function. +// Note that the state section of a sync response only contains the change in state up to the start +// of the timeline and will not contain the entire state of the room for incremental or +// `lazy_load_members` syncs. +func SyncStateHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt { + return func(clientUserID string, topLevelSyncJSON gjson.Result) error { + err := loopArray( + topLevelSyncJSON, "rooms.join."+GjsonEscape(roomID)+".state.events", check, + ) + if err == nil { + return nil + } + return fmt.Errorf("SyncStateHas(%s): %s", roomID, err) + } +} + func SyncEphemeralHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt { return func(clientUserID string, topLevelSyncJSON gjson.Result) error { err := loopArray( diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 0bbf57c3..b4e16310 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -65,6 +65,17 @@ func TestPartialStateJoin(t *testing.T) { return serverRoom } + // getSyncToken gets the latest sync token + getSyncToken := func(t *testing.T, alice *client.CSAPI) string { + _, syncToken := alice.MustSync(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + TimeoutMillis: "0", + }, + ) + return syncToken + } + // test that a regular /sync request made during a partial-state /send_join // request blocks until the state is correctly synced. t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) { @@ -326,6 +337,169 @@ func TestPartialStateJoin(t *testing.T) { testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC) }) + // initial sync must return memberships of event senders even when they aren't present in the + // partial room state. + t.Run("Lazy-loading initial sync includes remote memberships during partial state join", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + // the HS will make an /event_auth request for the event + federation.HandleEventAuthRequests()(server) + + // derek sends a message into the room. + event := psjResult.CreateMessageEvent(t, "derek", nil) + t.Logf("Derek created event with ID %s", event.EventID()) + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) + + // wait for the homeserver to persist the event. + awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, event.EventID()) + + // do a lazy-loading initial sync. + syncRes, _ := alice.MustSync(t, + client.SyncReq{ + Since: "", + Filter: buildLazyLoadingSyncFilter(nil), + }, + ) + + err := client.SyncStateHas(serverRoom.RoomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == event.Sender() + })(alice.UserID, syncRes) + if err != nil { + t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event.Sender(), err) + } + }) + + // gappy sync must return memberships of event senders even when they aren't present in the + // partial room state. + t.Run("Lazy-loading gappy sync includes remote memberships during partial state join", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + syncToken := getSyncToken(t, alice) + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + syncToken = alice.MustSyncUntil(t, + client.SyncReq{ + Since: syncToken, + Filter: buildLazyLoadingSyncFilter(nil), + }, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + + // the HS will make an /event_auth request for the event + federation.HandleEventAuthRequests()(server) + + // derek sends two messages into the room. + event1 := psjResult.CreateMessageEvent(t, "derek", nil) + event2 := psjResult.CreateMessageEvent(t, "derek", nil) + t.Logf("Derek created event 1 with ID %s", event1.EventID()) + t.Logf("Derek created event 2 with ID %s", event2.EventID()) + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event1.JSON(), event2.JSON()}, nil) + + // wait for the homeserver to persist the event. + awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, event2.EventID()) + + // do a gappy sync which only picks up the second message. + syncRes, _ := alice.MustSync(t, + client.SyncReq{ + Since: syncToken, + Filter: buildLazyLoadingSyncFilter(map[string]interface{}{ + "limit": 1, + }), + }, + ) + + if !syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID) + ".timeline.limited").Bool() { + t.Errorf("/sync response was not gappy") + } + + err := client.SyncTimelineHas(serverRoom.RoomID, func(ev gjson.Result) bool { + return ev.Get("event_id").Str == event1.EventID() + })(alice.UserID, syncRes) + if err == nil { + t.Errorf("gappy /sync returned the first event unexpectedly") + } + + err = client.SyncTimelineHas(serverRoom.RoomID, func(ev gjson.Result) bool { + return ev.Get("event_id").Str == event2.EventID() + })(alice.UserID, syncRes) + if err != nil { + t.Errorf("Did not find event 2 in lazy-loading /sync response: %s", err) + } + + err = client.SyncStateHas(serverRoom.RoomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == event2.Sender() + })(alice.UserID, syncRes) + if err != nil { + t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event2.Sender(), err) + } + }) + + // incremental sync must return memberships of event senders even when they aren't present in + // the partial room state. + t.Run("Lazy-loading incremental sync includes remote memberships during partial state join", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + syncToken := getSyncToken(t, alice) + + server := createTestServer(t, deployment) + cancel := server.Listen() + defer cancel() + serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t)) + psjResult := beginPartialStateJoin(t, server, serverRoom, alice) + defer psjResult.Destroy() + + syncToken = alice.MustSyncUntil(t, + client.SyncReq{ + Since: syncToken, + Filter: buildLazyLoadingSyncFilter(nil), + }, + client.SyncJoinedTo(alice.UserID, serverRoom.RoomID), + ) + + // the HS will make an /event_auth request for the event + federation.HandleEventAuthRequests()(server) + + // derek sends a message into the room. + event := psjResult.CreateMessageEvent(t, "derek", nil) + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) + t.Logf("Derek created event with ID %s", event.EventID()) + + // wait for the homeserver to persist the event. + awaitEventArrival(t, time.Second, alice, serverRoom.RoomID, event.EventID()) + + // do an incremental sync. + syncRes, _ := alice.MustSync(t, + client.SyncReq{ + Since: syncToken, + Filter: buildLazyLoadingSyncFilter(nil), + }, + ) + + err := client.SyncStateHas(serverRoom.RoomID, func(ev gjson.Result) bool { + return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == event.Sender() + })(alice.UserID, syncRes) + if err != nil { + t.Errorf("Did not find %s's m.room.member event in lazy-loading /sync response: %s", event.Sender(), err) + } + }) + // a request to (client-side) /members?at= should block until the (federation) /state request completes // TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path t.Run("MembersRequestBlocksDuringPartialStateJoin", func(t *testing.T) {