diff --git a/caddy/mercure-skip.go b/caddy/mercure-skip.go index 3331a04d8f..4dde348799 100644 --- a/caddy/mercure-skip.go +++ b/caddy/mercure-skip.go @@ -1,4 +1,5 @@ //go:build nomercure + package caddy import ( diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index 7a146fe7e3..baf9ad86b2 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -39,7 +39,7 @@ type workerConfig struct { // MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick) MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"` - requestOptions []frankenphp.RequestOption + requestOptions []frankenphp.RequestOption } func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) { diff --git a/frankenphp.go b/frankenphp.go index 4b5c0972b4..d71f25a0b2 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -307,7 +307,7 @@ func Init(options ...Option) error { return err } - regularRequestChan = make(chan contextHolder, opt.numThreads-workerThreadCount) + regularRequestChan = make(chan contextHolder) regularThreads = make([]*phpThread, 0, opt.numThreads-workerThreadCount) for i := 0; i < opt.numThreads-workerThreadCount; i++ { convertToRegularThread(getInactivePHPThread()) diff --git a/threadregular.go b/threadregular.go index b01c060d6a..9c210cf502 100644 --- a/threadregular.go +++ b/threadregular.go @@ -2,7 +2,9 @@ package frankenphp import ( "context" + "runtime" "sync" + "sync/atomic" ) // representation of a non-worker PHP thread @@ -16,9 +18,10 @@ type regularThread struct { } var ( - regularThreads []*phpThread - regularThreadMu = &sync.RWMutex{} - regularRequestChan chan contextHolder + regularThreads []*phpThread + regularThreadMu = &sync.RWMutex{} + regularRequestChan chan contextHolder + queuedRegularThreads = atomic.Int32{} ) func convertToRegularThread(thread *phpThread) { @@ -81,6 +84,7 @@ func (handler *regularThread) waitForRequest() string { // go back to beforeScriptExecution return handler.beforeScriptExecution() case ch = <-regularRequestChan: + case ch = <-handler.thread.requestChan: } handler.ctx = ch.ctx @@ -100,23 +104,35 @@ func (handler *regularThread) afterRequest() { func handleRequestWithRegularPHPThreads(ch contextHolder) error { metrics.StartRequest() - select { - case regularRequestChan <- ch: - // a thread was available to handle the request immediately - <-ch.frankenPHPContext.done - metrics.StopRequest() - - return nil - default: - // no thread was available + runtime.Gosched() + + if queuedRegularThreads.Load() == 0 { + regularThreadMu.RLock() + for _, thread := range regularThreads { + select { + case thread.requestChan <- ch: + regularThreadMu.RUnlock() + <-ch.frankenPHPContext.done + metrics.StopRequest() + + return nil + default: + // thread was not available + } + } + regularThreadMu.RUnlock() } // if no thread was available, mark the request as queued and fan it out to all threads + queuedRegularThreads.Add(1) metrics.QueuedRequest() + for { select { case regularRequestChan <- ch: + queuedRegularThreads.Add(-1) metrics.DequeuedRequest() + <-ch.frankenPHPContext.done metrics.StopRequest() @@ -125,7 +141,9 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error { // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): // the request has timed out stalling + queuedRegularThreads.Add(-1) metrics.DequeuedRequest() + metrics.StopRequest() ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded) diff --git a/worker.go b/worker.go index e5e9b9da68..5b83f67582 100644 --- a/worker.go +++ b/worker.go @@ -6,8 +6,10 @@ import ( "fmt" "os" "path/filepath" + "runtime" "strings" "sync" + "sync/atomic" "time" "github.com/dunglas/frankenphp/internal/fastabs" @@ -28,6 +30,7 @@ type worker struct { maxConsecutiveFailures int onThreadReady func(int) onThreadShutdown func(int) + queuedRequests atomic.Int32 } var ( @@ -253,24 +256,30 @@ func (worker *worker) isAtThreadLimit() bool { func (worker *worker) handleRequest(ch contextHolder) error { metrics.StartWorkerRequest(worker.name) - // dispatch requests to all worker threads in order - worker.threadMutex.RLock() - for _, thread := range worker.threads { - select { - case thread.requestChan <- ch: - worker.threadMutex.RUnlock() - <-ch.frankenPHPContext.done - metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) + runtime.Gosched() - return nil - default: - // thread is busy, continue + if worker.queuedRequests.Load() == 0 { + // dispatch requests to all worker threads in order + worker.threadMutex.RLock() + for _, thread := range worker.threads { + select { + case thread.requestChan <- ch: + worker.threadMutex.RUnlock() + <-ch.frankenPHPContext.done + metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) + + return nil + default: + // thread is busy, continue + } } + worker.threadMutex.RUnlock() } - worker.threadMutex.RUnlock() // if no thread was available, mark the request as queued and apply the scaling strategy + worker.queuedRequests.Add(1) metrics.QueuedWorkerRequest(worker.name) + for { workerScaleChan := scaleChan if worker.isAtThreadLimit() { @@ -279,6 +288,7 @@ func (worker *worker) handleRequest(ch contextHolder) error { select { case worker.requestChan <- ch: + worker.queuedRequests.Add(-1) metrics.DequeuedWorkerRequest(worker.name) <-ch.frankenPHPContext.done metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) @@ -288,7 +298,9 @@ func (worker *worker) handleRequest(ch contextHolder) error { // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): // the request has timed out stalling + worker.queuedRequests.Add(-1) metrics.DequeuedWorkerRequest(worker.name) + metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded)