Skip to content
160 changes: 135 additions & 25 deletions tests/federation_room_join_partial_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,111 @@ func TestPartialStateJoin(t *testing.T) {
t.Errorf("Did not find derek's m.room.member event in gappy /sync response: %s", err)
}
})

// regression test for https://github.com/matrix-org/synapse/issues/13001
//
// There was an edge case where, if we initially receive lots of events as outliers,
// and they then get de-outliered as partial state events, we would get stuck in
// an infinite loop of de-partial-stating.
//
// Outline of the subtest:
//  1. alice begins a partial-state join.
//  2. charlie creates one message event (lateEvent), 100 state events (the outliers) and
//     two further message events on the test server.
//  3. only the newest message event is sent to hs1; dedicated handlers answer the resulting
//     get_missing_events, state_ids and state requests.
//  4. lateEvent and the 100 state events are then sent over in regular transactions.
//  5. the federation /state response for the join is released, and alice must be able to /sync.
t.Run("Resync completes even when events arrive before their prev_events", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

psjResult := beginPartialStateJoin(t, deployment, alice)
defer psjResult.Destroy()

// Alice has now joined the room, and the server is syncing the state in the background.

// (Waiting for an event to become visible to alice is done below with awaitEventArrival,
// which polls /event until we get a 200.)

// here's the first event which we *ought* to un-partial-state, but won't
lateEvent := psjResult.CreateMessageEvent(t, "charlie", nil)

// next, we want to create 100 outliers. So, charlie creates 100 state events, and
// then persuades the system under test to create a backwards extremity using those events as
// part of the room state.
//
// NOTE(review): outlierEventIDs is filled in below but is not read again within this subtest.
outliers := make([]*gomatrixserverlib.Event, 100)
outlierEventIDs := make([]string, len(outliers))
for i := range outliers {
body := fmt.Sprintf("outlier event %d", i)
outliers[i] = psjResult.Server.MustCreateEvent(t, psjResult.ServerRoom, b.Event{
Type: "outlier_state",
Sender: psjResult.Server.UserID("charlie"),
StateKey: b.Ptr(fmt.Sprintf("state_%d", i)),
Content: map[string]interface{}{"body": body},
})
psjResult.ServerRoom.AddEvent(outliers[i])
outlierEventIDs[i] = outliers[i].EventID()
}
t.Logf("Created outliers: %s ... %s", outliers[0].EventID(), outliers[len(outliers)-1].EventID())

// a couple of regular timeline events to pull in the outliers... Note that these are persisted with *full*
// state rather than becoming partial state events.
timelineEvent1 := psjResult.CreateMessageEvent(t, "charlie", nil)
timelineEvent2 := psjResult.CreateMessageEvent(t, "charlie", nil)

// dedicated get_missing_event handler for timelineEvent2.
// we grudgingly return a single event.
handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom,
[]string{timelineEvent2.EventID()}, []*gomatrixserverlib.Event{timelineEvent1},
)

// dedicated state_ids and state handlers for timelineEvent1's prev event (ie, the last outlier event)
handleStateIdsRequests(t, psjResult.Server, psjResult.ServerRoom, outliers[len(outliers)-1].EventID(),
psjResult.ServerRoom.AllCurrentState(), nil, nil)
handleStateRequests(t, psjResult.Server, psjResult.ServerRoom, outliers[len(outliers)-1].EventID(),
psjResult.ServerRoom.AllCurrentState(), nil, nil)

// now, send over the most recent event, which will make the server get_missing_events
// (we will send timelineEvent1), and then request state (we will send all the outliers).
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{timelineEvent2.JSON()}, nil)

t.Logf("Charlie sent timeline event 2")
// wait for it to become visible, which implies that all the outliers have been pulled in.
awaitEventArrival(t, time.Second, alice, psjResult.ServerRoom.RoomID, timelineEvent2.EventID())

// now we send over all the other events in the gap: first lateEvent, then the ex-outliers.
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lateEvent.JSON()}, nil)
t.Logf("Charlie sent late event")

// send the ex-outliers in batches; i advances by the size of each batch that was sent.
for i := 0; i < len(outliers); {
var transactionEvents []json.RawMessage
// a transaction can contain max 50 events
for j := i; j < i+50 && j < len(outliers); j++ {
transactionEvents = append(transactionEvents, outliers[j].JSON())
}
psjResult.Server.MustSendTransaction(t, deployment, "hs1", transactionEvents, nil)
t.Logf("Charlie sent %d ex-outliers", len(transactionEvents))
i += len(transactionEvents)
}

