-
Notifications
You must be signed in to change notification settings - Fork 48
Add support for RPC over queue (e.g. RPC over Redis) execution mode #698
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
This adds a new execution mode where RPC requests are executed locally on
the worker node that dequeues work, rather than being proxied over TCP.
Changes:
- Add PollLocalAsync() method to HalibutRuntime for local queue polling
- Support local:// URI scheme for local execution endpoints
- Workers poll queue directly and execute RPCs locally via ServiceInvoker
- Add comprehensive design document explaining architecture and usage
- Add test fixture demonstrating local execution mode
Benefits:
- 10-100x lower latency (no TCP/SSL overhead)
- True horizontal scaling via worker pools
- Queue-agnostic (works with in-memory and Redis queues)
- Backward compatible with existing code
Usage:
```csharp
// Worker
var worker = new HalibutRuntime(serviceFactory);
worker.Services.AddSingleton<IMyService>(new MyServiceImpl());
await worker.PollLocalAsync(new Uri("local://worker-pool-a"), cancellationToken);
// Client
var client = new HalibutRuntime(serviceFactory);
var service = client.CreateAsyncClient<IMyService, IAsyncClientMyService>(
new ServiceEndPoint("local://worker-pool-a", null));
await service.DoWorkAsync();
```
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
…ModeFixture The LocalExecutionModeFixture test uses Redis functionality (RedisFacadeBuilder, RedisPendingRequestQueueFactory) which is only available in .NET 8.0 or greater. Added #if NET8_0_OR_GREATER directive to match the pattern used in other Redis queue tests in the codebase. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
Added back the SimplePollingExample test implementation that demonstrates basic polling mode with TCP. This test serves as a reference example for the Halibut polling pattern. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
176c9a4 to
2ab6dda
Compare
The hardcoded limit of 100 log events is now accessible via InMemoryConnectionLog.MaxLogEvents, allowing external code to reference this configuration value. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Making the class public so that the MaxLogEvents field can be accessed from outside the assembly. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
| public class HalibutExamplesFixture : BaseTest | ||
| { | ||
| [Test] | ||
| public async Task SimplePollingExample() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this since we lack simple examples of how to use halibit.
f54d365 to
d923006
Compare
| { | ||
| public static class InMemoryConnectionLogLimits | ||
| { | ||
| public static readonly int MaxLogEventsStored = 100; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be used in Octopus to limit returned logs in multi node setups.
rhysparry
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just need to clarify the final scheme.
source/Halibut/HalibutRuntime.cs
Outdated
| { | ||
| public class HalibutRuntime : IHalibutRuntime | ||
| { | ||
| public const string QueueEndpointScheme = "local"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Were we switching this to queue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes I was sure I did that......
source/Halibut/HalibutRuntime.cs
Outdated
| { | ||
| if (queueOnlyEndpoint.Scheme.ToLowerInvariant() != QueueEndpointScheme) | ||
| { | ||
| throw new ArgumentException($"Only 'queue://' endpoints are supported. Provided: {queueOnlyEndpoint.Scheme}://", nameof(queueOnlyEndpoint)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe use the constant here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| using var workerCts = new CancellationTokenSource(); | ||
| var pollingTask = Task.Run(async () => | ||
| { | ||
| await worker.PollForRPCOverQueueAsync(new Uri("local://test-worker"), workerCts.Token); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we do change the scheme, this will need updating
|
|
||
| // Client creates proxy to local://test-worker and makes request | ||
| var echo = client.CreateAsyncClient<IEchoService, IAsyncClientEchoService>( | ||
| new ServiceEndPoint("local://test-worker", null, client.TimeoutsAndLimits)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
rhysparry
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💚 LGTM
| { | ||
| var request = await queue.DequeueAsync(cancellationToken); | ||
|
|
||
| if (request != null) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like if there is no request we immediately poll again? Is it worth adding a delay when there is no request waiting?
| Notice that while the configuration code changed, the request/response code didn't apart from the endpoint. Logically, the Octopus is still the request/response client, and the Tentacle is still the request/response server, even though the transport layer has Octopus as the TCP listener and Tentacle as the TCP client polling for work. | ||
| Notice that while the configuration code changed, the request/response code didn't apart from the endpoint. Logically, the Octopus is still the request/response client, and the Tentacle is still the request/response server, even though the transport layer has Octopus as the TCP listener and Tentacle as the TCP client polling for work. | ||
|
|
||
| ## RPC over Redis |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm glad you added this section to the readme. This feels like a significant extra capability you've added to halibut here. I understand the value and our use case for it, I'm interested to hear how you are feeling about the conceptual overhead that this new capability adds.
For example I notice that in this readme you talk about "nodes" for the first time. Halibut is a client and server communication framework, so what is a "node" in this client serve model.
Also I notice in the "how it works" section. You talk about a client making an RPC call to a server... however in our use case of collecting in memory halibut logs, isn't it actually a client sending an RPC call to another client?
Lets use this thread to discuss the big picture question "How do we feel about the conceptual overhead we've added" and once we've covered that I can start new threads to nit pick the examples I've listed to give weight to my question.
I'm happy for you to merge while this discussion is ongoing as long as we're committed to addressing this question 😄
Summary
This PR adds a new execution mode where RPC requests are executed over the Polling Pending Request Queue (e.g. Redis Queue) without a TCP connection. This means, given two nodes that are configured to use the same Redis queue those nodes could execute RPCs on each other through the Redis queue, without the need for a direct connection between the two.
Background
Halibut is a RPC framework where the "client" initiates the RPC call and the "Service" executes that call. Ordinarily that Service is some remote machine and the RPC is made over a TCP connection. In Polling mode the remote Service connects to the client (the remote creates the TCP connection). When the clients are configured in a multi-node setup the Remote Polling Service may not connect to the client that wants to the initiate the RPC, for this case a Redis queue is used.
The Change
This is a trivial change as it takes the existing infrastructure for executing a RPC calls and just cuts out TCP.
Current:

Currently in a multi node setup, the RPC call goes via the Redis Queue and then down a TCP connection to the Service that executes the work.
src
New Queue Execution:
The new Queue based execution mode simply removes the TCP part and so allows the RPC call to go over the existing Redis Queue and be executed by another node connected to Redis.
Motivation
Requesting Halibut logs between nodes.
Halibut provides an in memory rolling log of the last 100 log lines per Endpoint. In a multi-node setup currently one must go to each node to get these logs. Since a multi-node setup would already have a shared Redis, the support for RPC over Redis makes it trivial to request the logs from each node.
Clients behind a load balancer.
We are sometimes in the situation in which we need work to by picked up by specific nodes e.g. a client is connected to only one node and we need that node to process the work.
With this change and a distributed queue (e.g. the Redis one), it would be possible to setup something like:
halibutRunTime.PollLocalAsync(new Uri("local://bob"), workerCts.Token)and so would begin to processes messages sent to "local://bob".var echo = client.CreateAsyncClient<IEchoService, IAsyncClientEchoService>(new ("local://test-worker");Changes
Core Implementation
HalibutRuntime.PollForRPCOverQueueAsync()- New method that polls alocal://queue and executes RPCs locallyqueue://URI scheme support - Added to routing logic inSendOutgoingRequestAsync()GetQueue()and execute requests usingServiceInvokerUsage
see: RPCOverQueueExecutionModeFixture.SimpleRPCOverQueueExecutionExample