diff --git a/src/api.go b/src/api.go index 566c74b..15b29d1 100644 --- a/src/api.go +++ b/src/api.go @@ -200,8 +200,9 @@ func (a *App) createJob(w http.ResponseWriter, r *http.Request) { http.Error(w, "Job type is required", http.StatusBadRequest) return } - if err := a.scheduler.Enqueue("jobType", "gpuType", payload); err != nil { - a.log.Error("enqueue failed", "err", err, "payload", payload) + jobID, err := a.scheduler.Enqueue(req.Type, req.RequiredGPU, req.Payload) + if err != nil { + a.log.Error("enqueue failed", "err", err, "payload", req.Payload) http.Error(w, "enqueue failed", http.StatusInternalServerError) return } @@ -325,75 +326,3 @@ func (a *App) getAllSupervisors(w http.ResponseWriter, r *http.Request) { return } } - -func (a *App) getSupervisorStatus(w http.ResponseWriter, r *http.Request) { - supervisors, err := a.statusRegistry.GetAllSupervisors() - if err != nil { - a.log.Error("failed to get supervisor status", "error", err) - http.Error(w, "failed to get supervisor status", http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(map[string]interface{}{ - "supervisors": supervisors, - "count": len(supervisors), - }); err != nil { - a.log.Error("failed to encode supervisor status response", "error", err) - http.Error(w, "failed to encode response", http.StatusInternalServerError) - return - } -} - -func (a *App) getSupervisorStatusByID(w http.ResponseWriter, r *http.Request) { - // extract consumer ID from URL path - path := strings.TrimPrefix(r.URL.Path, "/supervisors/status/") - if path == "" { - http.Error(w, "consumer ID required", http.StatusBadRequest) - return - } - - supervisor, err := a.statusRegistry.GetSupervisor(path) - if err != nil { - a.log.Error("failed to get supervisor status", "consumer_id", path, "error", err) - http.Error(w, "supervisor not found", http.StatusNotFound) - return - } - - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(supervisor); err != nil { - a.log.Error("failed to encode supervisor status response", "error", err) - http.Error(w, "failed to encode response", http.StatusInternalServerError) - return - } -} - -func (a *App) getAllSupervisors(w http.ResponseWriter, r *http.Request) { - activeOnly := r.URL.Query().Get("active") == "true" - - var supervisors []SupervisorStatus - var err error - - if activeOnly { - supervisors, err = a.statusRegistry.GetActiveSupervisors() - } else { - supervisors, err = a.statusRegistry.GetAllSupervisors() - } - - if err != nil { - a.log.Error("failed to get supervisors", "active_only", activeOnly, "error", err) - http.Error(w, "failed to get supervisors", http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(map[string]interface{}{ - "supervisors": supervisors, - "count": len(supervisors), - "active_only": activeOnly, - }); err != nil { - a.log.Error("failed to encode supervisors response", "error", err) - http.Error(w, "failed to encode response", http.StatusInternalServerError) - return - } -} diff --git a/src/container_job_test.go b/src/container_job_test.go new file mode 100644 index 0000000..cd3ed1e --- /dev/null +++ b/src/container_job_test.go @@ -0,0 +1,158 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "os" + "testing" + "time" + + "mist/docker" + "mist/multilogger" + + "github.com/docker/docker/client" + "github.com/redis/go-redis/v9" +) + +// TestContainerJobCPU verifies 
that when a CPU job is enqueued and a CPU supervisor
+// picks it up, a container is actually started using the Docker API.
+// Requires: Docker running, Redis running (docker-compose up), pytorch-cpu image built.
+func TestContainerJobCPU(t *testing.T) {
+	// Skip if Docker is not available
+	dockerCli, err := client.NewClientWithOpts(client.FromEnv)
+	if err != nil {
+		t.Skipf("Docker not available, skipping: %v", err)
+	}
+	defer dockerCli.Close()
+
+	ctx := context.Background()
+	if _, err := dockerCli.Ping(ctx); err != nil {
+		t.Skipf("Docker daemon not reachable, skipping: %v", err)
+	}
+
+	// Ensure pytorch-cpu image exists
+	_, _, err = dockerCli.ImageInspectWithRaw(ctx, "pytorch-cpu")
+	if err != nil {
+		t.Skipf("pytorch-cpu image not found. Build it with: cd src/docker/pytorch-cpu && docker build -t pytorch-cpu .")
+	}
+
+	redisAddr := "localhost:6379"
+	redisClient := redis.NewClient(&redis.Options{Addr: redisAddr})
+	defer redisClient.Close()
+	if err := redisClient.Ping(ctx).Err(); err != nil {
+		t.Skipf("Redis not running at %s, skipping: %v (run: docker-compose up -d)", redisAddr, err)
+	}
+
+	// Clean up any stale stream/consumer state for a clean test
+	redisClient.FlushDB(ctx)
+
+	config, _ := multilogger.GetLogConfig()
+	schedulerLog, err := multilogger.CreateLogger("scheduler", &config)
+	if err != nil {
+		t.Fatalf("failed to create logger: %v", err)
+	}
+	supervisorLog, err := multilogger.CreateLogger("supervisor", &config)
+	if err != nil {
+		t.Fatalf("failed to create logger: %v", err)
+	}
+
+	scheduler := NewScheduler(redisAddr, schedulerLog)
+	defer scheduler.Close()
+
+	consumerID := fmt.Sprintf("worker_cpu_test_%d", os.Getpid())
+	supervisor := NewSupervisor(redisAddr, consumerID, "CPU", supervisorLog)
+	if err := supervisor.Start(); err != nil {
+		t.Fatalf("supervisor start failed: %v", err)
+	}
+	defer supervisor.Stop()
+
+	// Enqueue a CPU job
+	payload := map[string]interface{}{
+		"task": "test_task",
+		"data": "test_data",
+	}
+	jobID, err := scheduler.Enqueue("test_job", "CPU", payload)
+	if err != nil {
+		t.Fatalf("enqueue failed: %v", err)
+	}
+	slog.Info("enqueued CPU job", "job_id", jobID)
+
+	// Wait for the job to be processed (supervisor runs container for ~2 sec then cleans up)
+	deadline := time.After(15 * time.Second)
+	ticker := time.NewTicker(500 * time.Millisecond)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-deadline:
+			t.Fatal("timeout waiting for job to complete")
+		case <-ticker.C:
+			metadata, err := redisClient.HGetAll(ctx, "job:"+jobID).Result()
+			if err != nil {
+				continue
+			}
+			jobState := metadata["job_state"]
+			if jobState == string(JobStateSuccess) {
+				t.Logf("job completed successfully: %s", jobID)
+				return
+			}
+			if jobState == string(JobStateFailure) {
+				t.Fatalf("job failed: %s", jobID)
+			}
+		}
+	}
+}
+
+// TestRunContainerCPUIntegration mirrors the docker package's TestRunContainerCPU
+// unit test, verifying end to end that a CPU container can be started.
+func TestRunContainerCPUIntegration(t *testing.T) {
+	dockerCli, err := client.NewClientWithOpts(client.FromEnv)
+	if err != nil {
+		t.Skipf("Docker not available: %v", err)
+	}
+	defer dockerCli.Close()
+
+	ctx := context.Background()
+	if _, err := dockerCli.Ping(ctx); err != nil {
+		t.Skipf("Docker daemon not reachable: %v", err)
+	}
+
+	// Check if pytorch-cpu image exists
+	_, _, err = dockerCli.ImageInspectWithRaw(ctx, "pytorch-cpu")
+	if err != nil {
+		t.Skipf("pytorch-cpu image not found. 
Build with: cd src/docker/pytorch-cpu && docker build -t pytorch-cpu .")
+	}
+
+	mgr := docker.NewDockerMgr(dockerCli, 10, 100)
+
+	volName := "test_cpu_job_vol"
+	vol, err := mgr.CreateVolume(volName)
+	if err != nil {
+		t.Fatalf("create volume: %v", err)
+	}
+	if vol.Name != volName {
+		t.Errorf("volume name: got %s want %s", vol.Name, volName)
+	}
+	defer mgr.RemoveVolume(volName, true)
+
+	containerID, err := mgr.RunContainer("pytorch-cpu", "runc", volName)
+	if err != nil {
+		t.Fatalf("run container: %v", err)
+	}
+	defer func() {
+		_ = mgr.StopContainer(containerID)
+		_ = mgr.RemoveContainer(containerID)
+	}()
+
+	// Verify container is running
+	inspect, err := dockerCli.ContainerInspect(ctx, containerID)
+	if err != nil {
+		t.Fatalf("Failed to inspect container: %v", err)
+	}
+	if inspect.State.Status != "running" {
+		t.Errorf("container not running: status=%s", inspect.State.Status)
+	}
+
+	t.Logf("CPU container started successfully: %s", containerID[:12])
+}
\ No newline at end of file
diff --git a/src/images/README.md b/src/docker/README.md
similarity index 75%
rename from src/images/README.md
rename to src/docker/README.md
index 4535422..2e6c3e5 100644
--- a/src/images/README.md
+++ b/src/docker/README.md
@@ -12,7 +12,8 @@
 docker run -it --rm --gpus all bash
 ```
 ## Test
 ```
-sh run_tests.sh
+./run_tests.sh
+./run_container_job_test.sh
 ```
 
 ## Troubleshooting
@@ -40,13 +41,13 @@
 
 ## Overview
 
-The `mist/images` package provides a `ContainerMgr` struct and related methods for managing Docker containers and volumes programmatically using the Docker Go SDK. It enforces limits on the number of containers and volumes, and provides safe creation, deletion, and lifecycle management.
+The `mist/docker` package provides a `DockerMgr` struct and related methods for managing Docker containers and volumes programmatically using the Docker Go SDK. It enforces limits on the number of containers and volumes, and provides safe creation, deletion, and lifecycle management.
 
 ---
 
 ## Main APIs
 
-### `type ContainerMgr struct`
+### `type DockerMgr struct`
 
 Manages Docker containers and volumes, enforces resource limits, and tracks active resources.
 
 **Fields:**
@@ -60,37 +61,37 @@
 
 ---
 
-### `func NewContainerMgr(client *client.Client, containerLimit, volumeLimit int) *ContainerMgr`
-Creates a new `ContainerMgr` with the specified Docker client and resource limits.
+### `func NewDockerMgr(client *client.Client, containerLimit, volumeLimit int) *DockerMgr`
+Creates a new `DockerMgr` with the specified Docker client and resource limits.
 
 ---
 
-### `func (mgr *ContainerMgr) createVolume(volumeName string) (volume.Volume, error)`
+### `func (mgr *DockerMgr) CreateVolume(volumeName string) (volume.Volume, error)`
 
 Creates a Docker volume with the given name, enforcing the volume limit.
 Returns the created volume or an error.
 
 ---
 
-### `func (mgr *ContainerMgr) removeVolume(volumeName string, force bool) error`
+### `func (mgr *DockerMgr) RemoveVolume(volumeName string, force bool) error`
 
 Removes a Docker volume by name.
 Returns an error if the volume does not exist or is in use (unless `force` is true).
---

-### `func (mgr *ContainerMgr) runContainer(imageName string, runtimeName string, volumeName string) (string, error)`
+### `func (mgr *DockerMgr) RunContainer(imageName string, runtimeName string, volumeName string) (string, error)`

Creates and starts a container with the specified image, runtime, and volume attached at `/data`.
Enforces the container limit.
Returns the container ID or an error.

---

-### `func (mgr *ContainerMgr) stopContainer(containerID string) error`
+### `func (mgr *DockerMgr) StopContainer(containerID string) error`

Stops a running container by ID.
Returns an error if the operation fails.

---

-### `func (mgr *ContainerMgr) removeContainer(containerID string) error`
+### `func (mgr *DockerMgr) RemoveContainer(containerID string) error`

Removes a container by ID and deletes it from the internal tracking map.
Returns an error if the operation fails.

@@ -98,7 +99,7 @@

## Test Plan

-The test suite (`serve_image_test.go`) covers the following scenarios:
+The test suite (`docker_test.go`) covers the following scenarios:

- Create a volume, check it exists, delete it, check it no longer exists.
- Create a volume with the same name twice (should not fail).
@@ -116,7 +117,7 @@

```go
cli, _ := client.NewClientWithOpts(client.FromEnv)
-mgr := NewContainerMgr(cli, 10, 100)
+mgr := NewDockerMgr(cli, 10, 100)

-vol, err := mgr.createVolume("myvol")
+vol, err := mgr.CreateVolume("myvol")
if err != nil { /* handle error */ }
@@ -139,4 +140,4 @@
-_ = mgr.removeVolume("myvol", true)
+_ = mgr.RemoveVolume("myvol", true)

---

-**For more details, see the source code and comments in `serve_image.go
\ No newline at end of file
+**For more details, see the source code and comments in `docker.go`.**
\ No newline at end of file
diff --git a/src/images/serve_image.go b/src/docker/docker.go
similarity index 75%
rename from src/images/serve_image.go
rename to src/docker/docker.go
index b264582..b8c19b9 100644
--- a/src/images/serve_image.go
+++ b/src/docker/docker.go
@@ -1,11 +1,9 @@
-package main
+package docker

import (
	"context"
	"fmt"
-	"io"
	"log/slog"
-	"os"
	"sync"

	"github.com/docker/docker/api/types/container"
@@ -14,8 +12,8 @@ import (
	"github.com/docker/docker/client"
)

-// ContainerMgr manages Docker containers and volumes, enforces resource limits, and tracks active resources.
-type ContainerMgr struct {
+// DockerMgr manages Docker containers and volumes, enforces resource limits, and tracks active resources.
+type DockerMgr struct {
	ctx            context.Context
	cli            *client.Client
	containerLimit int
@@ -25,9 +23,9 @@ type ContainerMgr struct {
	mu sync.Mutex
}

-// NewContainerMgr creates a new ContainerMgr with the specified Docker client and resource limits.
-func NewContainerMgr(client *client.Client, containerLimit, volumeLimit int) *ContainerMgr {
-	return &ContainerMgr{
+// NewDockerMgr creates a new DockerMgr with the specified Docker client and resource limits.
+func NewDockerMgr(client *client.Client, containerLimit, volumeLimit int) *DockerMgr {
+	return &DockerMgr{
		ctx:            context.Background(),
		cli:            client,
		containerLimit: containerLimit,
@@ -37,9 +35,9 @@ func NewContainerMgr(client *client.Client, containerLimit, volumeLimit int) *Co
	}
}

-// stopContainer stops a running container by its ID.
+// StopContainer stops a running container by its ID.
// Returns an error if the operation fails.
-func (mgr *ContainerMgr) stopContainer(containerID string) error { +func (mgr *DockerMgr) StopContainer(containerID string) error { ctx := mgr.ctx cli := mgr.cli @@ -51,9 +49,9 @@ func (mgr *ContainerMgr) stopContainer(containerID string) error { return nil } -// removeContainer removes a container by its ID and deletes it from the internal tracking map. +// RemoveContainer removes a container by its ID and deletes it from the internal tracking map. // Returns an error if the operation fails. -func (mgr *ContainerMgr) removeContainer(containerID string) error { +func (mgr *DockerMgr) RemoveContainer(containerID string) error { mgr.mu.Lock() defer mgr.mu.Unlock() ctx := mgr.ctx @@ -67,9 +65,9 @@ func (mgr *ContainerMgr) removeContainer(containerID string) error { return nil } -// createVolume creates a Docker volume with the given name, enforcing the volume limit. +// CreateVolume creates a Docker volume with the given name, enforcing the volume limit. // Returns the created volume or an error. -func (mgr *ContainerMgr) createVolume(volumeName string) (volume.Volume, error) { +func (mgr *DockerMgr) CreateVolume(volumeName string) (volume.Volume, error) { mgr.mu.Lock() defer mgr.mu.Unlock() if len(mgr.volumes) >= mgr.volumeLimit { @@ -88,9 +86,9 @@ func (mgr *ContainerMgr) createVolume(volumeName string) (volume.Volume, error) return vol, nil } -// removeVolume removes a Docker volume by name. +// RemoveVolume removes a Docker volume by name. // Returns an error if the volume does not exist or is in use (unless force is true). -func (mgr *ContainerMgr) removeVolume(volumeName string, force bool) error { +func (mgr *DockerMgr) RemoveVolume(volumeName string, force bool) error { mgr.mu.Lock() defer mgr.mu.Unlock() ctx := mgr.ctx @@ -118,10 +116,10 @@ func (mgr *ContainerMgr) removeVolume(volumeName string, force bool) error { return nil } -// runContainer creates and starts a container with the specified image, runtime, and volume attached at /data. +// RunContainer creates and starts a container with the specified image, runtime, and volume attached at /data. // Enforces the container limit and checks that the volume exists. // Returns the container ID or an error. 
-func (mgr *ContainerMgr) runContainer(imageName string, runtimeName string, volumeName string) (string, error) { +func (mgr *DockerMgr) RunContainer(imageName string, runtimeName string, volumeName string) (string, error) { mgr.mu.Lock() defer mgr.mu.Unlock() if len(mgr.containers) >= mgr.containerLimit { @@ -176,12 +174,5 @@ func (mgr *ContainerMgr) runContainer(imageName string, runtimeName string, volu return "", err } - out, err := cli.ContainerLogs(ctx, resp.ID, container.LogsOptions{ShowStdout: true}) - if err != nil { - slog.Error("Failed to get container logs", "containerID", resp.ID, "error", err) - return resp.ID, err - } - - io.Copy(os.Stdout, out) return resp.ID, nil } diff --git a/src/images/serve_image_test.go b/src/docker/docker_test.go similarity index 64% rename from src/images/serve_image_test.go rename to src/docker/docker_test.go index 9baa0f0..63f897d 100644 --- a/src/images/serve_image_test.go +++ b/src/docker/docker_test.go @@ -1,4 +1,4 @@ -package main +package docker import ( "fmt" @@ -8,19 +8,28 @@ import ( "github.com/docker/docker/client" ) -func setupMgr(t *testing.T) *ContainerMgr { +func setupMgr(t *testing.T) *DockerMgr { cli, err := client.NewClientWithOpts(client.FromEnv) if err != nil { t.Fatalf("Failed to create Docker client: %v", err) } - return NewContainerMgr(cli, 10, 100) + return NewDockerMgr(cli, 10, 100) +} + +// cpuImageAndRuntime returns pytorch-cpu and runc for CPU-only tests. Skips if image not found. +func cpuImageAndRuntime(t *testing.T, mgr *DockerMgr) (imageName, runtimeName string) { + _, _, err := mgr.cli.ImageInspectWithRaw(mgr.ctx, "pytorch-cpu") + if err != nil { + t.Skipf("pytorch-cpu image not found. Build with: cd pytorch-cpu && docker build -t pytorch-cpu .") + } + return "pytorch-cpu", "runc" } // Create a volume, check exists, delete, check not exists func TestCreateDeleteVolume(t *testing.T) { mgr := setupMgr(t) volName := "test_volume_t1" - _, err := mgr.createVolume(volName) + _, err := mgr.CreateVolume(volName) if err != nil { t.Errorf("Failed to create volume %s: %v", volName, err) } @@ -35,7 +44,7 @@ func TestCreateDeleteVolume(t *testing.T) { if !found { t.Errorf("Volume %s not found after creation", volName) } - err = mgr.removeVolume(volName, true) + err = mgr.RemoveVolume(volName, true) if err != nil { t.Errorf("Failed to remove volume %s: %v", volName, err) } @@ -51,12 +60,12 @@ func TestCreateDeleteVolume(t *testing.T) { func TestCreateVolumeTwice(t *testing.T) { mgr := setupMgr(t) volName := "test_volume_t3" - _, err := mgr.createVolume(volName) + _, err := mgr.CreateVolume(volName) if err != nil { t.Errorf("Failed to create volume %s: %v", volName, err) } - defer mgr.removeVolume(volName, true) - _, err = mgr.createVolume(volName) + defer mgr.RemoveVolume(volName, true) + _, err = mgr.CreateVolume(volName) if err != nil { t.Errorf("Failed to create volume %s a second time: %v", volName, err) } @@ -65,7 +74,7 @@ func TestCreateVolumeTwice(t *testing.T) { // Remove volume that doesn't exist (should fail or panic) func TestRemoveNonexistentVolume(t *testing.T) { mgr := setupMgr(t) - err := mgr.removeVolume("nonexistent_volume_t4", true) + err := mgr.RemoveVolume("nonexistent_volume_t4", true) if err == nil { t.Errorf("Expected error when removing nonexistent volume, but no error") } else { @@ -73,39 +82,60 @@ func TestRemoveNonexistentVolume(t *testing.T) { } } +// TestRunContainerCPU verifies running a CPU container (pytorch-cpu + runc). +// Use this on machines without GPU. 
Requires: docker build -t pytorch-cpu ./pytorch-cpu +func TestRunContainerCPU(t *testing.T) { + mgr := setupMgr(t) + imageName, runtimeName := cpuImageAndRuntime(t, mgr) + volName := "test_volume_cpu" + _, err := mgr.CreateVolume(volName) + if err != nil { + t.Fatalf("Failed to create volume %s: %v", volName, err) + } + defer mgr.RemoveVolume(volName, true) + containerID, err := mgr.RunContainer(imageName, runtimeName, volName) + if err != nil { + t.Fatalf("Failed to start CPU container: %v", err) + } + defer func() { + _ = mgr.StopContainer(containerID) + _ = mgr.RemoveContainer(containerID) + }() + t.Logf("CPU container started: %s", containerID[:12]) +} + // Remove volume in use (should fail or panic) func TestRemoveVolumeInUse(t *testing.T) { mgr := setupMgr(t) - imageName := "pytorch-cuda" - runtimeName := "nvidia" + imageName, runtimeName := cpuImageAndRuntime(t, mgr) volName := "test_volume_t5" - _, err := mgr.createVolume(volName) + _, err := mgr.CreateVolume(volName) if err != nil { t.Fatalf("Failed to create volume %s: %v", volName, err) } - containerID, err := mgr.runContainer(imageName, runtimeName, volName) + containerID, err := mgr.RunContainer(imageName, runtimeName, volName) if err != nil { t.Fatalf("Failed to start container: %v", err) } defer func() { // Cleanup: stop and remove container, then remove volume - if err := mgr.stopContainer(containerID); err != nil { + if err := mgr.StopContainer(containerID); err != nil { t.Logf("Cleanup: failed to stop container %s: %v", containerID, err) } else { t.Logf("Cleanup: stopped container %s successfully", containerID) } - if err := mgr.removeContainer(containerID); err != nil { + if err := mgr.RemoveContainer(containerID); err != nil { t.Logf("Cleanup: failed to remove container %s: %v", containerID, err) } else { t.Logf("Cleanup: removed container %s successfully", containerID) } - if err := mgr.removeVolume(volName, true); err != nil { + if err := mgr.RemoveVolume(volName, true); err != nil { t.Logf("Cleanup: failed to remove volume %s: %v", volName, err) } else { t.Logf("Cleanup: removed volume %s successfully", volName) } }() - err = mgr.removeVolume(volName, true) // Should error: volume is in use by a running container + err = mgr.RemoveVolume(volName, true) // Should error: volume is in use by a running container if err == nil { t.Errorf("Expected error when removing volume in use, but no error") } else { @@ -116,10 +146,9 @@ func TestRemoveVolumeInUse(t *testing.T) { // Attach a volume that does not exist (should fail or panic) func TestAttachNonexistentVolume(t *testing.T) { mgr := setupMgr(t) - imageName := "pytorch-cuda" - runtimeName := "nvidia" + imageName, runtimeName := cpuImageAndRuntime(t, mgr) volName := "nonexistent_volume_t6" - id, err := mgr.runContainer(imageName, runtimeName, volName) + id, err := mgr.RunContainer(imageName, runtimeName, volName) // If Docker auto-creates the volume, this may not error; check your policy if id != "" && err != nil { t.Errorf("Expected error when attaching nonexistent volume, but got id=%v, err=%v", id, err) @@ -131,35 +160,34 @@ func TestAttachNonexistentVolume(t *testing.T) { // Two containers attach to the same volume (should succeed in Docker, but test for your policy) func TestTwoContainersSameVolume(t *testing.T) { mgr := setupMgr(t) - imageName := "pytorch-cuda" - runtimeName := "nvidia" + imageName, runtimeName := cpuImageAndRuntime(t, mgr) volName := "test_volume_t7" - _, err := mgr.createVolume(volName) + _, err := mgr.CreateVolume(volName) if err != nil { 
t.Fatalf("Failed to create volume %s: %v", volName, err) } - id1, err := mgr.runContainer(imageName, runtimeName, volName) + id1, err := mgr.RunContainer(imageName, runtimeName, volName) if err != nil { t.Fatalf("Failed to start first container: %v", err) } - id2, err := mgr.runContainer(imageName, runtimeName, volName) + id2, err := mgr.RunContainer(imageName, runtimeName, volName) if err != nil { t.Fatalf("Failed to start second container: %v", err) } // Both containers should be able to use the same volume - if err := mgr.stopContainer(id1); err != nil { + if err := mgr.StopContainer(id1); err != nil { t.Logf("Failed to stop first container: %v", err) } - if err := mgr.removeContainer(id1); err != nil { + if err := mgr.RemoveContainer(id1); err != nil { t.Logf("Failed to remove first container: %v", err) } - if err := mgr.stopContainer(id2); err != nil { + if err := mgr.StopContainer(id2); err != nil { t.Logf("Failed to stop second container: %v", err) } - if err := mgr.removeContainer(id2); err != nil { + if err := mgr.RemoveContainer(id2); err != nil { t.Logf("Failed to remove second container: %v", err) } - if err := mgr.removeVolume(volName, true); err != nil { + if err := mgr.RemoveVolume(volName, true); err != nil { t.Logf("Failed to remove volume %s: %v", volName, err) } } @@ -167,35 +195,34 @@ func TestTwoContainersSameVolume(t *testing.T) { // Two containers try to attach to the same volume at the same time (should succeed in Docker) func TestTwoContainersSameVolumeConcurrent(t *testing.T) { mgr := setupMgr(t) - imageName := "pytorch-cuda" - runtimeName := "nvidia" + imageName, runtimeName := cpuImageAndRuntime(t, mgr) volName := "test_volume_t8" - _, err := mgr.createVolume(volName) + _, err := mgr.CreateVolume(volName) if err != nil { t.Fatalf("Failed to create volume %s: %v", volName, err) } - id1, err := mgr.runContainer(imageName, runtimeName, volName) + id1, err := mgr.RunContainer(imageName, runtimeName, volName) if err != nil { t.Fatalf("Failed to start first container: %v", err) } - id2, err2 := mgr.runContainer(imageName, runtimeName, volName) + id2, err2 := mgr.RunContainer(imageName, runtimeName, volName) if err2 != nil { t.Fatalf("Failed to start second container: %v", err2) } // This test does not actually run containers concurrently, but checks Docker's shared volume support - if err := mgr.stopContainer(id1); err != nil { + if err := mgr.StopContainer(id1); err != nil { t.Logf("Failed to stop first container: %v", err) } - if err := mgr.removeContainer(id1); err != nil { + if err := mgr.RemoveContainer(id1); err != nil { t.Logf("Failed to remove first container: %v", err) } - if err := mgr.stopContainer(id2); err != nil { + if err := mgr.StopContainer(id2); err != nil { t.Logf("Failed to stop second container: %v", err) } - if err := mgr.removeContainer(id2); err != nil { + if err := mgr.RemoveContainer(id2); err != nil { t.Logf("Failed to remove second container: %v", err) } - if err := mgr.removeVolume(volName, true); err != nil { + if err := mgr.RemoveVolume(volName, true); err != nil { t.Logf("Failed to remove volume %s: %v", volName, err) } } @@ -207,14 +234,14 @@ func TestVolumeLimit(t *testing.T) { created := []string{} for i := 0; i < limit; i++ { name := "test_volume_t9_" + fmt.Sprint(i) - _, err := mgr.createVolume(name) + _, err := mgr.CreateVolume(name) if err != nil { t.Fatalf("Failed to create volume %s: %v", name, err) } created = append(created, name) } name := "test_volume_fail" - _, err := mgr.createVolume(name) + _, err := 
mgr.CreateVolume(name) if err == nil { t.Errorf("Volume limit not enforced") } else { @@ -224,7 +251,7 @@ func TestVolumeLimit(t *testing.T) { defer func() { // Cleanup: remove all created volumes for _, name := range created { - if err := mgr.removeVolume(name, true); err != nil { + if err := mgr.RemoveVolume(name, true); err != nil { t.Logf("Cleanup: failed to remove volume %s: %v", name, err) } } @@ -235,23 +262,22 @@ func TestVolumeLimit(t *testing.T) { // Set a limit of 10 containers (should fail on 11th if you enforce a limit) func TestContainerLimit(t *testing.T) { mgr := setupMgr(t) - imageName := "pytorch-cuda" - runtimeName := "nvidia" + imageName, runtimeName := cpuImageAndRuntime(t, mgr) volName := "test_volume_t10" - _, err := mgr.createVolume(volName) + _, err := mgr.CreateVolume(volName) if err != nil { t.Fatalf("Failed to create volume %s: %v", volName, err) } ids := []string{} limit := 10 for i := 0; i < limit; i++ { - id, err := mgr.runContainer(imageName, runtimeName, volName) + id, err := mgr.RunContainer(imageName, runtimeName, volName) if err != nil { t.Fatalf("Failed to start container %d: %v", i, err) } ids = append(ids, id) } - _, err = mgr.runContainer(imageName, runtimeName, volName) + _, err = mgr.RunContainer(imageName, runtimeName, volName) if err == nil { t.Errorf("Container limit not enforced") } else { @@ -260,14 +286,14 @@ func TestContainerLimit(t *testing.T) { defer func() { // Cleanup: stop and remove all containers, then remove the volume for _, id := range ids { - if err := mgr.stopContainer(id); err != nil { + if err := mgr.StopContainer(id); err != nil { t.Logf("Cleanup: failed to stop container %s: %v", id, err) } - if err := mgr.removeContainer(id); err != nil { + if err := mgr.RemoveContainer(id); err != nil { t.Logf("Cleanup: failed to remove container %s: %v", id, err) } } - if err := mgr.removeVolume(volName, true); err != nil { + if err := mgr.RemoveVolume(volName, true); err != nil { t.Logf("Cleanup: failed to remove volume %s: %v", volName, err) } }() diff --git a/src/images/go.mod b/src/docker/go.mod similarity index 98% rename from src/images/go.mod rename to src/docker/go.mod index 213ae47..4a29fc2 100644 --- a/src/images/go.mod +++ b/src/docker/go.mod @@ -1,4 +1,4 @@ -module mist/images +module mist/docker go 1.24.6 diff --git a/src/images/go.sum b/src/docker/go.sum similarity index 100% rename from src/images/go.sum rename to src/docker/go.sum diff --git a/src/images/pytorch-cpu/dockerfile b/src/docker/pytorch-cpu/dockerfile similarity index 100% rename from src/images/pytorch-cpu/dockerfile rename to src/docker/pytorch-cpu/dockerfile diff --git a/src/images/pytorch-cpu/pytorch-cpu_config.yaml b/src/docker/pytorch-cpu/pytorch-cpu_config.yaml similarity index 100% rename from src/images/pytorch-cpu/pytorch-cpu_config.yaml rename to src/docker/pytorch-cpu/pytorch-cpu_config.yaml diff --git a/src/images/pytorch-cpu/requirements.txt b/src/docker/pytorch-cpu/requirements.txt similarity index 100% rename from src/images/pytorch-cpu/requirements.txt rename to src/docker/pytorch-cpu/requirements.txt diff --git a/src/images/pytorch-cpu/run_test.sh b/src/docker/pytorch-cpu/run_test.sh similarity index 100% rename from src/images/pytorch-cpu/run_test.sh rename to src/docker/pytorch-cpu/run_test.sh diff --git a/src/images/pytorch-cuda/dockerfile b/src/docker/pytorch-cuda/dockerfile similarity index 100% rename from src/images/pytorch-cuda/dockerfile rename to src/docker/pytorch-cuda/dockerfile diff --git 
a/src/images/pytorch-cuda/pytorch-cuda_config.yaml b/src/docker/pytorch-cuda/pytorch-cuda_config.yaml
similarity index 100%
rename from src/images/pytorch-cuda/pytorch-cuda_config.yaml
rename to src/docker/pytorch-cuda/pytorch-cuda_config.yaml
diff --git a/src/images/pytorch-cuda/requirements.txt b/src/docker/pytorch-cuda/requirements.txt
similarity index 100%
rename from src/images/pytorch-cuda/requirements.txt
rename to src/docker/pytorch-cuda/requirements.txt
diff --git a/src/images/pytorch-cuda/run_test.sh b/src/docker/pytorch-cuda/run_test.sh
similarity index 100%
rename from src/images/pytorch-cuda/run_test.sh
rename to src/docker/pytorch-cuda/run_test.sh
diff --git a/src/images/pytorch-rocm/dockerfile b/src/docker/pytorch-rocm/dockerfile
similarity index 100%
rename from src/images/pytorch-rocm/dockerfile
rename to src/docker/pytorch-rocm/dockerfile
diff --git a/src/images/pytorch-rocm/pytorch-rocm_config.yaml b/src/docker/pytorch-rocm/pytorch-rocm_config.yaml
similarity index 100%
rename from src/images/pytorch-rocm/pytorch-rocm_config.yaml
rename to src/docker/pytorch-rocm/pytorch-rocm_config.yaml
diff --git a/src/images/pytorch-rocm/requirements.txt b/src/docker/pytorch-rocm/requirements.txt
similarity index 100%
rename from src/images/pytorch-rocm/requirements.txt
rename to src/docker/pytorch-rocm/requirements.txt
diff --git a/src/images/pytorch-rocm/run_test.sh b/src/docker/pytorch-rocm/run_test.sh
similarity index 100%
rename from src/images/pytorch-rocm/run_test.sh
rename to src/docker/pytorch-rocm/run_test.sh
diff --git a/src/docker/run_container_job_test.sh b/src/docker/run_container_job_test.sh
new file mode 100755
index 0000000..36ef5dc
--- /dev/null
+++ b/src/docker/run_container_job_test.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+# Run CPU-only container tests. Requires Docker and Redis.
+# Usage: ./run_container_job_test.sh
+
+set -e
+cd "$(dirname "$0")/.."
+
+echo "Building pytorch-cpu image..."
+docker build -t pytorch-cpu ./docker/pytorch-cpu || {
+    echo "Failed to build pytorch-cpu. Ensure Docker is running."
+    exit 1
+}
+
+echo "Starting Redis..."
+docker compose up -d redis 2>/dev/null || docker-compose up -d redis 2>/dev/null || true
+sleep 2
+
+echo "Running container job tests..."
+go test -v -run 'TestContainerJobCPU|TestRunContainerCPUIntegration' -count=1
+
+(cd docker && go test -v -run TestRunContainerCPU -count=1)
+
+echo "All container job tests passed."
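For reference, the lifecycle that `run_container_job_test.sh` ultimately exercises is the exported `mist/docker` API introduced in this change. Below is a minimal standalone sketch of that flow; it assumes a reachable Docker daemon and a locally built `pytorch-cpu` image, and the `main` wrapper and `demo_vol` name are illustrative only, not part of this diff:

```go
package main

import (
	"log"

	"mist/docker"

	"github.com/docker/docker/client"
)

func main() {
	// Connect to the local Docker daemon via environment configuration.
	cli, err := client.NewClientWithOpts(client.FromEnv)
	if err != nil {
		log.Fatalf("docker client: %v", err)
	}
	defer cli.Close()

	// Limits of 10 containers / 100 volumes, matching the values used
	// by the supervisor and the tests in this diff.
	mgr := docker.NewDockerMgr(cli, 10, 100)

	// Volume -> container -> cleanup, mirroring what the tests assert.
	if _, err := mgr.CreateVolume("demo_vol"); err != nil {
		log.Fatalf("create volume: %v", err)
	}
	defer mgr.RemoveVolume("demo_vol", true)

	id, err := mgr.RunContainer("pytorch-cpu", "runc", "demo_vol")
	if err != nil {
		log.Fatalf("run container: %v", err)
	}
	defer func() {
		_ = mgr.StopContainer(id)
		_ = mgr.RemoveContainer(id)
	}()

	log.Printf("container %s running with demo_vol mounted at /data", id[:12])
}
```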
diff --git a/src/images/run_tests.sh b/src/docker/run_tests.sh similarity index 100% rename from src/images/run_tests.sh rename to src/docker/run_tests.sh diff --git a/src/go.mod b/src/go.mod index 097afa1..5092ffc 100644 --- a/src/go.mod +++ b/src/go.mod @@ -1,12 +1,46 @@ module mist -go 1.24.3 +go 1.24.6 -require github.com/redis/go-redis/v9 v9.10.0 +require ( + github.com/docker/docker v28.5.2+incompatible + github.com/redis/go-redis/v9 v9.10.0 + gopkg.in/natefinch/lumberjack.v2 v2.2.1 + gopkg.in/yaml.v3 v3.0.1 + mist/docker v0.0.0 +) + +replace mist/docker => ./docker require ( + github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect + github.com/Microsoft/go-winio v0.4.21 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/containerd/errdefs v1.0.0 // indirect + github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect + github.com/distribution/reference v0.6.0 // indirect + github.com/docker/go-connections v0.6.0 // indirect + github.com/docker/go-units v0.5.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect + github.com/moby/sys/sequential v0.6.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect + go.opentelemetry.io/otel v1.40.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect + go.opentelemetry.io/otel/metric v1.40.0 // indirect + go.opentelemetry.io/otel/sdk v1.40.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.40.0 // indirect + go.opentelemetry.io/otel/trace v1.40.0 // indirect + go.opentelemetry.io/proto/otlp v1.9.0 // indirect + golang.org/x/net v0.50.0 // indirect + golang.org/x/sys v0.41.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect ) diff --git a/src/go.sum b/src/go.sum index 0cdd507..774c291 100644 --- a/src/go.sum +++ b/src/go.sum @@ -1,15 +1,122 @@ +github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c h1:udKWzYgxTojEKWjV8V+WSxDXJ4NFATAsZjh8iIbsQIg= +github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/Microsoft/go-winio v0.4.21 h1:+6mVbXh4wPzUrl1COX9A+ZCvEpYsOBZ6/+kwDnvLyro= +github.com/Microsoft/go-winio v0.4.21/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 
+github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= +github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= +github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= +github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk= +github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= +github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/docker v28.5.2+incompatible h1:DBX0Y0zAjZbSrm1uzOkdr1onVghKaftjlSWt4AFexzM= +github.com/docker/docker v28.5.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.6.0 h1:LlMG9azAe1TqfR7sO+NJttz1gy6KO7VJBh+pMmjSD94= +github.com/docker/go-connections v0.6.0/go.mod h1:AahvXYshr6JgfUJGdDCs2b5EZG/vmaMAntpSFH5BFKE= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= +github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw= +github.com/moby/sys/atomicwriter v0.1.0/go.mod h1:Ul8oqv2ZMNHOceF643P6FKPXeCmYtlQMvpizfsSoaWs= +github.com/moby/sys/sequential 
v0.6.0 h1:qrx7XFUd/5DxtqcoH1h438hF5TmOvzC/lspjy7zgvCU= +github.com/moby/sys/sequential v0.6.0/go.mod h1:uyv8EUTrca5PnDsdMGXhZe6CCe8U/UiTWd+lL+7b/Ko= +github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ= +github.com/moby/term v0.5.2/go.mod h1:d3djjFCrjnB+fl8NJux+EJzu0msscUP+f8it8hPkFLc= +github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= +github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= +github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/redis/go-redis/v9 v9.10.0 h1:FxwK3eV8p/CQa0Ch276C7u2d0eNC9kCmAYQ7mCXCzVs= github.com/redis/go-redis/v9 v9.10.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/sirupsen/logrus v1.7.0 h1:ShrD1U9pZB12TX0cVy0DtePoCH97K8EtX+mg7ZARUtM= +github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 h1:RbKq8BG0FI8OiXhBfcRtqqHcZcka+gU3cskNuf05R18= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0/go.mod h1:h06DGIukJOevXaj/xrNjhi/2098RZzcLTbc0jDAUbsg= +go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= +go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0 h1:aTL7F04bJHUlztTsNGJ2l+6he8c+y/b//eR0jjjemT4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.38.0/go.mod h1:kldtb7jDTeol0l3ewcmd8SDvx3EmIE7lyvqbasU3QC4= +go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= +go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= +go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= 
+go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= +go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= +go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= +golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= +golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= +golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE= +golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY= +google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc= +google.golang.org/grpc v1.75.1 h1:/ODCNEuf9VghjgO3rqLcfg8fiOP0nSluljWFlDxELLI= +google.golang.org/grpc v1.75.1/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc= gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= +gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA= diff --git a/src/int_test.go b/src/int_test.go index 851a24f..3552145 100644 --- a/src/int_test.go +++ b/src/int_test.go @@ -100,7 +100,7 @@ func TestIntegration(t *testing.T) { "data": fmt.Sprintf("test_data_%d", i), } - if err := scheduler.Enqueue(jobType, "TT", payload); err != nil { + if _, err := scheduler.Enqueue(jobType, "TT", payload); err != nil { t.Errorf("Failed to enqueue job: %v", err) } } diff --git a/src/job_scheduling_test.go b/src/job_scheduling_test.go 
index 62c165e..b7271f1 100644
--- a/src/job_scheduling_test.go
+++ b/src/job_scheduling_test.go
@@ -36,7 +36,7 @@ func TestJobEnqueueAndSupervisor(t *testing.T) {
 	// Enqueue jobs
 	for i := 0; i < 3; i++ {
 		payload := map[string]interface{}{"task": i}
-		if err := scheduler.Enqueue("test_job_type", "AMD", payload); err != nil {
+		if _, err := scheduler.Enqueue("test_job_type", "AMD", payload); err != nil {
 			t.Errorf("Failed to enqueue job %d: %v", i, err)
 		}
 	}
diff --git a/src/scheduler.go b/src/scheduler.go
index a9d8c7a..f7c8556 100644
--- a/src/scheduler.go
+++ b/src/scheduler.go
@@ -29,7 +29,7 @@ func NewScheduler(redisAddr string, log *slog.Logger) *Scheduler {
	}
}

-func (s *Scheduler) Enqueue(jobType string, requiredGPU string, payload map[string]interface{}) error {
+func (s *Scheduler) Enqueue(jobType string, requiredGPU string, payload map[string]interface{}) (string, error) {
	// create a new job
	job := Job{
		ID: generateJobID(),
@@ -42,17 +42,17 @@ func (s *Scheduler) Enqueue(jobType string, requiredGPU string, payload map[stri
	}

	if ok, err := s.JobExists(job.ID); err != nil {
-		return err
+		return "", err
	} else if ok {
		s.log.Warn("duplicate job skipped", "job_id", job.ID)
-		return nil
+		return job.ID, nil
	}

	// marshal the payload
	payloadJSON, err := json.Marshal(job.Payload)
	if err != nil {
		s.log.Error("failed to marshal job payload", "error", err)
-		return err
+		return "", err
	}

	// start redis pipeline
@@ -81,9 +81,9 @@
	// execute pipeline
	if _, err := pipe.Exec(s.ctx); err != nil {
		s.log.Error("failed to enqueue job", "error", err)
-		return err
+		return "", err
	}

	s.log.Info("enqueued job", "job_id", job.ID, "job_type", job.Type, "gpu", requiredGPU)
-	return nil
+	return job.ID, nil
}
diff --git a/src/supervisor.go b/src/supervisor.go
index aa93e82..96da66a 100644
--- a/src/supervisor.go
+++ b/src/supervisor.go
@@ -6,37 +6,57 @@
 import (
	"errors"
	"fmt"
	"log/slog"
+	"strconv"
	"sync"
	"time"

-	"strconv"
+	"mist/docker"
+
+	"github.com/docker/docker/client"

	"github.com/redis/go-redis/v9"
 )

+// CPUImage and CPURuntime are used when running CPU-only jobs (no GPU).
+const ( + CPUImage = "pytorch-cpu" + CPURuntime = "runc" +) + type Supervisor struct { - redisClient *redis.Client - ctx context.Context - cancel context.CancelFunc - consumerID string - gpuType string - wg sync.WaitGroup - log *slog.Logger + redisClient *redis.Client + ctx context.Context + cancel context.CancelFunc + consumerID string + gpuType string + dockerMgr *docker.DockerMgr + wg sync.WaitGroup + log *slog.Logger } func NewSupervisor(redisAddr, consumerID, gpuType string, log *slog.Logger) *Supervisor { - client := redis.NewClient(&redis.Options{ + redisClient := redis.NewClient(&redis.Options{ Addr: redisAddr, }) ctx, cancel := context.WithCancel(context.Background()) + var dockerMgr *docker.DockerMgr + dockerCli, err := client.NewClientWithOpts(client.FromEnv) + if err != nil { + log.Warn("Docker client unavailable, containers will not be started", "error", err) + } else { + dockerMgr = docker.NewDockerMgr(dockerCli, 10, 100) + log.Info("Docker client initialized for container execution") + } + return &Supervisor{ - redisClient: client, - ctx: ctx, - cancel: cancel, - consumerID: consumerID, - gpuType: gpuType, - log: log, + redisClient: redisClient, + ctx: ctx, + cancel: cancel, + consumerID: consumerID, + gpuType: gpuType, + dockerMgr: dockerMgr, + log: log, } } @@ -164,15 +184,18 @@ func (s *Supervisor) handleMessage(message redis.XMessage) { s.emitJobEvent(job.ID, JobStateInProgress) - // Simulate job processing success := s.processJob(job) if success { + s.emitJobEvent(job.ID, JobStateSuccess) + s.updateJobState(job.ID, JobStateSuccess) s.ackMessage(message.ID) s.log.Info("job completed successfully", "job_id", job.ID) } else { + s.emitJobEvent(job.ID, JobStateFailure) + s.updateJobState(job.ID, JobStateFailure) + s.ackMessage(message.ID) s.log.Error("job failed", "job_id", job.ID) - s.ackMessage(message.ID) // TODO: change this once we have docker support } } @@ -187,11 +210,61 @@ func (s *Supervisor) canHandleJob(job Job) bool { return job.RequiredGPU == s.gpuType } -// TODO: Actually schedule a container here +// processJob executes the job by starting a container. For CPU jobs only (no GPU). +// Returns true if the job completed successfully. 
func (s *Supervisor) processJob(job Job) bool {
+	// Only run CPU containers on this machine (no GPU support)
+	if !s.isCPUJob(job) {
+		s.log.Info("skipping container start for GPU job on CPU-only machine", "job_id", job.ID)
+		return true // acked without running; canHandleJob should route GPU jobs to a GPU supervisor before this point
+	}
+
+	if s.dockerMgr == nil {
+		s.log.Warn("no container manager, simulating job success", "job_id", job.ID)
+		return true
+	}
+
+	volumeName := fmt.Sprintf("job_%s_data", job.ID)
+	_, err := s.dockerMgr.CreateVolume(volumeName)
+	if err != nil {
+		s.log.Error("failed to create volume for job", "job_id", job.ID, "error", err)
+		return false
+	}
+
+	containerID, err := s.dockerMgr.RunContainer(CPUImage, CPURuntime, volumeName)
+	if err != nil {
+		s.log.Error("failed to run container for job", "job_id", job.ID, "error", err)
+		_ = s.dockerMgr.RemoveVolume(volumeName, true)
+		return false
+	}
+
+	// Run for a short time to simulate work, then clean up
+	time.Sleep(2 * time.Second)
+
+	if err := s.dockerMgr.StopContainer(containerID); err != nil {
+		s.log.Error("failed to stop container", "job_id", job.ID, "container_id", containerID, "error", err)
+	}
+	if err := s.dockerMgr.RemoveContainer(containerID); err != nil {
+		s.log.Error("failed to remove container", "job_id", job.ID, "container_id", containerID, "error", err)
+	}
+	if err := s.dockerMgr.RemoveVolume(volumeName, true); err != nil {
+		s.log.Warn("failed to remove volume", "job_id", job.ID, "volume", volumeName, "error", err)
+	}
+
+	s.log.Info("job container completed", "job_id", job.ID, "container_id", containerID)
 	return true
}

+// isCPUJob returns true if the job can run on CPU (no GPU required).
+func (s *Supervisor) isCPUJob(job Job) bool {
+	switch job.RequiredGPU {
+	case "", "CPU":
+		return true
+	default:
+		return false
+	}
+}
+
 func (s *Supervisor) emitJobEvent(jobID string, state JobState) {
	event := map[string]interface{}{
@@ -213,6 +286,13 @@ func (s *Supervisor) emitJobEvent(jobID string, state JobState) {

}

+func (s *Supervisor) updateJobState(jobID string, state JobState) {
+	jobKey := fmt.Sprintf("job:%s", jobID)
+	if err := s.redisClient.HSet(s.ctx, jobKey, "job_state", string(state)).Err(); err != nil {
+		s.log.Error("failed to update job state", "job_id", jobID, "error", err)
+	}
+}
+
 func (s *Supervisor) ackMessage(messageID string) {
	result := s.redisClient.XAck(s.ctx, StreamName, ConsumerGroup, messageID)
	if result.Err() != nil {
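End to end, these changes mean a caller gets the job ID back from `Enqueue` and can watch the `job:<id>` hash that the supervisor now writes through `updateJobState`, which is exactly what `TestContainerJobCPU` does. A minimal sketch of that client-side flow follows; `waitForJob` is a hypothetical helper, not part of this diff, and it assumes the package's `Scheduler`, `JobState` constants, and a go-redis v9 client are already constructed as in the tests:

```go
// waitForJob enqueues a CPU job and polls the job:<id> hash until the
// supervisor records a terminal state (success or failure).
func waitForJob(ctx context.Context, s *Scheduler, rdb *redis.Client) error {
	jobID, err := s.Enqueue("test_job", "CPU", map[string]interface{}{"task": "demo"})
	if err != nil {
		return err
	}

	deadline := time.After(15 * time.Second)
	for {
		select {
		case <-deadline:
			return fmt.Errorf("timeout waiting for job %s", jobID)
		case <-time.After(500 * time.Millisecond):
			state, err := rdb.HGet(ctx, "job:"+jobID, "job_state").Result()
			if err != nil {
				continue // state not written yet (redis.Nil until first update)
			}
			switch JobState(state) {
			case JobStateSuccess:
				return nil
			case JobStateFailure:
				return fmt.Errorf("job %s failed", jobID)
			}
		}
	}
}
```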