Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 3 additions & 74 deletions src/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ func (a *App) createJob(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Job type is required", http.StatusBadRequest)
return
}
if err := a.scheduler.Enqueue("jobType", "gpuType", payload); err != nil {
a.log.Error("enqueue failed", "err", err, "payload", payload)
jobID, err := a.scheduler.Enqueue(req.Type, req.RequiredGPU, req.Payload)
if err != nil {
a.log.Error("enqueue failed", "err", err, "payload", req.Payload)
http.Error(w, "enqueue failed", http.StatusInternalServerError)
return
}
Expand Down Expand Up @@ -325,75 +326,3 @@ func (a *App) getAllSupervisors(w http.ResponseWriter, r *http.Request) {
return
}
}

// getSupervisorStatus responds with every supervisor known to the status
// registry as JSON: {"supervisors": [...], "count": N}.
//
// The payload is marshaled before any bytes hit the ResponseWriter: encoding
// directly with json.NewEncoder(w).Encode would commit a 200 status on the
// first write, making a subsequent http.Error on encode failure a superfluous
// WriteHeader that the client never sees as a 500.
func (a *App) getSupervisorStatus(w http.ResponseWriter, r *http.Request) {
	supervisors, err := a.statusRegistry.GetAllSupervisors()
	if err != nil {
		a.log.Error("failed to get supervisor status", "error", err)
		http.Error(w, "failed to get supervisor status", http.StatusInternalServerError)
		return
	}

	body, err := json.Marshal(map[string]interface{}{
		"supervisors": supervisors,
		"count":       len(supervisors),
	})
	if err != nil {
		a.log.Error("failed to encode supervisor status response", "error", err)
		http.Error(w, "failed to encode response", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(body); err != nil {
		// Headers are already sent at this point; logging is all we can do.
		a.log.Error("failed to write supervisor status response", "error", err)
	}
}

// getSupervisorStatusByID returns the status of a single supervisor,
// identified by the consumer ID taken from the URL path
// ("/supervisors/status/{consumerID}").
//
// Responds 400 when the ID segment is empty and 404 when the registry has no
// entry for it. The body is marshaled up front so an encoding failure can
// still produce a real 500 — http.Error after a partial Encode write would be
// a superfluous WriteHeader on an already-committed 200.
func (a *App) getSupervisorStatusByID(w http.ResponseWriter, r *http.Request) {
	consumerID := strings.TrimPrefix(r.URL.Path, "/supervisors/status/")
	if consumerID == "" {
		http.Error(w, "consumer ID required", http.StatusBadRequest)
		return
	}

	supervisor, err := a.statusRegistry.GetSupervisor(consumerID)
	if err != nil {
		a.log.Error("failed to get supervisor status", "consumer_id", consumerID, "error", err)
		http.Error(w, "supervisor not found", http.StatusNotFound)
		return
	}

	body, err := json.Marshal(supervisor)
	if err != nil {
		a.log.Error("failed to encode supervisor status response", "error", err)
		http.Error(w, "failed to encode response", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(body); err != nil {
		// Status and headers are already on the wire; just log the failure.
		a.log.Error("failed to write supervisor status response", "error", err)
	}
}

// getAllSupervisors lists supervisors as JSON:
// {"supervisors": [...], "count": N, "active_only": bool}.
//
// The optional query parameter ?active=true restricts the listing to active
// supervisors; any other value (or its absence) returns all of them.
// The response is marshaled before writing so an encode failure can still be
// reported as a 500 — streaming with Encode would have already committed a
// 200 by the time the error surfaced, making http.Error ineffective.
func (a *App) getAllSupervisors(w http.ResponseWriter, r *http.Request) {
	activeOnly := r.URL.Query().Get("active") == "true"

	var supervisors []SupervisorStatus
	var err error

	if activeOnly {
		supervisors, err = a.statusRegistry.GetActiveSupervisors()
	} else {
		supervisors, err = a.statusRegistry.GetAllSupervisors()
	}

	if err != nil {
		a.log.Error("failed to get supervisors", "active_only", activeOnly, "error", err)
		http.Error(w, "failed to get supervisors", http.StatusInternalServerError)
		return
	}

	body, err := json.Marshal(map[string]interface{}{
		"supervisors": supervisors,
		"count":       len(supervisors),
		"active_only": activeOnly,
	})
	if err != nil {
		a.log.Error("failed to encode supervisors response", "error", err)
		http.Error(w, "failed to encode response", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(body); err != nil {
		// Too late to change the status code; record the write failure.
		a.log.Error("failed to write supervisors response", "error", err)
	}
}
158 changes: 158 additions & 0 deletions src/container_job_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package main

import (
"context"
"fmt"
"log/slog"
"os"
"testing"
"time"

"mist/docker"
"mist/multilogger"

"github.com/docker/docker/client"
"github.com/redis/go-redis/v9"
)

// TestContainerJobCPU verifies that when a CPU job is enqueued and a CPU supervisor
// picks it up, a container is actually started using the Docker API.
// Requires: Docker running, Redis running (docker-compose up), pytorch-cpu image built.
// Each missing prerequisite skips rather than fails, so the test is safe in CI
// environments without Docker/Redis.
func TestContainerJobCPU(t *testing.T) {
	// Skip if Docker is not available
	dockerCli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		t.Skipf("Docker not available, skipping: %v", err)
	}
	defer dockerCli.Close()

	ctx := context.Background()
	if _, err := dockerCli.Ping(ctx); err != nil {
		t.Skipf("Docker daemon not reachable, skipping: %v", err)
	}

	// Ensure pytorch-cpu image exists.
	_, _, err = dockerCli.ImageInspectWithRaw(ctx, "pytorch-cpu")
	if err != nil {
		// t.Skip, not t.Skipf: the message is constant (no format verbs),
		// which go vet/staticcheck flag when passed to a printf-style func.
		t.Skip("pytorch-cpu image not found. Build it with: cd src/images/pytorch-cpu && docker build -t pytorch-cpu .")
	}

	redisAddr := "localhost:6379"
	redisClient := redis.NewClient(&redis.Options{Addr: redisAddr})
	defer redisClient.Close()
	if err := redisClient.Ping(ctx).Err(); err != nil {
		t.Skipf("Redis not running at %s, skipping: %v (run: docker-compose up -d)", redisAddr, err)
	}

	// Clean up any stale stream/consumer state for a clean test.
	redisClient.FlushDB(ctx)

	// Error deliberately ignored: defaults from GetLogConfig are acceptable
	// for test logging even if config loading reports a problem.
	config, _ := multilogger.GetLogConfig()
	schedulerLog, err := multilogger.CreateLogger("scheduler", &config)
	if err != nil {
		t.Fatalf("failed to create logger: %v", err)
	}
	supervisorLog, err := multilogger.CreateLogger("supervisor", &config)
	if err != nil {
		t.Fatalf("failed to create logger: %v", err)
	}

	scheduler := NewScheduler(redisAddr, schedulerLog)
	defer scheduler.Close()

	// PID-suffixed consumer ID avoids collisions with concurrent test runs.
	consumerID := fmt.Sprintf("worker_cpu_test_%d", os.Getpid())
	supervisor := NewSupervisor(redisAddr, consumerID, "CPU", supervisorLog)
	if err := supervisor.Start(); err != nil {
		t.Fatalf("supervisor start failed: %v", err)
	}
	defer supervisor.Stop()

	// Enqueue a CPU job
	payload := map[string]interface{}{
		"task": "test_task",
		"data": "test_data",
	}
	jobID, err := scheduler.Enqueue("test_job", "CPU", payload)
	if err != nil {
		t.Fatalf("enqueue failed: %v", err)
	}
	slog.Info("enqueued CPU job", "job_id", jobID)

	// Wait for the job to be processed (supervisor runs container for ~2 sec then cleans up)
	deadline := time.After(15 * time.Second)
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	// Poll the job's metadata hash until it reaches a terminal state.
	for {
		select {
		case <-deadline:
			t.Fatal("timeout waiting for job to complete")
		case <-ticker.C:
			metadata, err := redisClient.HGetAll(ctx, "job:"+jobID).Result()
			if err != nil {
				// Transient Redis error: keep polling until the deadline fires.
				continue
			}
			jobState := metadata["job_state"]
			if jobState == string(JobStateSuccess) {
				t.Logf("job completed successfully: %s", jobID)
				return
			}
			if jobState == string(JobStateFailure) {
				t.Fatalf("job failed: %s", jobID)
			}
		}
	}
}

// TestRunContainerCPUIntegration verifies that DockerMgr can create a volume,
// start a CPU container from the pytorch-cpu image, and that the container is
// actually in the "running" state. Skips when Docker or the image is absent.
func TestRunContainerCPUIntegration(t *testing.T) {
	dockerCli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		t.Skipf("Docker not available: %v", err)
	}
	defer dockerCli.Close()

	ctx := context.Background()
	if _, err := dockerCli.Ping(ctx); err != nil {
		t.Skipf("Docker daemon not reachable: %v", err)
	}

	// Check if pytorch-cpu image exists.
	_, _, err = dockerCli.ImageInspectWithRaw(ctx, "pytorch-cpu")
	if err != nil {
		// t.Skip, not t.Skipf: constant message with no format verbs
		// (go vet/staticcheck flag Skipf here).
		t.Skip("pytorch-cpu image not found. Build with: cd src/images/pytorch-cpu && docker build -t pytorch-cpu .")
	}

	mgr := docker.NewDockerMgr(dockerCli, 10, 100)

	volName := "test_cpu_job_vol"
	vol, err := mgr.CreateVolume(volName)
	if err != nil {
		t.Fatalf("create volume: %v", err)
	}
	if vol.Name != volName {
		t.Errorf("volume name: got %s want %s", vol.Name, volName)
	}
	// Force-remove so cleanup succeeds even if the container still holds it.
	defer mgr.RemoveVolume(volName, true)

	containerID, err := mgr.RunContainer("pytorch-cpu", "runc", volName)
	if err != nil {
		t.Fatalf("run container: %v", err)
	}
	defer func() {
		// Best-effort cleanup; errors here must not mask the test result.
		_ = mgr.StopContainer(containerID)
		_ = mgr.RemoveContainer(containerID)
	}()

	// Verify container is running.
	inspect, err := dockerCli.ContainerInspect(ctx, containerID)
	if err != nil {
		t.Fatalf("Failed to inspect container: %v", err)
	}
	if inspect.State.Status != "running" {
		t.Errorf("container not running: status=%s", inspect.State.Status)
	}

	t.Logf("CPU container started successfully: %s", containerID[:12])
}
27 changes: 14 additions & 13 deletions src/images/README.md → src/docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ docker run -it --rm --gpus all <image name> bash
```
## Test
```
sh run_tests.sh
run_tests.sh
run_container_job_test.sh
```
## Troubleshooting

Expand Down Expand Up @@ -40,13 +41,13 @@ sh run_tests.sh

## Overview

The `mist/images` package provides a `ContainerMgr` struct and related methods for managing Docker containers and volumes programmatically using the Docker Go SDK. It enforces limits on the number of containers and volumes, and provides safe creation, deletion, and lifecycle management.
The `mist/docker` package provides a `DockerMgr` struct and related methods for managing Docker containers and volumes programmatically using the Docker Go SDK. It enforces limits on the number of containers and volumes, and provides safe creation, deletion, and lifecycle management.

---

## Main APIs

### `type ContainerMgr struct`
### `type DockerMgr struct`
Manages Docker containers and volumes, enforces resource limits, and tracks active resources.

**Fields:**
Expand All @@ -60,45 +61,45 @@ Manages Docker containers and volumes, enforces resource limits, and tracks acti

---

### `func NewContainerMgr(client *client.Client, containerLimit, volumeLimit int) *ContainerMgr`
Creates a new `ContainerMgr` with the specified Docker client and resource limits.
### `func NewDockerMgr(client *client.Client, containerLimit, volumeLimit int) *DockerMgr`
Creates a new `DockerMgr` with the specified Docker client and resource limits.

---

### `func (mgr *ContainerMgr) createVolume(volumeName string) (volume.Volume, error)`
### `func (mgr *DockerMgr) createVolume(volumeName string) (volume.Volume, error)`
Creates a Docker volume with the given name, enforcing the volume limit.
Returns the created volume or an error.

---

### `func (mgr *ContainerMgr) removeVolume(volumeName string, force bool) error`
### `func (mgr *DockerMgr) removeVolume(volumeName string, force bool) error`
Removes a Docker volume by name.
Returns an error if the volume does not exist or is in use (unless `force` is true).

---

### `func (mgr *ContainerMgr) runContainer(imageName string, runtimeName string, volumeName string) (string, error)`
### `func (mgr *DockerMgr) runContainer(imageName string, runtimeName string, volumeName string) (string, error)`
Creates and starts a container with the specified image, runtime, and volume attached at `/data`.
Enforces the container limit.
Returns the container ID or an error.

---

### `func (mgr *ContainerMgr) stopContainer(containerID string) error`
### `func (mgr *DockerMgr) stopContainer(containerID string) error`
Stops a running container by ID.
Returns an error if the operation fails.

---

### `func (mgr *ContainerMgr) removeContainer(containerID string) error`
### `func (mgr *DockerMgr) removeContainer(containerID string) error`
Removes a container by ID and deletes it from the internal tracking map.
Returns an error if the operation fails.

---

## Test Plan

The test suite (`serve_image_test.go`) covers the following scenarios:
The test suite (`docker_test.go`) covers the following scenarios:

- Create a volume, check it exists, delete it, check it no longer exists.
- Create a volume with the same name twice (should not fail).
Expand All @@ -116,7 +117,7 @@ The test suite (`serve_image_test.go`) covers the following scenarios:

```go
cli, _ := client.NewClientWithOpts(client.FromEnv)
mgr := NewContainerMgr(cli, 10, 100)
mgr := NewDockerMgr(cli, 10, 100)

vol, err := mgr.createVolume("myvol")
if err != nil { /* handle error */ }
Expand All @@ -139,4 +140,4 @@ _ = mgr.removeVolume("myvol", true)

---

**For more details, see the source code and comments in `serve_image.go`.**
**For more details, see the source code and comments in `docker.go`.
Loading