Skip to content
Merged
4 changes: 2 additions & 2 deletions cmd/homerunner/route_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ type ReqCreate struct {
}

type ResCreate struct {
Homeservers map[string]docker.HomeserverDeployment `json:"homeservers"`
Expires time.Time `json:"expires"`
Homeservers map[string]*docker.HomeserverDeployment `json:"homeservers"`
Expires time.Time `json:"expires"`
}

// RouteCreate handles creating blueprint deployments. There are 3 supported types of requests:
Expand Down
207 changes: 130 additions & 77 deletions internal/docker/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (d *Deployer) Deploy(ctx context.Context, blueprintName string) (*Deploymen
dep := &Deployment{
Deployer: d,
BlueprintName: blueprintName,
HS: make(map[string]HomeserverDeployment),
HS: make(map[string]*HomeserverDeployment),
Config: d.config,
}
images, err := d.Docker.ImageList(ctx, types.ImageListOptions{
Expand Down Expand Up @@ -127,7 +127,7 @@ func (d *Deployer) Deploy(ctx context.Context, blueprintName string) (*Deploymen
}
mu.Lock()
d.log("%s -> %s (%s)\n", contextStr, deployment.BaseURL, deployment.ContainerID)
dep.HS[hsName] = *deployment
dep.HS[hsName] = deployment
mu.Unlock()
return nil
}
Expand Down Expand Up @@ -176,6 +176,43 @@ func (d *Deployer) Destroy(dep *Deployment, printServerLogs bool) {
}
}

// Restart stops and then starts the container backing a homeserver deployment.
//
// The container is stopped (passing cfg.SpawnHSTimeout to ContainerStop),
// disconnected from the deployer's network and started again. The endpoints
// reported by waitForPorts are then written back to hsDep via SetEndpoints, and
// the call blocks in waitForContainer until the homeserver is ready or
// cfg.SpawnHSTimeout has elapsed.
//
// The returned error wraps the underlying failure, so callers can inspect it
// with errors.Is / errors.As.
func (d *Deployer) Restart(hsDep *HomeserverDeployment, cfg *config.Complement) error {
	ctx := context.Background()
	err := d.Docker.ContainerStop(ctx, hsDep.ContainerID, &cfg.SpawnHSTimeout)
	if err != nil {
		return fmt.Errorf("Restart: Failed to stop container %s: %w", hsDep.ContainerID, err)
	}

	// Remove the container from the network. If we don't do this,
	// (re)starting the container fails with an error like
	// "Error response from daemon: endpoint with name complement_fed_1_fed.alice.hs1_1 already exists in network complement_fed_alice".
	err = d.Docker.NetworkDisconnect(ctx, d.networkID, hsDep.ContainerID, false)
	if err != nil {
		return fmt.Errorf("Restart: Failed to disconnect container %s: %w", hsDep.ContainerID, err)
	}

	err = d.Docker.ContainerStart(ctx, hsDep.ContainerID, types.ContainerStartOptions{})
	if err != nil {
		return fmt.Errorf("Restart: Failed to start container %s: %w", hsDep.ContainerID, err)
	}

	// Wait for the container's ports to be reported, then refresh the
	// deployment (and any clients it tracks) with the resulting URLs.
	baseURL, fedBaseURL, err := waitForPorts(ctx, d.Docker, hsDep.ContainerID)
	if err != nil {
		return fmt.Errorf("Restart: Failed to get ports for container %s: %w", hsDep.ContainerID, err)
	}
	hsDep.SetEndpoints(baseURL, fedBaseURL)

	// Wait for the homeserver itself to be ready to serve requests.
	stopTime := time.Now().Add(cfg.SpawnHSTimeout)
	_, err = waitForContainer(ctx, d.Docker, hsDep, stopTime)
	if err != nil {
		return fmt.Errorf("Restart: Failed to restart container %s: %w", hsDep.ContainerID, err)
	}

	return nil
}

// nolint
func deployImage(
docker *client.Client, imageID string, containerName, pkgNamespace, blueprintName, hsName string,
Expand Down Expand Up @@ -301,89 +338,21 @@ func deployImage(
log.Printf("%s: Started container %s", contextStr, containerID)
}

// We need to hammer the inspect endpoint until the ports show up, they don't appear immediately.
var inspect types.ContainerJSON
var baseURL, fedBaseURL string
inspectStartTime := time.Now()
for time.Since(inspectStartTime) < time.Second {
inspect, err = docker.ContainerInspect(ctx, containerID)
if err != nil {
return stubDeployment, err
}
if inspect.State != nil && !inspect.State.Running {
// the container exited, bail out with a container ID for logs
return stubDeployment, fmt.Errorf("container is not running, state=%v", inspect.State.Status)
}
baseURL, fedBaseURL, err = endpoints(inspect.NetworkSettings.Ports, 8008, 8448)
if err == nil {
break
}
}
baseURL, fedBaseURL, err := waitForPorts(ctx, docker, containerID)
if err != nil {
return stubDeployment, fmt.Errorf("%s : image %s : %w", contextStr, imageID, err)
}
inspect, err := docker.ContainerInspect(ctx, containerID)
if err != nil {
Comment thread
kegsay marked this conversation as resolved.
return stubDeployment, err
}
for vol := range inspect.Config.Volumes {
log.Printf(
"WARNING: %s has a named VOLUME %s - volumes can lead to unpredictable behaviour due to "+
"test pollution. Remove the VOLUME in the Dockerfile to suppress this message.", containerName, vol,
)
}

var lastErr error

// Inspect health status of container to check it is up
stopTime := time.Now().Add(cfg.SpawnHSTimeout)
iterCount := 0
if inspect.State.Health != nil {
// If the container has a healthcheck, wait for it first
for {
iterCount += 1
if time.Now().After(stopTime) {
lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr)
break
}
inspect, err = docker.ContainerInspect(ctx, containerID)
if err != nil {
lastErr = fmt.Errorf("inspect container %s => error: %s", containerID, err)
time.Sleep(50 * time.Millisecond)
continue
}
if inspect.State.Health.Status != "healthy" {
lastErr = fmt.Errorf("inspect container %s => health: %s", containerID, inspect.State.Health.Status)
time.Sleep(50 * time.Millisecond)
continue
}
lastErr = nil
break

}
}

// Having optionally waited for container to self-report healthy
// hit /versions to check it is actually responding
versionsURL := fmt.Sprintf("%s/_matrix/client/versions", baseURL)

for {
iterCount += 1
if time.Now().After(stopTime) {
lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr)
break
}
res, err := http.Get(versionsURL)
if err != nil {
lastErr = fmt.Errorf("GET %s => error: %s", versionsURL, err)
time.Sleep(50 * time.Millisecond)
continue
}
if res.StatusCode != 200 {
lastErr = fmt.Errorf("GET %s => HTTP %s", versionsURL, res.Status)
time.Sleep(50 * time.Millisecond)
continue
}
lastErr = nil
break
}

