@mcollina mcollina commented Dec 3, 2025

Summary

Implements early request rejection (load shedding) to prevent request queue buildup during overload. This addresses the "early rejection problem" where requests pile up in Node.js's event loop queue, consuming memory and increasing latency.

  • Add canAccept hook that runs before routing to workers
  • When all workers reject (return false), immediately return 503 LoadSheddingError
  • Add metadata support via route(url, worker, meta) to identify workers
  • Requests are shed before any MessageChannel creation or thread communication

Usage

```javascript
const interceptor = createThreadInterceptor({
  domain: '.local',
  canAccept: (ctx) => {
    // ctx: { hostname, method, path, headers, port, meta }
    return loadMap.get(ctx.meta.id) < ctx.meta.maxLoad
  }
})

interceptor.route('api', worker1, { id: 'w1', maxLoad: 100 })
interceptor.route('api', worker2, { id: 'w2', maxLoad: 50 })
```
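The `loadMap` in the example is consumer-maintained state, not part of the library. A minimal sketch of the per-worker inflight tracking that could back the hook (the `trackDispatch`/`trackComplete` helper names are hypothetical):

```javascript
// Sketch of the consumer-side state behind the canAccept example above.
// loadMap counts requests currently in flight per worker id; the ids and
// maxLoad values come from the metadata passed to route().
const loadMap = new Map([['w1', 0], ['w2', 0]])

function canAccept (ctx) {
  // Shed when this worker's inflight count has reached its configured cap.
  return (loadMap.get(ctx.meta.id) ?? 0) < ctx.meta.maxLoad
}

// Hypothetical helpers: increment on dispatch, decrement on completion.
function trackDispatch (id) { loadMap.set(id, (loadMap.get(id) ?? 0) + 1) }
function trackComplete (id) { loadMap.set(id, loadMap.get(id) - 1) }
```

The consumer would wire `trackDispatch`/`trackComplete` into its own request lifecycle; the interceptor only ever calls `canAccept`.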

Changes

  • lib/utils.js - Add LoadSheddingError class and kMeta symbol
  • lib/hooks.js - Add canAccept hook support
  • lib/roundrobin.js - Add findAccepting() method with metadata
  • lib/interceptor.js - Add load shedding check before dispatch
  • lib/coordinator.js - Accept metadata in route()
  • index.js - Export LoadSheddingError
  • README.md - Full documentation with examples
  • test/load-shedding.test.js - 12 tests covering all functionality

Test plan

  • All existing tests pass
  • 100% code coverage maintained
  • New tests for load shedding behavior
  • New tests for metadata feature

🤖 Generated with Claude Code

mcollina and others added 3 commits December 2, 2025 22:08
Implement early request rejection to prevent request queue buildup during
overload. The new `canAccept` hook allows consumers to implement custom
load detection logic that runs before any worker thread communication.

Key changes:
- Add `canAccept` hook option that receives request context including port
- Add `findAccepting()` method to RoundRobin to iterate all workers
- Add `LoadSheddingError` with 503 status code for rejected requests
- Requests are shed immediately without entering any queue

Usage:
```javascript
const interceptor = createThreadInterceptor({
  domain: '.local',
  canAccept: (ctx) => {
    // ctx contains: hostname, method, path, headers, port
    return !isWorkerBusy(ctx.port)
  }
})
```

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Document the canAccept hook and LoadSheddingError for early request
rejection. Includes examples for memory-based shedding, per-worker
inflight tracking, and method-based shedding.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Allow attaching metadata to workers when routing to help identify them
in the canAccept hook. The metadata is passed as a third argument to
route() and is available in the canAccept context as ctx.meta.

Usage:
```javascript
interceptor.route('api', worker1, { id: 'w1', maxLoad: 100 })
interceptor.route('api', worker2, { id: 'w2', maxLoad: 50 })

canAccept: (ctx) => {
  return loadMap.get(ctx.meta.id) < ctx.meta.maxLoad
}
```

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
@ShogunPanda ShogunPanda left a comment


Minor nit, but it LGTM.

```javascript
if (!port) {
  const route = routes.get(hostname)
  if (!route || route.length === 0) {
    throw new Error(`No target found for ${hostname} in thread ${threadId}.`)
```

I think we're missing handler.onError here like you did on line 60.

Address review feedback: use handler.onError instead of throwing
directly when no route is found, for consistency with load shedding
error handling.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Resolved conflicts in:
- lib/coordinator.js: Combined metadata support with async wiring
- lib/roundrobin.js: Added both kMeta and kReady imports
- lib/interceptor.js: Added null check for round-robin when no ready workers

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
@ivan-tymoshenko ivan-tymoshenko left a comment


As a user, I want to specify the canAccept hook on the server side, where I have all the information about the thread, server state, contexts, etc. How should the hook know that a worker can't accept more requests when all it has is a MessagePort instance?

If I want a server to stop accepting requests, I have to write the logic in a handler and duplicate it in every other "client" worker. For example, if we want to set different limits for different stackables (next, node) or read them from watt.json in the future, this will be a problem.

How about putting the hook on the server side, calling it when the server receives requests, and sending MESSAGE_ROUTE_UPDATE with ready set to true/false to resume/pause traffic?

Of course this would introduce some delay and requests would pile up a bit, but I don't see how the hook can be used in its current state. Maybe I'm missing a use case for how it will be used in platformatic.

```javascript
for (let i = 0; i < this.ports.length; i++) {
  const portIndex = (this.index + i) % this.ports.length
  const port = this.ports[portIndex]
  if (canAcceptFn({ ...context, port, meta: port[kMeta] })) {
```

If we run the canAccept logic on every call, it might have a performance impact. I would evaluate it at most every n calls, store the result in a port[kCanAccept] flag, and check the flag on the hot path.
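A minimal sketch of that amortization (the symbols and the interval constant are hypothetical names, not the library's):

```javascript
// Sketch of the reviewer's suggestion: re-evaluate canAccept only every
// CHECK_INTERVAL calls per port and cache the result under a symbol, so
// the hot path mostly reads a cached flag instead of invoking the hook.
const kMeta = Symbol('kMeta')
const kCanAccept = Symbol('kCanAccept')
const kCallCount = Symbol('kCallCount')
const CHECK_INTERVAL = 16

function cachedCanAccept (canAcceptFn, context, port) {
  port[kCallCount] = (port[kCallCount] ?? 0) + 1
  if (port[kCanAccept] === undefined || port[kCallCount] % CHECK_INTERVAL === 0) {
    port[kCanAccept] = canAcceptFn({ ...context, port, meta: port[kMeta] })
  }
  return port[kCanAccept]
}
```

The trade-off is staleness: a worker that becomes saturated keeps accepting up to CHECK_INTERVAL - 1 requests before the cached flag refreshes.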


ivan-tymoshenko commented Dec 17, 2025

I see the maxLoad examples. This is probably the only reasonable metric we can get about a worker when all we hold is a MessagePort instance. If we really want it, maybe it's better to add it as a regular route method option. That would let us implement it efficiently instead of calling canAccept on every request.

maxLoad counts how many requests one specific client has sent, not how many requests the "server" worker has received. This is better than nothing, but again, a user probably wants to set the limit on the server side.
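A sketch of what such a route-level option could look like (purely hypothetical API; the PR does not implement this — `kInflight` and `pickPort` are illustrative names):

```javascript
// Hypothetical shape of maxLoad as a route() option: the interceptor
// itself counts inflight requests per port and skips saturated workers,
// so no user-supplied canAccept callback runs on the hot path.
const kInflight = Symbol('kInflight')

function pickPort (ports) {
  for (const port of ports) {
    const inflight = port[kInflight] ?? 0
    const max = port.maxLoad ?? Infinity
    if (inflight < max) {
      port[kInflight] = inflight + 1 // decremented when the response completes
      return port
    }
  }
  return null // all workers saturated: shed the request with a 503
}
```

Note this still counts only what this one client has dispatched, which is exactly the limitation described above.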

@mcollina mcollina (Member, Author) replied:

> As a user, I want to specify the canAccept hook on the server side, where I have all the information about the thread, server state, contexts, etc. How should the hook know that a worker can't accept more requests when all it has is a MessagePort instance?

In Watt, we can rely on health events to track this and know, with a resolution of ~1s, whether a service is in trouble. I designed this feature so it can consume that signal.

I concur that adopting a "flow control" system might be better. Let me iterate.


ivan-tymoshenko commented Dec 17, 2025

> In Watt, we can rely on health events to track this and know, with a resolution of ~1s, whether a service is in trouble. I designed this feature so it can consume that signal.

OK, but Watt health events are emitted in the main thread. You would then need to propagate this information to all the other workers so they stop sending requests to the unhealthy worker.

Or this feature is only for the "coordinator" thread?
