Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
321 changes: 221 additions & 100 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httputil"
Expand All @@ -22,12 +23,50 @@ import (
// See functions starting with `With...` in this package for more info.
type RequestOpt func(req *http.Request)

// SyncCheckOpt is a functional option for use with MustSyncUntil which should return <nil> if
// the response satisfies the check, else return a human friendly error.
// The result object is the entire /sync response from this request.
type SyncCheckOpt func(clientUserID string, topLevelSyncJSON gjson.Result) error

// SyncReq contains all the /sync request configuration options. The empty struct `SyncReq{}` is valid
// which will do a full /sync due to lack of a since token.
type SyncReq struct {
// A point in time to continue a sync from. This should be the next_batch token returned by an
// earlier call to this endpoint.
Since string
// The ID of a filter created using the filter API or a filter JSON object encoded as a string.
// The server will detect whether it is an ID or a JSON object by whether the first character is
// a "{" open brace. Passing the JSON inline is best suited to one off requests. Creating a
// filter using the filter API is recommended for clients that reuse the same filter multiple
// times, for example in long poll requests.
Filter string
// Controls whether to include the full state for all rooms the user is a member of.
// If this is set to true, then all state events will be returned, even if since is non-empty.
// The timeline will still be limited by the since parameter. In this case, the timeout parameter
// will be ignored and the query will return immediately, possibly with an empty timeline.
// If false, and since is non-empty, only state which has changed since the point indicated by
// since will be returned.
// By default, this is false.
FullState bool
// Controls whether the client is automatically marked as online by polling this API. If this
// parameter is omitted then the client is automatically marked as online when it uses this API.
// Otherwise if the parameter is set to “offline” then the client is not marked as being online
// when it uses this API. When set to “unavailable”, the client is marked as being idle.
// One of: [offline online unavailable].
SetPresence string
// The maximum time to wait, in milliseconds, before returning this request. If no events
// (or other data) become available before this time elapses, the server will return a response
// with empty fields.
// By default, this is 1000 for Complement testing.
TimeoutMillis string // string for easier conversion to query params
}

type CSAPI struct {
UserID string
AccessToken string
BaseURL string
Client *http.Client
// how long are we willing to wait for SyncUntil.... calls
// how long are we willing to wait for MustSyncUntil.... calls
SyncUntilTimeout time.Duration
// True to enable verbose logging
Debug bool
Expand Down Expand Up @@ -122,124 +161,124 @@ func (c *CSAPI) SendEventSynced(t *testing.T, roomID string, e b.Event) string {
body := ParseJSON(t, res)
eventID := GetJSONFieldStr(t, body, "event_id")
t.Logf("SendEventSynced waiting for event ID %s", eventID)
c.SyncUntilTimelineHas(t, roomID, func(r gjson.Result) bool {
c.MustSyncUntil(t, SyncReq{}, SyncTimelineHas(roomID, func(r gjson.Result) bool {
return r.Get("event_id").Str == eventID
})
}))
return eventID
}

// SyncUntilTimelineHas is a wrapper around `SyncUntil`.
// It blocks and continually calls `/sync` until
// - we have joined the given room
// - we see an event in the room for which the `check` function returns True
// If the `check` function fails the test, the failing event will be automatically logged.
// Will time out after CSAPI.SyncUntilTimeout.
// Perform a single /sync request with the given request options. To sync until something happens,
// see `MustSyncUntil`.
//
// Returns the `next_batch` token from the last /sync response. This can be passed as
// `since` to sync from this point forward only.
func (c *CSAPI) SyncUntilTimelineHas(t *testing.T, roomID string, check func(gjson.Result) bool) string {
// Fails the test if the /sync request does not return 200 OK.
// Returns the top-level parsed /sync response JSON as well as the next_batch token from the response.
func (c *CSAPI) MustSync(t *testing.T, syncReq SyncReq) (gjson.Result, string) {
t.Helper()
return c.SyncUntil(t, "", "", "rooms.join."+GjsonEscape(roomID)+".timeline.events", check)
query := url.Values{
"timeout": []string{"1000"},
}
// configure the HTTP request based on SyncReq
if syncReq.TimeoutMillis != "" {
query["timeout"] = []string{syncReq.TimeoutMillis}
}
if syncReq.Since != "" {
query["since"] = []string{syncReq.Since}
}
if syncReq.Filter != "" {
query["filter"] = []string{syncReq.Filter}
}
if syncReq.FullState {
query["full_state"] = []string{"true"}
}
if syncReq.SetPresence != "" {
query["set_presence"] = []string{syncReq.SetPresence}
}
res := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "sync"}, WithQueries(query))
body := ParseJSON(t, res)
result := gjson.ParseBytes(body)
nextBatch := GetJSONFieldStr(t, body, "next_batch")
return result, nextBatch
}

// SyncUntilGlobalAccountDataHas is a wrapper around `SyncUntil`.
// It blocks and continually calls `/sync` until
// - we an event in the global account data for which the `check` function returns True
// If the `check` function fails the test, the failing event will be automatically logged.
// Will time out after CSAPI.SyncUntilTimeout.
// MustSyncUntil blocks and continually calls /sync (advancing the since token) until all the
// check functions return no error. Returns the final/latest since token.
//
// Returns the `next_batch` token from the last /sync response. This can be passed as
// `since` to sync from this point forward only.
func (c *CSAPI) SyncUntilGlobalAccountDataHas(t *testing.T, check func(gjson.Result) bool) string {
t.Helper()
return c.SyncUntil(t, "", "", "account_data.events", check)
}

// SyncUntilInvitedTo is a wrapper around SyncUntil.
// It blocks and continually calls `/sync` until we've been invited to the given room.
// Will time out after CSAPI.SyncUntilTimeout.
// Initial /sync example: (no since token)
// bob.InviteRoom(t, roomID, alice.UserID)
// alice.JoinRoom(t, roomID, nil)
// alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
//
// Returns the `next_batch` token from the last /sync response. This can be passed as
// `since` to sync from this point forward only.
func (c *CSAPI) SyncUntilInvitedTo(t *testing.T, roomID string) string {
t.Helper()
check := func(event gjson.Result) bool {
return event.Get("type").Str == "m.room.member" &&
event.Get("content.membership").Str == "invite" &&
event.Get("state_key").Str == c.UserID
}
return c.SyncUntil(t, "", "", "rooms.invite."+GjsonEscape(roomID)+".invite_state.events", check)
}

// SyncUntilJoined is a wrapper around SyncUntil.
// It blocks and continually calls `/sync` until we've joined the given room.
// Will time out after CSAPI.SyncUntilTimeout.
func (c *CSAPI) SyncUntilJoined(t *testing.T, roomID string) {
t.Helper()
c.SyncUntilTimelineHas(t, roomID, func(event gjson.Result) bool {
return event.Get("type").Str == "m.room.member" &&
event.Get("content.membership").Str == "join" &&
event.Get("state_key").Str == c.UserID
})
}

// SyncUntil blocks and continually calls /sync until
// - the response contains a particular `key`, and
// - its corresponding value is an array
// - some element in that array makes the `check` function return true.
// If the `check` function fails the test, the failing event will be automatically logged.
// Will time out after CSAPI.SyncUntilTimeout.
// Incremental /sync example: (test controls since token)
// since := alice.MustSyncUntil(t, client.SyncReq{TimeoutMillis: "0"}) // get a since token
// bob.InviteRoom(t, roomID, alice.UserID)
// since = alice.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncInvitedTo(alice.UserID, roomID))
// alice.JoinRoom(t, roomID, nil)
// alice.MustSyncUntil(t, client.SyncReq{Since: since}, client.SyncJoinedTo(alice.UserID, roomID))
//
// Returns the `next_batch` token from the last /sync response. This can be passed as
// `since` to sync from this point forward only.
func (c *CSAPI) SyncUntil(t *testing.T, since, filter, key string, check func(gjson.Result) bool) string {
// Checking multiple parts of /sync:
// alice.MustSyncUntil(
// t, client.SyncReq{},
// client.SyncJoinedTo(alice.UserID, roomID),
// client.SyncJoinedTo(alice.UserID, roomID2),
// client.SyncJoinedTo(alice.UserID, roomID3),
// )
//
// Check functions are unordered and independent. Once a check function returns true it is removed
// from the list of checks and won't be called again.
//
// In the unlikely event that you want all the checkers to pass *explicitly* in a single /sync
// response (e.g to assert some form of atomic update which updates multiple parts of the /sync
// response at once) then make your own checker function which does this.
//
// In the unlikely event that you need ordering on your checks, call MustSyncUntil multiple times
// with a single checker, and reuse the returned since token, as in the "Incremental sync" example.
//
// Will time out after CSAPI.SyncUntilTimeout. Returns the latest since token used.
func (c *CSAPI) MustSyncUntil(t *testing.T, syncReq SyncReq, checks ...SyncCheckOpt) string {
t.Helper()
start := time.Now()
checkCounter := 0
// Print failing events in a defer() so we handle t.Fatalf in the same way as t.Errorf
var wasFailed = t.Failed()
var lastEvent *gjson.Result
timedOut := false
defer func() {
if !wasFailed && t.Failed() {
raw := ""
if lastEvent != nil {
raw = lastEvent.Raw
}
if !timedOut {
t.Logf("SyncUntil: failing event %s", raw)
}
numResponsesReturned := 0
checkers := make([]struct {
check SyncCheckOpt
errs []string
}, len(checks))
for i := range checks {
c := checkers[i]
c.check = checks[i]
checkers[i] = c
}
printErrors := func() string {
err := "Checkers:\n"
for _, c := range checkers {
err += strings.Join(c.errs, "\n")
err += ", \n"
}
}()
return err
}
for {
if time.Since(start) > c.SyncUntilTimeout {
timedOut = true
t.Fatalf("SyncUntil: timed out. Called check function %d times", checkCounter)
t.Fatalf("%s MustSyncUntil: timed out after %v. Seen %d /sync responses. %s", c.UserID, time.Since(start), numResponsesReturned, printErrors())
}
query := url.Values{
"timeout": []string{"1000"},
}
if since != "" {
query["since"] = []string{since}
}
if filter != "" {
query["filter"] = []string{filter}
}
res := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "sync"}, WithQueries(query))
body := ParseJSON(t, res)
since = GetJSONFieldStr(t, body, "next_batch")
keyRes := gjson.GetBytes(body, key)
if keyRes.IsArray() {
events := keyRes.Array()
for i, ev := range events {
lastEvent = &events[i]
if check(ev) {
return since
}
wasFailed = t.Failed()
checkCounter++
response, nextBatch := c.MustSync(t, syncReq)
syncReq.Since = nextBatch
numResponsesReturned += 1

for i := 0; i < len(checkers); i++ {
err := checkers[i].check(c.UserID, response)
if err == nil {
// check passed, removed from checkers
checkers = append(checkers[:i], checkers[i+1:]...)
Comment thread
kegsay marked this conversation as resolved.
i--
} else {
c := checkers[i]
c.errs = append(c.errs, fmt.Sprintf("[t=%v] Response #%d: %s", time.Since(start), numResponsesReturned, err))
checkers[i] = c
}
}
if len(checkers) == 0 {
// every checker has passed!
return syncReq.Since
}
}
}

Expand Down Expand Up @@ -509,3 +548,85 @@ func GjsonEscape(in string) string {
in = strings.ReplaceAll(in, "*", `\*`)
return in
}

// Check that the timeline for `roomID` has an event which passes the check function.
func SyncTimelineHas(roomID string, check func(gjson.Result) bool) SyncCheckOpt {
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
err := loopArray(
topLevelSyncJSON, "rooms.join."+GjsonEscape(roomID)+".timeline.events", check,
)
if err == nil {
return nil
}
return fmt.Errorf("SyncTimelineHas(%s): %s", roomID, err)
}
}

// Checks that `userID` gets invited to `roomID`.
//
// This checks different parts of the /sync response depending on the client making the request.
// If the client is also the person being invited to the room then the 'invite' block will be inspected.
// If the client is different to the person being invited then the 'join' block will be inspected.
func SyncInvitedTo(userID, roomID string) SyncCheckOpt {
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
// two forms which depend on what the client user is:
// - passively viewing an invite for a room you're joined to (timeline events)
// - actively being invited to a room.
if clientUserID == userID {
// active
err := loopArray(
topLevelSyncJSON, "rooms.invite."+GjsonEscape(roomID)+".invite_state.events",
func(ev gjson.Result) bool {
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "invite"
},
)
if err != nil {
return fmt.Errorf("SyncInvitedTo(%s): %s", roomID, err)
}
return nil
}
// passive
return SyncTimelineHas(roomID, func(ev gjson.Result) bool {
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "invite"
})(clientUserID, topLevelSyncJSON)
}
}

// Check that `userID` gets joined to `roomID` by inspecting the join timeline for a membership event.
func SyncJoinedTo(userID, roomID string) SyncCheckOpt {
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
// awkward wrapping to get the error message correct at the start :/
err := SyncTimelineHas(roomID, func(ev gjson.Result) bool {
return ev.Get("type").Str == "m.room.member" && ev.Get("state_key").Str == userID && ev.Get("content.membership").Str == "join"
})(clientUserID, topLevelSyncJSON)
if err == nil {
return nil
}
return fmt.Errorf("SyncJoinedTo(%s,%s): %s", userID, roomID, err)
}
}