d := &HomeserverDeployment{
BaseURL: baseURL,
FedBaseURL: fedBaseURL,
Expand All @@ -392,8 +361,11 @@ func deployImage(
ApplicationServices: asIDToRegistrationFromLabels(inspect.Config.Labels),
DeviceIDs: deviceIDsFromLabels(inspect.Config.Labels),
}
if lastErr != nil {
return d, fmt.Errorf("%s: failed to check server is up. %w", contextStr, lastErr)

stopTime := time.Now().Add(cfg.SpawnHSTimeout)
iterCount, err := waitForContainer(ctx, docker, d, stopTime)
if err != nil {
return d, fmt.Errorf("%s: failed to check server is up. %w", contextStr, err)
} else {
if cfg.DebugLoggingEnabled {
log.Printf("%s: Server is responding after %d iterations", contextStr, iterCount)
Expand Down Expand Up @@ -428,6 +400,87 @@ func copyToContainer(docker *client.Client, containerID, path string, data []byt
return nil
}

// waitForPorts waits until a homeserver container has NAT ports assigned and
// returns its client-side API URL and federation API URL.
//
// The container is inspected repeatedly for up to one second. The call fails
// if inspecting the container fails, if the container is reported as not
// running, or if the ports (8008 and 8448) have still not been resolved when
// the second is up; in the last case the most recent error from endpoints is
// returned. On any error both URLs are empty.
func waitForPorts(ctx context.Context, docker *client.Client, containerID string) (baseURL string, fedBaseURL string, err error) {
	// We need to hammer the inspect endpoint until the ports show up, they don't appear immediately.
	var inspect types.ContainerJSON
	inspectStartTime := time.Now()
	for time.Since(inspectStartTime) < time.Second {
		inspect, err = docker.ContainerInspect(ctx, containerID)
		if err != nil {
			return "", "", err
		}
		if inspect.State != nil && !inspect.State.Running {
			// the container exited, bail out with a container ID for logs
			return "", "", fmt.Errorf("container is not running, state=%v", inspect.State.Status)
		}
		baseURL, fedBaseURL, err = endpoints(inspect.NetworkSettings.Ports, 8008, 8448)
		if err == nil {
			return baseURL, fedBaseURL, nil
		}
	}
	// The ports never showed up in time: surface the last lookup error rather
	// than reporting success with empty URLs.
	return "", "", err
}

// waitForContainer waits until a homeserver deployment is ready to serve
// requests, giving up once stopTime has passed.
//
// Readiness is checked in two phases, both polling every 50ms:
//  1. The container is inspected until it either reports no health check or
//     reports a health status of "healthy".
//  2. GET <hsDep.BaseURL>/_matrix/client/versions is issued until it returns
//     HTTP 200.
//
// It returns the total number of polling iterations performed across both
// phases, and a non-nil error describing the last failure if stopTime was
// reached before the homeserver responded.
func waitForContainer(ctx context.Context, docker *client.Client, hsDep *HomeserverDeployment, stopTime time.Time) (iterCount int, err error) {
	var lastErr error

	// If the container has a healthcheck, wait for it first
	for {
		iterCount++
		if time.Now().After(stopTime) {
			lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr)
			break
		}
		inspect, inspectErr := docker.ContainerInspect(ctx, hsDep.ContainerID)
		if inspectErr != nil {
			lastErr = fmt.Errorf("inspect container %s => error: %s", hsDep.ContainerID, inspectErr)
			time.Sleep(50 * time.Millisecond)
			continue
		}
		// A missing State is treated like a missing health check instead of
		// being dereferenced, matching the nil guard used in waitForPorts.
		if state := inspect.State; state != nil && state.Health != nil && state.Health.Status != "healthy" {
			lastErr = fmt.Errorf("inspect container %s => health: %s", hsDep.ContainerID, state.Health.Status)
			time.Sleep(50 * time.Millisecond)
			continue
		}

		// The container is healthy or has no health check.
		lastErr = nil
		break
	}

	// Having optionally waited for container to self-report healthy
	// hit /versions to check it is actually responding
	versionsURL := fmt.Sprintf("%s/_matrix/client/versions", hsDep.BaseURL)

	for {
		iterCount++
		if time.Now().After(stopTime) {
			lastErr = fmt.Errorf("timed out checking for homeserver to be up: %s", lastErr)
			break
		}
		res, getErr := http.Get(versionsURL)
		if getErr != nil {
			lastErr = fmt.Errorf("GET %s => error: %s", versionsURL, getErr)
			time.Sleep(50 * time.Millisecond)
			continue
		}
		// Only the status line is needed. Close the body on every attempt so
		// polling does not leak a connection per iteration; a close error is
		// not actionable here.
		_ = res.Body.Close()
		if res.StatusCode != 200 {
			lastErr = fmt.Errorf("GET %s => HTTP %s", versionsURL, res.Status)
			time.Sleep(50 * time.Millisecond)
			continue
		}
		lastErr = nil
		break
	}

	return iterCount, lastErr
}

// RoundTripper is a round tripper that maps https://hs1 to the federation port of the container
// e.g https://localhost:35352
type RoundTripper struct {
Expand Down
32 changes: 30 additions & 2 deletions internal/docker/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Deployment struct {
// The name of the deployed blueprint
BlueprintName string
// A map of HS name to a HomeserverDeployment
HS map[string]HomeserverDeployment
HS map[string]*HomeserverDeployment
Config *config.Complement
}

Expand All @@ -28,6 +28,17 @@ type HomeserverDeployment struct {
AccessTokens map[string]string // e.g { "@alice:hs1": "myAcc3ssT0ken" }
ApplicationServices map[string]string // e.g { "my-as-id": "id: xxx\nas_token: xxx ..."} }
DeviceIDs map[string]string // e.g { "@alice:hs1": "myDeviceID" }
CSAPIClients []*client.CSAPI
}

// SetEndpoints records new client and federation base URLs on the homeserver
// deployment, and points every client tracked in CSAPIClients at the new
// client base URL.
func (hsDep *HomeserverDeployment) SetEndpoints(baseURL string, fedBaseURL string) {
	hsDep.BaseURL, hsDep.FedBaseURL = baseURL, fedBaseURL

	// Index into the slice rather than binding a loop variable, so nothing
	// local shadows the imported client package.
	for i := range hsDep.CSAPIClients {
		hsDep.CSAPIClients[i].BaseURL = baseURL
	}
}

// Destroy the entire deployment. Destroys all running containers. If `printServerLogs` is true,
Expand Down Expand Up @@ -56,7 +67,7 @@ func (d *Deployment) Client(t *testing.T, hsName, userID string) *client.CSAPI {
if deviceID == "" && userID != "" {
t.Logf("WARNING: Deployment.Client - HS name '%s' - user ID '%s' - deviceID not found", hsName, userID)
}
return &client.CSAPI{
client := &client.CSAPI{
UserID: userID,
AccessToken: token,
DeviceID: deviceID,
Expand All @@ -65,6 +76,8 @@ func (d *Deployment) Client(t *testing.T, hsName, userID string) *client.CSAPI {
SyncUntilTimeout: 5 * time.Second,
Debug: d.Deployer.debugLogging,
}
dep.CSAPIClients = append(dep.CSAPIClients, client)
return client
}

// RegisterUser within a homeserver and return an authenticatedClient, Fails the test if the hsName is not found.
Expand All @@ -81,6 +94,7 @@ func (d *Deployment) RegisterUser(t *testing.T, hsName, localpart, password stri
SyncUntilTimeout: 5 * time.Second,
Debug: d.Deployer.debugLogging,
}
dep.CSAPIClients = append(dep.CSAPIClients, client)
var userID, accessToken, deviceID string
if isAdmin {
userID, accessToken, deviceID = client.RegisterSharedSecret(t, localpart, password, isAdmin)
Expand All @@ -96,3 +110,17 @@ func (d *Deployment) RegisterUser(t *testing.T, hsName, localpart, password stri
client.DeviceID = deviceID
return client
}

// Restart restarts each homeserver in the deployment in turn, using the
// deployment's Deployer and Config. Homeservers are visited in map iteration
// order, which Go leaves unspecified.
//
// On the first homeserver that fails to restart, the failure is reported on t
// via Errorf and returned; the remaining homeservers are not restarted.
func (d *Deployment) Restart(t *testing.T) error {
	t.Helper()

	for _, hs := range d.HS {
		if restartErr := d.Deployer.Restart(hs, d.Config); restartErr != nil {
			t.Errorf("Deployment.Restart: %s", restartErr)
			return restartErr
		}
	}

	return nil
}