From fb7307401a183649300de37fbf569ef1216973a5 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 24 May 2022 18:55:19 +0100 Subject: [PATCH 1/6] Split out function to wait until a homeserver deployment is ready to serve requests Signed-off-by: Sean Quah --- internal/docker/deployer.go | 121 +++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 57 deletions(-) diff --git a/internal/docker/deployer.go b/internal/docker/deployer.go index deee3d59..defe5935 100644 --- a/internal/docker/deployer.go +++ b/internal/docker/deployer.go @@ -308,61 +308,6 @@ func deployImage( ) } - var lastErr error - - // Inspect health status of container to check it is up - stopTime := time.Now().Add(cfg.SpawnHSTimeout) - iterCount := 0 - if inspect.State.Health != nil { - // If the container has a healthcheck, wait for it first - for { - iterCount += 1 - if time.Now().After(stopTime) { - lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr) - break - } - inspect, err = docker.ContainerInspect(ctx, containerID) - if err != nil { - lastErr = fmt.Errorf("inspect container %s => error: %s", containerID, err) - time.Sleep(50 * time.Millisecond) - continue - } - if inspect.State.Health.Status != "healthy" { - lastErr = fmt.Errorf("inspect container %s => health: %s", containerID, inspect.State.Health.Status) - time.Sleep(50 * time.Millisecond) - continue - } - lastErr = nil - break - - } - } - - // Having optionally waited for container to self-report healthy - // hit /versions to check it is actually responding - versionsURL := fmt.Sprintf("%s/_matrix/client/versions", baseURL) - - for { - iterCount += 1 - if time.Now().After(stopTime) { - lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr) - break - } - res, err := http.Get(versionsURL) - if err != nil { - lastErr = fmt.Errorf("GET %s => error: %s", versionsURL, err) - time.Sleep(50 * time.Millisecond) - continue - } - if res.StatusCode != 200 { - lastErr = 
fmt.Errorf("GET %s => HTTP %s", versionsURL, res.Status) - time.Sleep(50 * time.Millisecond) - continue - } - lastErr = nil - break - } - d := &HomeserverDeployment{ BaseURL: baseURL, FedBaseURL: fedBaseURL, @@ -371,8 +316,11 @@ func deployImage( ApplicationServices: asIDToRegistrationFromLabels(inspect.Config.Labels), DeviceIDs: deviceIDsFromLabels(inspect.Config.Labels), } - if lastErr != nil { - return d, fmt.Errorf("%s: failed to check server is up. %w", contextStr, lastErr) + + stopTime := time.Now().Add(cfg.SpawnHSTimeout) + iterCount, err := waitForContainer(ctx, docker, d, stopTime) + if err != nil { + return d, fmt.Errorf("%s: failed to check server is up. %w", contextStr, err) } else { if cfg.DebugLoggingEnabled { log.Printf("%s: Server is responding after %d iterations", contextStr, iterCount) @@ -407,6 +355,65 @@ func copyToContainer(docker *client.Client, containerID, path string, data []byt return nil } +// Waits until a homeserver deployment is ready to serve requests. +func waitForContainer(ctx context.Context, docker *client.Client, hsDep *HomeserverDeployment, stopTime time.Time) (iterCount int, err error) { + var lastErr error = nil + + iterCount = 0 + + // If the container has a healthcheck, wait for it first + for { + iterCount += 1 + if time.Now().After(stopTime) { + lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr) + break + } + inspect, err := docker.ContainerInspect(ctx, hsDep.ContainerID) + if err != nil { + lastErr = fmt.Errorf("inspect container %s => error: %s", hsDep.ContainerID, err) + time.Sleep(50 * time.Millisecond) + continue + } + if inspect.State.Health != nil && + inspect.State.Health.Status != "healthy" { + lastErr = fmt.Errorf("inspect container %s => health: %s", hsDep.ContainerID, inspect.State.Health.Status) + time.Sleep(50 * time.Millisecond) + continue + } + + // The container is healthy or has no health check. 
+ lastErr = nil + break + } + + // Having optionally waited for container to self-report healthy + // hit /versions to check it is actually responding + versionsURL := fmt.Sprintf("%s/_matrix/client/versions", hsDep.BaseURL) + + for { + iterCount += 1 + if time.Now().After(stopTime) { + lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr) + break + } + res, err := http.Get(versionsURL) + if err != nil { + lastErr = fmt.Errorf("GET %s => error: %s", versionsURL, err) + time.Sleep(50 * time.Millisecond) + continue + } + if res.StatusCode != 200 { + lastErr = fmt.Errorf("GET %s => HTTP %s", versionsURL, res.Status) + time.Sleep(50 * time.Millisecond) + continue + } + lastErr = nil + break + } + + return iterCount, lastErr +} + // RoundTripper is a round tripper that maps https://hs1 to the federation port of the container // e.g https://localhost:35352 type RoundTripper struct { From 6eee854cc1a7a4564f1ce8ec61b370798dd22209 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 24 May 2022 18:56:05 +0100 Subject: [PATCH 2/6] Explicitly specify host ports for homeserver deployments, otherwise they will change on restart Signed-off-by: Sean Quah --- internal/docker/deployer.go | 46 +++++++++++++++++++++++++++++++++++-- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/internal/docker/deployer.go b/internal/docker/deployer.go index defe5935..8563adf8 100644 --- a/internal/docker/deployer.go +++ b/internal/docker/deployer.go @@ -20,9 +20,11 @@ import ( "crypto/tls" "fmt" "log" + "net" "net/http" "net/url" "runtime" + "strconv" "sync" "time" @@ -197,6 +199,11 @@ func deployImage( "SERVER_NAME=" + hsName, } + port1, port2, err := allocateHostPorts() + if err != nil { + return nil, err + } + body, err := docker.ContainerCreate(ctx, &container.Config{ Image: imageID, Env: env, @@ -212,12 +219,14 @@ func deployImage( PortBindings: nat.PortMap{ nat.Port("8008/tcp"): []nat.PortBinding{ { - HostIP: "127.0.0.1", + HostIP: "127.0.0.1", + 
HostPort: strconv.Itoa(port1), }, }, nat.Port("8448/tcp"): []nat.PortBinding{ { - HostIP: "127.0.0.1", + HostIP: "127.0.0.1", + HostPort: strconv.Itoa(port2), }, }, }, @@ -329,6 +338,39 @@ func deployImage( return d, nil } +// Picks two free ports on localhost. Does not reserve them in any way. +// The returned ports must be used before the next call to `allocateHostPorts`, +// otherwise the same pair of ports may be returned. +func allocateHostPorts() (int, int, error) { + localhost_any_port := net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 0, + } + + listener1, err := net.ListenTCP("tcp", &localhost_any_port) + if err != nil { + return 0, 0, err + } + listener2, err := net.ListenTCP("tcp", &localhost_any_port) + if err != nil { + return 0, 0, err + } + + port1 := listener1.Addr().(*net.TCPAddr).Port + port2 := listener2.Addr().(*net.TCPAddr).Port + + err = listener1.Close() + if err != nil { + return 0, 0, err + } + err = listener2.Close() + if err != nil { + return 0, 0, err + } + + return port1, port2, nil +} + func copyToContainer(docker *client.Client, containerID, path string, data []byte) error { // Create a fake/virtual file in memory that we can copy to the container // via https://stackoverflow.com/a/52131297/796832 From 3147a3fbdcc06006c7bfb722381b0a04e1bbafc3 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 24 May 2022 18:56:42 +0100 Subject: [PATCH 3/6] Add a function to restart a deployment Signed-off-by: Sean Quah --- internal/docker/deployer.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/internal/docker/deployer.go b/internal/docker/deployer.go index 8563adf8..71bb1a3f 100644 --- a/internal/docker/deployer.go +++ b/internal/docker/deployer.go @@ -456,6 +456,40 @@ func waitForContainer(ctx context.Context, docker *client.Client, hsDep *Homeser return iterCount, lastErr } +// Restart a deployment. 
+func (dep *Deployment) Restart() error { + ctx := context.Background() + + for _, hsDep := range dep.HS { + err := dep.Deployer.Docker.ContainerStop(ctx, hsDep.ContainerID, &dep.Config.SpawnHSTimeout) + if err != nil { + return fmt.Errorf("failed to restart container %s: %s", hsDep.ContainerID, err) + } + + // Remove the container from the network. If we don't do this, + // (re)starting the container fails with an error like + // "Error response from daemon: endpoint with name complement_fed_1_fed.alice.hs1_1 already exists in network complement_fed_alice". + err = dep.Deployer.Docker.NetworkDisconnect(ctx, dep.Deployer.networkID, hsDep.ContainerID, false) + if err != nil { + return fmt.Errorf("failed to restart container %s: %s", hsDep.ContainerID, err) + } + + err = dep.Deployer.Docker.ContainerStart(ctx, hsDep.ContainerID, types.ContainerStartOptions{}) + if err != nil { + return fmt.Errorf("failed to restart container %s: %s", hsDep.ContainerID, err) + } + + // Wait for the container to be ready. 
+ stopTime := time.Now().Add(dep.Config.SpawnHSTimeout) + _, err = waitForContainer(ctx, dep.Deployer.Docker, &hsDep, stopTime) + if err != nil { + return fmt.Errorf("failed to restart container %s: %s", hsDep.ContainerID, err) + } + } + + return nil +} + // RoundTripper is a round tripper that maps https://hs1 to the federation port of the container // e.g https://localhost:35352 type RoundTripper struct { From 9d9bf8e832f46aaedfd77a7add12e8adeaa73a9f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 24 May 2022 18:59:39 +0100 Subject: [PATCH 4/6] Add test for restarting a homeserver during a partial state join Signed-off-by: Sean Quah --- ...federation_room_join_partial_state_test.go | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 45ec6a74..b4eee74f 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -182,6 +182,62 @@ func TestPartialStateJoin(t *testing.T) { }) } }) + + // test that a partial-state join continues syncing state after a restart + // the same as SyncBlocksDuringPartialStateJoin, with a restart in the middle + t.Run("PartialStateJoinContinuesAfterRestart", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + // Alice has now joined the room, and the server is syncing the state in the background. + + // wait for the state_ids request to arrive + psjResult.AwaitStateIdsRequest(t) + + // restart the homeserver + err := deployment.Restart() + if err != nil { + t.Errorf("Failed to restart homeserver: %s", err) + } + + // attempts to sync should block. Fire off a goroutine to try it. 
syncResponseChan := make(chan gjson.Result) + defer close(syncResponseChan) + go func() { + response, _ := alice.MustSync(t, client.SyncReq{}) + syncResponseChan <- response + }() + + // wait for the state_ids request to arrive + psjResult.AwaitStateIdsRequest(t) + + // the client-side requests should still be waiting + select { + case <-syncResponseChan: + t.Fatalf("Sync completed before state resync complete") + default: + } + + // release the federation /state response + psjResult.FinishStateRequest() + + // the /sync request should now complete, with the new room + var syncRes gjson.Result + select { + case <-time.After(1 * time.Second): + t.Fatalf("/sync request did not complete") + case syncRes = <-syncResponseChan: + } + + roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID)) + if !roomRes.Exists() { + t.Fatalf("/sync completed without join to new room\n") + } + }) } // buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq From c0caa5ecf0bf8e4e5b246b51141e7057df555efa Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 27 May 2022 16:22:01 +0100 Subject: [PATCH 5/6] Don't use AwaitStateIdsRequest a second time --- tests/federation_room_join_partial_state_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index b4eee74f..91ef9ead 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -212,8 +212,8 @@ func TestPartialStateJoin(t *testing.T) { syncResponseChan <- response }() - // wait for the state_ids request to arrive - psjResult.AwaitStateIdsRequest(t) + // we expect another state_ids request to arrive. + // we'd do another AwaitStateIdsRequest, except it's single-use. 
// the client-side requests should still be waiting select { From ec228c1197b02f5332c469f866efd7642c375699 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 13 Jun 2022 16:49:16 +0100 Subject: [PATCH 6/6] Fix style: use camelcase --- internal/docker/deployer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/docker/deployer.go b/internal/docker/deployer.go index 7fa89037..40ce645c 100644 --- a/internal/docker/deployer.go +++ b/internal/docker/deployer.go @@ -362,16 +362,16 @@ func deployImage( // The returned ports must be used before the next call to `allocateHostPorts`, // otherwise the same pair of ports may be returned. func allocateHostPorts() (int, int, error) { - localhost_any_port := net.TCPAddr{ + localhostAnyPort := net.TCPAddr{ IP: net.ParseIP("127.0.0.1"), Port: 0, } - listener1, err := net.ListenTCP("tcp", &localhost_any_port) + listener1, err := net.ListenTCP("tcp", &localhostAnyPort) if err != nil { return 0, 0, err } - listener2, err := net.ListenTCP("tcp", &localhost_any_port) + listener2, err := net.ListenTCP("tcp", &localhostAnyPort) if err != nil { return 0, 0, err }