// Calls the `check` function for each global account data event, and returns with success if the
// `check` function returns true for at least one event.
func SyncGlobalAccountDataHas(check func(gjson.Result) bool) SyncCheckOpt {
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
return loopArray(topLevelSyncJSON, "account_data.events", check)
}
}

func loopArray(object gjson.Result, key string, check func(gjson.Result) bool) error {
array := object.Get(key)
if !array.Exists() {
return fmt.Errorf("Key %s does not exist", key)
}
if !array.IsArray() {
return fmt.Errorf("Key %s exists but it isn't an array", key)
}
goArray := array.Array()
for _, ev := range goArray {
if check(ev) {
return nil
}
}
return fmt.Errorf("check function did not pass while iterating over %d elements: %v", len(goArray), array.Raw)
}
7 changes: 1 addition & 6 deletions tests/csapi/apidoc_room_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,5 @@ func TestRoomCreateWithInvites(t *testing.T) {
"invite": []string{bob.UserID},
})

bob.SyncUntil(t, "", "", "rooms.invite."+client.GjsonEscape(roomID)+".invite_state.events", func(event gjson.Result) bool {
return event.Get("type").Str == "m.room.member" &&
event.Get("content.membership").Str == "invite" &&
event.Get("state_key").Str == bob.UserID &&
event.Get("sender").Str == alice.UserID
})
bob.MustSyncUntil(t, client.SyncReq{}, client.SyncInvitedTo(bob.UserID, roomID))
}
Loading