From 093201f7c9ec5ddc594af48d939a3e26c446ab28 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Mon, 24 Nov 2025 22:27:04 +0100 Subject: [PATCH 1/6] suggestion rt. --- frankenphp.go | 2 +- threadregular.go | 39 +++++++++++++++++++++++++++------------ 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/frankenphp.go b/frankenphp.go index 4b5c0972b4..d71f25a0b2 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -307,7 +307,7 @@ func Init(options ...Option) error { return err } - regularRequestChan = make(chan contextHolder, opt.numThreads-workerThreadCount) + regularRequestChan = make(chan contextHolder) regularThreads = make([]*phpThread, 0, opt.numThreads-workerThreadCount) for i := 0; i < opt.numThreads-workerThreadCount; i++ { convertToRegularThread(getInactivePHPThread()) diff --git a/threadregular.go b/threadregular.go index b01c060d6a..265c236664 100644 --- a/threadregular.go +++ b/threadregular.go @@ -2,7 +2,9 @@ package frankenphp import ( "context" + "runtime" "sync" + "sync/atomic" ) // representation of a non-worker PHP thread @@ -16,9 +18,10 @@ type regularThread struct { } var ( - regularThreads []*phpThread - regularThreadMu = &sync.RWMutex{} - regularRequestChan chan contextHolder + regularThreads []*phpThread + regularThreadMu = &sync.RWMutex{} + regularRequestChan chan contextHolder + queuedRegularThreads = atomic.Int32{} ) func convertToRegularThread(thread *phpThread) { @@ -81,6 +84,7 @@ func (handler *regularThread) waitForRequest() string { // go back to beforeScriptExecution return handler.beforeScriptExecution() case ch = <-regularRequestChan: + case ch = <-handler.thread.requestChan: } handler.ctx = ch.ctx @@ -100,23 +104,33 @@ func (handler *regularThread) afterRequest() { func handleRequestWithRegularPHPThreads(ch contextHolder) error { metrics.StartRequest() - select { - case regularRequestChan <- ch: - // a thread was available to handle the request immediately - <-ch.frankenPHPContext.done - metrics.StopRequest() - - return nil - default: - // no thread was available + if queuedRegularThreads.Load() == 0 { + regularThreadMu.RLock() + for _, thread := range regularThreads { + select { + case thread.requestChan <- ch: + regularThreadMu.RUnlock() + <-ch.frankenPHPContext.done + return nil + default: + // thread was not available + } + } + regularThreadMu.RUnlock() } // if no thread was available, mark the request as queued and fan it out to all threads + queuedRegularThreads.Add(1) metrics.QueuedRequest() + + runtime.Gosched() + for { select { case regularRequestChan <- ch: + queuedRegularThreads.Add(-1) metrics.DequeuedRequest() + <-ch.frankenPHPContext.done metrics.StopRequest() @@ -125,6 +139,7 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error { // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): // the request has timed out stalling + queuedRegularThreads.Add(-1) metrics.DequeuedRequest() ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded) From 4b87dda96d5ef9769a64dc13f336bce110d7a497 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Mon, 24 Nov 2025 22:46:07 +0100 Subject: [PATCH 2/6] suggestion wt. --- worker.go | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/worker.go b/worker.go index e5e9b9da68..4dd2a63b50 100644 --- a/worker.go +++ b/worker.go @@ -6,8 +6,10 @@ import ( "fmt" "os" "path/filepath" + "runtime" "strings" "sync" + "sync/atomic" "time" "github.com/dunglas/frankenphp/internal/fastabs" @@ -28,6 +30,7 @@ type worker struct { maxConsecutiveFailures int onThreadReady func(int) onThreadShutdown func(int) + queuedRequests atomic.Int32 } var ( @@ -253,24 +256,30 @@ func (worker *worker) isAtThreadLimit() bool { func (worker *worker) handleRequest(ch contextHolder) error { metrics.StartWorkerRequest(worker.name) - // dispatch requests to all worker threads in order - worker.threadMutex.RLock() - for _, thread := range worker.threads { - select { - case thread.requestChan <- ch: - worker.threadMutex.RUnlock() - <-ch.frankenPHPContext.done - metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) - - return nil - default: - // thread is busy, continue + if worker.queuedRequests.Load() == 0 { + // dispatch requests to all worker threads in order + worker.threadMutex.RLock() + for _, thread := range worker.threads { + select { + case thread.requestChan <- ch: + worker.threadMutex.RUnlock() + <-ch.frankenPHPContext.done + metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) + + return nil + default: + // thread is busy, continue + } } + worker.threadMutex.RUnlock() } - worker.threadMutex.RUnlock() // if no thread was available, mark the request as queued and apply the scaling strategy + worker.queuedRequests.Add(1) metrics.QueuedWorkerRequest(worker.name) + + runtime.Gosched() + for { workerScaleChan := scaleChan if worker.isAtThreadLimit() { @@ -279,6 +288,7 @@ func (worker *worker) handleRequest(ch contextHolder) error { select { case worker.requestChan <- ch: + worker.queuedRequests.Add(-1) metrics.DequeuedWorkerRequest(worker.name) <-ch.frankenPHPContext.done metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) @@ -288,6 +298,7 @@ func (worker *worker) handleRequest(ch contextHolder) error { // the request has triggered scaling, continue to wait for a thread case <-timeoutChan(maxWaitTime): // the request has timed out stalling + worker.queuedRequests.Add(-1) metrics.DequeuedWorkerRequest(worker.name) ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded) From 86e50bed5ebd4467255c86ed54d0892ef269fbde Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Mon, 24 Nov 2025 22:53:48 +0100 Subject: [PATCH 3/6] test --- worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker.go b/worker.go index 4dd2a63b50..7d1d66f2f8 100644 --- a/worker.go +++ b/worker.go @@ -278,14 +278,14 @@ func (worker *worker) handleRequest(ch contextHolder) error { worker.queuedRequests.Add(1) metrics.QueuedWorkerRequest(worker.name) - runtime.Gosched() - for { workerScaleChan := scaleChan if worker.isAtThreadLimit() { workerScaleChan = nil // max_threads for this worker reached, do not attempt scaling } + runtime.Gosched() + select { case worker.requestChan <- ch: worker.queuedRequests.Add(-1) From 2c262a143e2f0cd3e8eb7be0bcec934876391f53 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Mon, 24 Nov 2025 23:08:40 +0100 Subject: [PATCH 4/6] test --- threadregular.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/threadregular.go b/threadregular.go index 265c236664..feda7e2e6d 100644 --- a/threadregular.go +++ b/threadregular.go @@ -123,9 +123,8 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error { queuedRegularThreads.Add(1) metrics.QueuedRequest() - runtime.Gosched() - for { + runtime.Gosched() select { case regularRequestChan <- ch: queuedRegularThreads.Add(-1) From 67eef2a943a19b22dfe564540bc9dc0ea3bf14a8 Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Mon, 24 Nov 2025 23:46:40 +0100 Subject: [PATCH 5/6] test --- threadregular.go | 3 ++- worker.go | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/threadregular.go b/threadregular.go index feda7e2e6d..cc0ac07667 100644 --- a/threadregular.go +++ b/threadregular.go @@ -104,6 +104,8 @@ func (handler *regularThread) afterRequest() { func handleRequestWithRegularPHPThreads(ch contextHolder) error { metrics.StartRequest() + runtime.Gosched() + if queuedRegularThreads.Load() == 0 { regularThreadMu.RLock() for _, thread := range regularThreads { @@ -124,7 +126,6 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error { metrics.QueuedRequest() for { - runtime.Gosched() select { case regularRequestChan <- ch: queuedRegularThreads.Add(-1) diff --git a/worker.go b/worker.go index 7d1d66f2f8..4e4938302c 100644 --- a/worker.go +++ b/worker.go @@ -256,6 +256,8 @@ func (worker *worker) isAtThreadLimit() bool { func (worker *worker) handleRequest(ch contextHolder) error { metrics.StartWorkerRequest(worker.name) + runtime.Gosched() + if worker.queuedRequests.Load() == 0 { // dispatch requests to all worker threads in order worker.threadMutex.RLock() @@ -284,8 +286,6 @@ func (worker *worker) handleRequest(ch contextHolder) error { workerScaleChan = nil // max_threads for this worker reached, do not attempt scaling } - runtime.Gosched() - select { case worker.requestChan <- ch: worker.queuedRequests.Add(-1) From 941697f357cd69ef395090532ca3461186e19d0f Mon Sep 17 00:00:00 2001 From: Alliballibaba Date: Tue, 25 Nov 2025 15:38:43 +0100 Subject: [PATCH 6/6] Foxes metrics. --- caddy/mercure-skip.go | 1 + caddy/workerconfig.go | 2 +- threadregular.go | 3 +++ worker.go | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/caddy/mercure-skip.go b/caddy/mercure-skip.go index 3331a04d8f..4dde348799 100644 --- a/caddy/mercure-skip.go +++ b/caddy/mercure-skip.go @@ -1,4 +1,5 @@ //go:build nomercure + package caddy import ( diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index 7a146fe7e3..baf9ad86b2 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -39,7 +39,7 @@ type workerConfig struct { // MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick) MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"` - requestOptions []frankenphp.RequestOption + requestOptions []frankenphp.RequestOption } func parseWorkerConfig(d *caddyfile.Dispenser) (workerConfig, error) { diff --git a/threadregular.go b/threadregular.go index cc0ac07667..9c210cf502 100644 --- a/threadregular.go +++ b/threadregular.go @@ -113,6 +113,8 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error { case thread.requestChan <- ch: regularThreadMu.RUnlock() <-ch.frankenPHPContext.done + metrics.StopRequest() + return nil default: // thread was not available @@ -141,6 +143,7 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error { // the request has timed out stalling queuedRegularThreads.Add(-1) metrics.DequeuedRequest() + metrics.StopRequest() ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded) diff --git a/worker.go b/worker.go index 4e4938302c..5b83f67582 100644 --- a/worker.go +++ b/worker.go @@ -300,6 +300,7 @@ func (worker *worker) handleRequest(ch contextHolder) error { // the request has timed out stalling worker.queuedRequests.Add(-1) metrics.DequeuedWorkerRequest(worker.name) + metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt)) ch.frankenPHPContext.reject(ErrMaxWaitTimeExceeded)