Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/handler/endpoint_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
"github.com/gin-gonic/gin"
)

// EndpointHandler handles endpoint lifecycle APIs (metadata + K8s deployment)
// EndpointHandler handles endpoint lifecycle APIs (metadata + deployment)
// Supports multiple deployment providers: K8s, Novita, etc.
type EndpointHandler struct {
deploymentProvider interfaces.DeploymentProvider
endpointService *endpointsvc.Service
Expand Down
2 changes: 1 addition & 1 deletion app/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *Router) Setup(engine *gin.Engine) {
v2.POST("/job-stream/:worker_id/:task_id", r.workerHandler.SubmitResult)
}

// API v1 - K8s application management interface (if enabled)
// API v1 - Endpoint management interface (K8s or Novita, if enabled)
if r.endpointHandler != nil {
api := engine.Group("/api/v1")
{
Expand Down
70 changes: 67 additions & 3 deletions cmd/initializers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"waverless/pkg/autoscaler"
"waverless/pkg/config"
"waverless/pkg/deploy/k8s"
"waverless/pkg/deploy/novita"
"waverless/pkg/interfaces"
"waverless/pkg/logger"
"waverless/pkg/monitoring"
Expand Down Expand Up @@ -120,6 +121,14 @@ func (app *Application) initServices() error {
}
}

// Get Novita deployment provider for status sync
var novitaDeployProvider *novita.NovitaDeploymentProvider
if app.config.Novita.Enabled {
if novitaProv, ok := app.deploymentProvider.(*novita.NovitaDeploymentProvider); ok {
novitaDeployProvider = novitaProv
}
}

// Initialize worker service (MySQL-based)
app.workerService = service.NewWorkerService(
app.mysqlRepo.Worker,
Expand Down Expand Up @@ -198,6 +207,12 @@ func (app *Application) initServices() error {
// Non-critical feature, continue startup
}

// Setup Novita status watcher for endpoint status sync (when Novita is enabled)
if err := app.setupNovitaStatusWatcher(novitaDeployProvider); err != nil {
logger.WarnCtx(app.ctx, "Failed to setup Novita status watcher: %v (non-critical, continuing)", err)
// Non-critical feature, continue startup
}

return nil
}

Expand Down Expand Up @@ -307,6 +322,49 @@ func (app *Application) setupSpotInterruptionWatcher(k8sProvider *k8s.K8sDeploym
return nil
}

// setupNovitaStatusWatcher sets up Novita status watcher for endpoint status sync
func (app *Application) setupNovitaStatusWatcher(novitaProvider *novita.NovitaDeploymentProvider) error {
if novitaProvider == nil {
logger.InfoCtx(app.ctx, "Novita provider not available, skipping status watcher setup")
return nil
}

logger.InfoCtx(app.ctx, "Setting up Novita status watcher for endpoint status sync...")

// Register replica watch callback to sync status to database
err := novitaProvider.WatchReplicas(app.ctx, func(event interfaces.ReplicaEvent) {
endpoint := event.Name

// Calculate status based on replica state
status := "Pending"
if event.AvailableReplicas == event.DesiredReplicas && event.DesiredReplicas > 0 {
status = "Running"
} else if event.DesiredReplicas == 0 {
status = "Stopped"
}

// Update endpoint runtime state in database
if app.mysqlRepo != nil && app.mysqlRepo.Endpoint != nil {
runtimeState := map[string]interface{}{
"replicas": event.DesiredReplicas,
"readyReplicas": event.ReadyReplicas,
"availableReplicas": event.AvailableReplicas,
}

if err := app.mysqlRepo.Endpoint.UpdateRuntimeState(app.ctx, endpoint, status, runtimeState); err != nil {
logger.ErrorCtx(app.ctx, "Failed to update Novita endpoint runtime state: %v", err)
}
}
})

if err != nil {
logger.WarnCtx(app.ctx, "Failed to register Novita status watcher: %v", err)
return err
}

return nil
}

// setupDeploymentWatcher sets up Deployment change listener (optimizes rolling updates)
// This watcher only sets Pod Deletion Cost to guide K8s on which pods to delete first
// It does NOT mark workers as DRAINING - that's handled by setupPodWatcher when pods are actually terminated
Expand Down Expand Up @@ -429,12 +487,18 @@ func (app *Application) initHandlers() error {
app.statisticsHandler = handler.NewStatisticsHandler(app.statisticsService, app.workerService)
app.monitoringHandler = handler.NewMonitoringHandler(app.monitoringService)

// Initialize K8s Handler (Endpoint Handler)
if app.config.K8s.Enabled {
// Initialize Endpoint Handler (for K8s or Novita)
if app.config.K8s.Enabled || app.config.Novita.Enabled {
if app.deploymentProvider == nil {
logger.ErrorCtx(app.ctx, "K8s is enabled but deployment provider is nil")
logger.ErrorCtx(app.ctx, "Deployment provider is enabled but provider is nil")
} else {
app.endpointHandler = handler.NewEndpointHandler(app.deploymentProvider, app.endpointService, app.workerService)
if app.config.K8s.Enabled {
logger.InfoCtx(app.ctx, "Endpoint handler initialized for K8s")
}
if app.config.Novita.Enabled {
logger.InfoCtx(app.ctx, "Endpoint handler initialized for Novita")
}
}
}

Expand Down
10 changes: 9 additions & 1 deletion config/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ notification:
feishu_webhook_url: "" # Example: "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxxxxx"

providers:
deployment: "k8s" # k8s, docker
deployment: "k8s" # k8s, docker, novita
queue: "redis" # redis, mysql
metadata: "mysql" # redis, mysql

# Novita Serverless Configuration
novita:
enabled: false # Enable Novita serverless provider
api_key: "" # Your Novita API key (Bearer token)
base_url: "https://api.novita.ai" # Novita API base URL
config_dir: "./config" # Configuration directory (contains specs.yaml and templates/)
poll_interval: 10 # Poll interval for status updates (seconds, default: 10)
8 changes: 8 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,11 @@ providers:
metadata: mysql # Metadata storage: mysql (persistent), redis (ephemeral)
# MySQL stores: endpoints, tasks, autoscaler configs, scaling events
# Redis stores: worker heartbeats, task queues, distributed locks, cache

# Novita Serverless Configuration
novita:
enabled: false # Enable Novita serverless provider
api_key: "" # Your Novita API key (Bearer token)
base_url: "https://api.novita.ai" # Novita API base URL
config_dir: "./config" # Configuration directory (contains specs.yaml and templates/)
poll_interval: 10 # Poll interval for status updates (seconds, default: 10)
32 changes: 32 additions & 0 deletions config/specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,35 @@ specs:
operator: "Equal"
value: "gpu"
effect: "NoSchedule"

# Novita 5090 Single GPU
- name: "novita-5090-single"
displayName: "Novita 5090 1x GPU"
category: "gpu"
resourceType: "serverless"
resources:
gpu: "1"
gpuType: "NVIDIA GeForce RTX 5090"
cpu: "12"
memory: "50"
ephemeralStorage: "100"
platforms:
novita:
productId: "SL-serverless-3" # Replace with actual Novita product ID
region: "us-dallas-nas-2"

# Novita 4090 Single GPU
- name: "novita-4090-single"
displayName: "Novita 4090 1x GPU"
category: "gpu"
resourceType: "serverless"
resources:
gpu: "1"
gpuType: "NVIDIA GeForce RTX 4090"
cpu: "8"
memory: "32"
ephemeralStorage: "50"
platforms:
novita:
productId: "1" # Replace with actual Novita product ID
region: "us-dallas-nas-2"
32 changes: 21 additions & 11 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ type Config struct {
Logger LoggerConfig `yaml:"logger"`
K8s K8sConfig `yaml:"k8s"`
AutoScaler AutoScalerConfig `yaml:"autoscaler"`
Docker DockerConfig `yaml:"docker"` // Docker registry authentication
Notification NotificationConfig `yaml:"notification"` // Notification configuration
Providers *ProvidersConfig `yaml:"providers,omitempty"` // Providers configuration (optional)
Docker DockerConfig `yaml:"docker"` // Docker registry authentication
Notification NotificationConfig `yaml:"notification"` // Notification configuration
Providers *ProvidersConfig `yaml:"providers,omitempty"` // Providers configuration (optional)
Novita NovitaConfig `yaml:"novita"` // Novita serverless configuration
}

// ServerConfig server configuration
Expand Down Expand Up @@ -80,18 +81,18 @@ type LoggerConfig struct {
// LoggerFileConfig logger file configuration
type LoggerFileConfig struct {
Path string `yaml:"path"`
MaxSize int `yaml:"max_size"` // MB per file (default: 100)
MaxBackups int `yaml:"max_backups"` // max backup files (default: 3)
MaxAge int `yaml:"max_age"` // days to keep (default: 7)
Compress bool `yaml:"compress"` // compress rotated files (default: false)
MaxSize int `yaml:"max_size"` // MB per file (default: 100)
MaxBackups int `yaml:"max_backups"` // max backup files (default: 3)
MaxAge int `yaml:"max_age"` // days to keep (default: 7)
Compress bool `yaml:"compress"` // compress rotated files (default: false)
}

// K8sConfig K8s configuration
type K8sConfig struct {
Enabled bool `yaml:"enabled"` // whether to enable K8s features
Namespace string `yaml:"namespace"` // K8s namespace
Platform string `yaml:"platform"` // Platform type: generic, aliyun-ack, aws-eks
ConfigDir string `yaml:"config_dir"` // Configuration directory (specs.yaml and templates)
Enabled bool `yaml:"enabled"` // whether to enable K8s features
Namespace string `yaml:"namespace"` // K8s namespace
Platform string `yaml:"platform"` // Platform type: generic, aliyun-ack, aws-eks
ConfigDir string `yaml:"config_dir"` // Configuration directory (specs.yaml and templates)
}

// ProvidersConfig providers configuration
Expand Down Expand Up @@ -129,6 +130,15 @@ type NotificationConfig struct {
FeishuWebhookURL string `yaml:"feishu_webhook_url"` // Feishu (Lark) webhook URL
}

// NovitaConfig Novita serverless configuration
type NovitaConfig struct {
Enabled bool `yaml:"enabled"` // Whether to enable Novita provider
APIKey string `yaml:"api_key"` // Novita API key (Bearer token)
BaseURL string `yaml:"base_url"` // API base URL, default: https://api.novita.ai
ConfigDir string `yaml:"config_dir"` // Configuration directory (specs.yaml and templates)
PollInterval int `yaml:"poll_interval"` // Poll interval for status updates (seconds, default: 10)
}

// Init initializes configuration
func Init() error {
configPath := os.Getenv("CONFIG_PATH")
Expand Down
Loading