// wait for the last outlier to arrive
awaitEventArrival(t, 10*time.Second, alice, psjResult.ServerRoom.RoomID, outliers[len(outliers)-1].EventID())

// release the federation /state response
psjResult.FinishStateRequest()

// alice should be able to sync the room. We can't use SyncJoinedTo here because that looks for the
// membership event in the response (which we won't see, because of all the outlier events).
// instead let's just check for the presence of the room's timeline in the response.
alice.MustSyncUntil(t,
client.SyncReq{},
func(clientUserID string, topLevelSyncJSON gjson.Result) error {
// only the existence and array-ness of the timeline is checked, not its contents.
key := "rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID) + ".timeline.events"
array := topLevelSyncJSON.Get(key)
if !array.Exists() {
return fmt.Errorf("Key %s does not exist", key)
}
if !array.IsArray() {
return fmt.Errorf("Key %s exists but it isn't an array", key)
}
return nil
},
)
t.Logf("Alice successfully synced")
})
}

// test reception of an event over federation during a resync
Expand All @@ -576,31 +681,7 @@ func testReceiveEventDuringPartialStateJoin(
// send the event to the homeserver
psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil)

/* TODO: check that a lazy-loading sync can see the event. Currently this doesn't work, because /sync blocks.
* https://github.com/matrix-org/synapse/issues/13146
alice.MustSyncUntil(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(nil),
},
client.SyncTimelineHasEventID(psjResult.ServerRoom.RoomID, event.EventID()),
)
*/

// still, Alice should be able to see the event with an /event request. We might have to try it a few times.
alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "event", event.EventID()},
client.WithRetryUntil(time.Second, func(res *http.Response) bool {
if res.StatusCode == 200 {
return true
}
eventResBody := client.ParseJSON(t, res)
if res.StatusCode == 404 && gjson.GetBytes(eventResBody, "errcode").String() == "M_NOT_FOUND" {
return false
}
t.Fatalf("GET /event failed with %d: %s", res.StatusCode, string(eventResBody))
return false
}),
)
t.Logf("Successfully fetched received event %s", event.EventID())
awaitEventArrival(t, time.Second, alice, psjResult.ServerRoom.RoomID, event.EventID())

// fire off a /state_ids request for the last event.
// it must either:
Expand Down Expand Up @@ -665,6 +746,35 @@ func testReceiveEventDuringPartialStateJoin(
must.CheckOffAll(t, gotState, expectedState)
}

// awaitEventArrival waits for alice to be able to see a given event.
//
// It issues GET /_matrix/client/r0/rooms/{roomID}/event/{eventID} as alice, with
// client.WithRetryUntil driving the retries using the supplied timeout:
//   - a 200 response ends the wait successfully;
//   - a 404 response with errcode M_NOT_FOUND means "not there yet" and requests a retry;
//   - any other response fails the test immediately.
func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, roomID string, eventID string) {
	// Mark this function as a test helper so that the log line emitted below is
	// attributed to the calling test rather than to this function.
	t.Helper()

	/* TODO: check that a lazy-loading sync can see the event. Currently this doesn't work, because /sync blocks.
	 * https://github.com/matrix-org/synapse/issues/13146
	alice.MustSyncUntil(t,
		client.SyncReq{
			Filter: buildLazyLoadingSyncFilter(nil),
		},
		client.SyncTimelineHasEventID(roomID, eventID),
	)
	*/

	// still, Alice should be able to see the event with an /event request. We might have to try it a few times.
	alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "event", eventID},
		client.WithRetryUntil(timeout, func(res *http.Response) bool {
			if res.StatusCode == http.StatusOK {
				return true
			}
			// The body is only parsed for non-200 responses, to inspect the Matrix errcode.
			eventResBody := client.ParseJSON(t, res)
			if res.StatusCode == http.StatusNotFound && gjson.GetBytes(eventResBody, "errcode").String() == "M_NOT_FOUND" {
				return false
			}
			t.Fatalf("GET /event failed with %d: %s", res.StatusCode, string(eventResBody))
			return false
		}),
	)
	t.Logf("Alice successfully received event %s", eventID)
}

// buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable for the 'Filter' field of a client.SyncReq
func buildLazyLoadingSyncFilter(timelineOptions map[string]interface{}) string {
timelineFilter := map[string]interface{}{
Expand Down