Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions threadregular.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@ import (
type regularThread struct {
contextHolder

state *threadState
thread *phpThread
state *threadState
thread *phpThread
workReady chan contextHolder // Channel to receive work directly
}

var (
regularThreads []*phpThread
regularThreadMu = &sync.RWMutex{}
regularRequestChan chan contextHolder
regularSemaphore *semaphore.Weighted // FIFO admission control
regularThreadPool sync.Pool // Pool of idle threads for direct handoff
)

func convertToRegularThread(thread *phpThread) {
thread.setHandler(&regularThread{
thread: thread,
state: thread.state,
thread: thread,
state: thread.state,
workReady: make(chan contextHolder, 1), // Buffered to avoid blocking sender
})
attachRegularThread(thread)
}
Expand Down Expand Up @@ -77,13 +80,19 @@ func (handler *regularThread) waitForRequest() string {

handler.state.markAsWaiting(true)

var ch contextHolder
// Put this thread in the pool for direct handoff
regularThreadPool.Put(handler)

// Wait for work to be assigned (either via pool or fallback channel)
var ch contextHolder
select {
case <-handler.thread.drainChan:
// go back to beforeScriptExecution
return handler.beforeScriptExecution()
case ch = <-handler.workReady:
// Work received via direct handoff from the pool
case ch = <-regularRequestChan:
// Fallback: work came via global channel
}

handler.ctx = ch.ctx
Expand Down Expand Up @@ -111,6 +120,18 @@ func handleRequestWithRegularPHPThreads(ch contextHolder) error {
}
defer regularSemaphore.Release(1)

// Fast path: try to get an idle thread from the pool
if idle := regularThreadPool.Get(); idle != nil {
handler := idle.(*regularThread)
// Send work to the thread's dedicated channel
handler.workReady <- ch
metrics.DequeuedRequest()
<-ch.frankenPHPContext.done
metrics.StopRequest()
return nil
}

// Slow path: no idle thread in pool, use the global channel
regularRequestChan <- ch
metrics.DequeuedRequest()
<-ch.frankenPHPContext.done
Expand Down
11 changes: 10 additions & 1 deletion threadworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type workerThread struct {
workerFrankenPHPContext *frankenPHPContext
workerContext context.Context
backoff *exponentialBackoff
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet
workReady chan contextHolder // Channel to receive work directly from pool
}

func convertToWorkerThread(thread *phpThread, worker *worker) {
Expand All @@ -35,6 +36,7 @@ func convertToWorkerThread(thread *phpThread, worker *worker) {
minBackoff: 100 * time.Millisecond,
maxConsecutiveFailures: worker.maxConsecutiveFailures,
},
workReady: make(chan contextHolder, 1), // Buffered to avoid blocking sender
})
worker.attachThread(thread)
}
Expand Down Expand Up @@ -210,6 +212,9 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {

handler.state.markAsWaiting(true)

// Put this thread in the pool for direct handoff
handler.worker.threadPool.Put(handler)

var requestCH contextHolder
select {
case <-handler.thread.drainChan:
Expand All @@ -225,7 +230,11 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) {

return false, nil
case requestCH = <-handler.thread.requestChan:
// Fast path: direct thread affinity
case requestCH = <-handler.workReady:
// Medium path: pool handoff
case requestCH = <-handler.worker.requestChan:
// Slow path: global channel
}

handler.workerContext = requestCH.ctx
Expand Down
31 changes: 14 additions & 17 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type worker struct {
semaphore *semaphore.Weighted
threads []*phpThread
threadMutex sync.RWMutex
threadPool sync.Pool // Pool of idle worker threads for direct handoff
allowPathMatching bool
maxConsecutiveFailures int
onThreadReady func(int)
Expand Down Expand Up @@ -249,23 +250,7 @@ func (worker *worker) isAtThreadLimit() bool {
func (worker *worker) handleRequest(ch contextHolder) error {
metrics.StartWorkerRequest(worker.name)

// dispatch requests to all worker threads in order
worker.threadMutex.RLock()
for _, thread := range worker.threads {
select {
case thread.requestChan <- ch:
worker.threadMutex.RUnlock()
<-ch.frankenPHPContext.done
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))

return nil
default:
// thread is busy, continue
}
}
worker.threadMutex.RUnlock()

// if no thread was available, mark the request as queued and use semaphore admission control
// mark the request as queued and use admission control
metrics.QueuedWorkerRequest(worker.name)

workerScaleChan := scaleChan
Expand All @@ -281,6 +266,18 @@ func (worker *worker) handleRequest(ch contextHolder) error {
}
defer worker.semaphore.Release(1)

// Fast path: try to get an idle thread from the pool
if idle := worker.threadPool.Get(); idle != nil {
handler := idle.(*workerThread)
// Direct handoff - send work to the thread's dedicated channel
handler.workReady <- ch
metrics.DequeuedWorkerRequest(worker.name)
<-ch.frankenPHPContext.done
metrics.StopWorkerRequest(worker.name, time.Since(ch.frankenPHPContext.startedAt))
return nil
}

// Slow path: no idle thread in pool, use the global channel
worker.requestChan <- ch
metrics.DequeuedWorkerRequest(worker.name)
<-ch.frankenPHPContext.done
Expand Down