From 13c093ca1e57d8ca4e37b12d4091c8c595cc5133 Mon Sep 17 00:00:00 2001 From: Dorin Geman Date: Tue, 20 May 2025 16:00:27 +0300 Subject: [PATCH 1/3] Add /engines/ps Signed-off-by: Dorin Geman --- pkg/inference/scheduling/api.go | 13 +++++++++ pkg/inference/scheduling/scheduler.go | 42 +++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/pkg/inference/scheduling/api.go b/pkg/inference/scheduling/api.go index 492ed7d6a..ad44d03fd 100644 --- a/pkg/inference/scheduling/api.go +++ b/pkg/inference/scheduling/api.go @@ -2,6 +2,7 @@ package scheduling import ( "strings" + "time" "github.com/docker/model-runner/pkg/inference" ) @@ -42,3 +43,15 @@ type OpenAIInferenceRequest struct { // Model is the requested model name. Model string `json:"model"` } + +// BackendStatus represents information about a running backend +type BackendStatus struct { + // BackendName is the name of the backend + BackendName string `json:"backend_name"` + // ModelName is the name of the model loaded in the backend + ModelName string `json:"model_name"` + // Mode is the mode the backend is operating in + Mode string `json:"mode"` + // LastUsed represents when this (backend, model, mode) tuple was last used + LastUsed time.Time `json:"last_used,omitempty"` +} diff --git a/pkg/inference/scheduling/scheduler.go b/pkg/inference/scheduling/scheduler.go index f0ae1295a..5cebe3ce4 100644 --- a/pkg/inference/scheduling/scheduler.go +++ b/pkg/inference/scheduling/scheduler.go @@ -8,6 +8,7 @@ import ( "fmt" "io" "net/http" + "time" "github.com/docker/model-distribution/distribution" "github.com/docker/model-runner/pkg/inference" @@ -81,6 +82,7 @@ func (s *Scheduler) routeHandlers() map[string]http.HandlerFunc { m[route] = s.handleOpenAIInference } m["GET "+inference.InferencePrefix+"/status"] = s.GetBackendStatus + m["GET "+inference.InferencePrefix+"/ps"] = s.GetRunningBackends return m } @@ -224,6 +226,46 @@ func (s *Scheduler) ResetInstaller(httpClient *http.Client) { s.installer = newInstaller(s.log, s.backends, httpClient) } +// GetRunningBackends returns information about all running backends +func (s *Scheduler) GetRunningBackends(w http.ResponseWriter, r *http.Request) { + runningBackends := s.getLoaderStatus() + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(runningBackends); err != nil { + http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError) + return + } +} + +// getLoaderStatus returns information about all running backends managed by the loader +func (s *Scheduler) getLoaderStatus() []BackendStatus { + if !s.loader.lock(context.Background()) { + return []BackendStatus{} + } + defer s.loader.unlock() + + result := make([]BackendStatus, 0, len(s.loader.runners)) + + for key, slot := range s.loader.runners { + if s.loader.slots[slot] != nil { + status := BackendStatus{ + BackendName: key.backend, + ModelName: key.model, + Mode: key.mode.String(), + LastUsed: time.Time{}, + } + + if s.loader.references[slot] == 0 { + status.LastUsed = s.loader.timestamps[slot] + } + + result = append(result, status) + } + } + + return result +} + // ServeHTTP implements net/http.Handler.ServeHTTP. func (s *Scheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.router.ServeHTTP(w, r) From b881521c88ba382684ca46088fab99d6d166bdaf Mon Sep 17 00:00:00 2001 From: Dorin Geman Date: Wed, 21 May 2025 09:59:37 +0300 Subject: [PATCH 2/3] Add /engines/df Signed-off-by: Dorin Geman --- main.go | 7 +++++- pkg/diskusage/diskusage.go | 24 +++++++++++++++++++++ pkg/inference/backend.go | 2 ++ pkg/inference/backends/llamacpp/llamacpp.go | 9 ++++++++ pkg/inference/backends/mlx/mlx.go | 4 ++++ pkg/inference/backends/vllm/vllm.go | 4 ++++ pkg/inference/models/manager.go | 16 ++++++++++++++ pkg/inference/scheduling/api.go | 6 ++++++ pkg/inference/scheduling/scheduler.go | 23 ++++++++++++++++++++ 9 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 pkg/diskusage/diskusage.go diff --git a/main.go b/main.go index 06f911ca0..8fc168cee 100644 --- a/main.go +++ b/main.go @@ -55,7 +55,12 @@ func main() { modelManager, log.WithFields(logrus.Fields{"component": "llama.cpp"}), llamaServerPath, - func() string { wd, _ := os.Getwd(); return wd }(), + func() string { + wd, _ := os.Getwd() + d := filepath.Join(wd, "updated-inference") + _ = os.MkdirAll(d, 0o755) + return d + }(), ) if err != nil { log.Fatalf("unable to initialize %s backend: %v", llamacpp.Name, err) diff --git a/pkg/diskusage/diskusage.go b/pkg/diskusage/diskusage.go new file mode 100644 index 000000000..9a1a2c292 --- /dev/null +++ b/pkg/diskusage/diskusage.go @@ -0,0 +1,24 @@ +package diskusage + +import ( + "io/fs" + "path/filepath" +) + +func Size(path string) (float64, error) { + var size int64 + err := filepath.WalkDir(path, func(_ string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.Type().IsRegular() { + info, err := d.Info() + if err != nil { + return err + } + size += info.Size() + } + return nil + }) + return float64(size), err +} diff --git a/pkg/inference/backend.go b/pkg/inference/backend.go index b5eff2560..f1b5d10a9 100644 --- a/pkg/inference/backend.go +++ b/pkg/inference/backend.go @@ -69,4 +69,6 @@ type Backend interface { Run(ctx context.Context, socket, model string, mode BackendMode) error // Status returns a description of the backend's state. Status() string + // GetDiskUsage returns the disk usage of the backend. + GetDiskUsage() (float64, error) } diff --git a/pkg/inference/backends/llamacpp/llamacpp.go b/pkg/inference/backends/llamacpp/llamacpp.go index 451856b12..878805059 100644 --- a/pkg/inference/backends/llamacpp/llamacpp.go +++ b/pkg/inference/backends/llamacpp/llamacpp.go @@ -12,6 +12,7 @@ import ( "runtime" "strconv" + "github.com/docker/model-runner/pkg/diskusage" "github.com/docker/model-runner/pkg/inference" "github.com/docker/model-runner/pkg/inference/models" "github.com/docker/model-runner/pkg/logging" @@ -199,3 +200,11 @@ func (l *llamaCpp) Run(ctx context.Context, socket, model string, mode inference func (l *llamaCpp) Status() string { return l.status } + +func (l *llamaCpp) GetDiskUsage() (float64, error) { + size, err := diskusage.Size(l.updatedServerStoragePath) + if err != nil { + return 0, fmt.Errorf("error while getting store size: %v", err) + } + return size, nil +} diff --git a/pkg/inference/backends/mlx/mlx.go b/pkg/inference/backends/mlx/mlx.go index 7f3025131..226a176bf 100644 --- a/pkg/inference/backends/mlx/mlx.go +++ b/pkg/inference/backends/mlx/mlx.go @@ -58,3 +58,7 @@ func (m *mlx) Run(ctx context.Context, socket, model string, mode inference.Back func (m *mlx) Status() string { return "not running" } + +func (m *mlx) GetDiskUsage() (float64, error) { + return 0, nil +} diff --git a/pkg/inference/backends/vllm/vllm.go b/pkg/inference/backends/vllm/vllm.go index ccf86df28..c5f1f5a7a 100644 --- a/pkg/inference/backends/vllm/vllm.go +++ b/pkg/inference/backends/vllm/vllm.go @@ -58,3 +58,7 @@ func (v *vLLM) Run(ctx context.Context, socket, model string, mode inference.Bac func (v *vLLM) Status() string { return "not running" } + +func (v *vLLM) GetDiskUsage() (float64, error) { + return 0, nil +} diff --git a/pkg/inference/models/manager.go b/pkg/inference/models/manager.go index 3117dea76..64e42b2ac 100644 --- a/pkg/inference/models/manager.go +++ b/pkg/inference/models/manager.go @@ -14,6 +14,7 @@ import ( "github.com/docker/model-distribution/distribution" "github.com/docker/model-distribution/registry" "github.com/docker/model-distribution/types" + "github.com/docker/model-runner/pkg/diskusage" "github.com/docker/model-runner/pkg/inference" "github.com/docker/model-runner/pkg/logging" "github.com/sirupsen/logrus" @@ -399,6 +400,21 @@ func (m *Manager) handlePushModel(w http.ResponseWriter, r *http.Request, model } } +// GetDiskUsage returns the disk usage of the model store. +func (m *Manager) GetDiskUsage() (float64, error, int) { + if m.distributionClient == nil { + return 0, errors.New("model distribution service unavailable"), http.StatusServiceUnavailable + } + + storePath := m.distributionClient.GetStorePath() + size, err := diskusage.Size(storePath) + if err != nil { + return 0, fmt.Errorf("error while getting store size: %v", err), http.StatusInternalServerError + } + + return size, nil, http.StatusOK +} + // ServeHTTP implement net/http.Handler.ServeHTTP. func (m *Manager) ServeHTTP(w http.ResponseWriter, r *http.Request) { m.router.ServeHTTP(w, r) diff --git a/pkg/inference/scheduling/api.go b/pkg/inference/scheduling/api.go index ad44d03fd..67bed38c0 100644 --- a/pkg/inference/scheduling/api.go +++ b/pkg/inference/scheduling/api.go @@ -55,3 +55,9 @@ type BackendStatus struct { // LastUsed represents when this (backend, model, mode) tuple was last used LastUsed time.Time `json:"last_used,omitempty"` } + +// DiskUsage represents the disk usage of the models and default backend. +type DiskUsage struct { + ModelsDiskUsage float64 `json:"models_disk_usage"` + DefaultBackendDiskUsage float64 `json:"default_backend_disk_usage"` +} diff --git a/pkg/inference/scheduling/scheduler.go b/pkg/inference/scheduling/scheduler.go index 5cebe3ce4..16764adfd 100644 --- a/pkg/inference/scheduling/scheduler.go +++ b/pkg/inference/scheduling/scheduler.go @@ -83,6 +83,7 @@ func (s *Scheduler) routeHandlers() map[string]http.HandlerFunc { } m["GET "+inference.InferencePrefix+"/status"] = s.GetBackendStatus m["GET "+inference.InferencePrefix+"/ps"] = s.GetRunningBackends + m["GET "+inference.InferencePrefix+"/df"] = s.GetDiskUsage return m } @@ -266,6 +267,28 @@ func (s *Scheduler) getLoaderStatus() []BackendStatus { return result } +func (s *Scheduler) GetDiskUsage(w http.ResponseWriter, _ *http.Request) { + modelsDiskUsage, err, httpCode := s.modelManager.GetDiskUsage() + if err != nil { + http.Error(w, fmt.Sprintf("Failed to get models disk usage: %v", err), httpCode) + return + } + + // TODO: Get disk usage for each backend once the backends are implemented. + defaultBackendDiskUsage, err := s.defaultBackend.GetDiskUsage() + if err != nil { + http.Error(w, fmt.Sprintf("Failed to get disk usage for %s: %v", s.defaultBackend.Name(), err), http.StatusInternalServerError) + return + } + + diskUsage := DiskUsage{modelsDiskUsage, defaultBackendDiskUsage} + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(diskUsage); err != nil { + http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError) + return + } +} + // ServeHTTP implements net/http.Handler.ServeHTTP. func (s *Scheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.router.ServeHTTP(w, r) From 47a0fae220d509ccdf95b82f673b7903184d3adb Mon Sep 17 00:00:00 2001 From: Dorin Geman Date: Wed, 21 May 2025 14:14:04 +0300 Subject: [PATCH 3/3] Add /engines/unload Signed-off-by: Dorin Geman --- pkg/inference/scheduling/api.go | 12 +++++++++ pkg/inference/scheduling/loader.go | 36 +++++++++++++++++++++++++++ pkg/inference/scheduling/scheduler.go | 28 +++++++++++++++++++++ 3 files changed, 76 insertions(+) diff --git a/pkg/inference/scheduling/api.go b/pkg/inference/scheduling/api.go index 67bed38c0..e3f17cf3a 100644 --- a/pkg/inference/scheduling/api.go +++ b/pkg/inference/scheduling/api.go @@ -61,3 +61,15 @@ type DiskUsage struct { ModelsDiskUsage float64 `json:"models_disk_usage"` DefaultBackendDiskUsage float64 `json:"default_backend_disk_usage"` } + +// UnloadRequest is used to specify which models to unload. +type UnloadRequest struct { + All bool `json:"all"` + Backend string `json:"backend"` + Model string `json:"model"` +} + +// UnloadResponse is used to return the number of unloaded runners (backend, model). +type UnloadResponse struct { + UnloadedRunners int `json:"unloaded_runners"` +} diff --git a/pkg/inference/scheduling/loader.go b/pkg/inference/scheduling/loader.go index ea8e15d34..c05cbf050 100644 --- a/pkg/inference/scheduling/loader.go +++ b/pkg/inference/scheduling/loader.go @@ -177,6 +177,42 @@ func (l *loader) evict(idleOnly bool) int { return len(l.runners) } +// evictRunner evicts a specific runner. The caller must hold the loader lock. +// It returns the number of remaining runners. +func (l *loader) evictRunner(backend, model string) int { + allBackends := backend == "" + for r, slot := range l.runners { + if (allBackends || r.backend == backend) && r.model == model { + l.log.Infof("Evicting %s backend runner with model %s in %s mode", + r.backend, r.model, r.mode, + ) + l.slots[slot].terminate() + l.slots[slot] = nil + l.availableMemory += l.allocations[slot] + l.allocations[slot] = 0 + l.timestamps[slot] = time.Time{} + delete(l.runners, r) + } + } + return len(l.runners) +} + +// Unload unloads runners and returns the number of unloaded runners. +func (l *loader) Unload(ctx context.Context, unload UnloadRequest) int { + if !l.lock(ctx) { + return 0 + } + defer l.unlock() + + return len(l.runners) - func() int { + if unload.All { + return l.evict(false) + } else { + return l.evictRunner(unload.Backend, unload.Model) + } + }() +} + // stopAndDrainTimer stops and drains a timer without knowing if it was running. func stopAndDrainTimer(timer *time.Timer) { timer.Stop() diff --git a/pkg/inference/scheduling/scheduler.go b/pkg/inference/scheduling/scheduler.go index 16764adfd..cecc3d65a 100644 --- a/pkg/inference/scheduling/scheduler.go +++ b/pkg/inference/scheduling/scheduler.go @@ -84,6 +84,7 @@ func (s *Scheduler) routeHandlers() map[string]http.HandlerFunc { m["GET "+inference.InferencePrefix+"/status"] = s.GetBackendStatus m["GET "+inference.InferencePrefix+"/ps"] = s.GetRunningBackends m["GET "+inference.InferencePrefix+"/df"] = s.GetDiskUsage + m["POST "+inference.InferencePrefix+"/unload"] = s.Unload return m } @@ -289,6 +290,33 @@ func (s *Scheduler) GetDiskUsage(w http.ResponseWriter, _ *http.Request) { } } +// Unload unloads the specified runners (backend, model) from the backend. +// Currently, this doesn't work for runners that are handling an OpenAI request. +func (s *Scheduler) Unload(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maximumOpenAIInferenceRequestSize)) + if err != nil { + if _, ok := err.(*http.MaxBytesError); ok { + http.Error(w, "request too large", http.StatusBadRequest) + } else { + http.Error(w, "unknown error", http.StatusInternalServerError) + } + return + } + + var unloadRequest UnloadRequest + if err := json.Unmarshal(body, &unloadRequest); err != nil { + http.Error(w, "invalid request", http.StatusBadRequest) + return + } + + unloadedRunners := UnloadResponse{s.loader.Unload(r.Context(), unloadRequest)} + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(unloadedRunners); err != nil { + http.Error(w, fmt.Sprintf("Failed to encode response: %v", err), http.StatusInternalServerError) + return + } +} + // ServeHTTP implements net/http.Handler.ServeHTTP. func (s *Scheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.router.ServeHTTP(w, r)