Conversation

sunlei1024 (Collaborator) commented Dec 8, 2025

Motivation

This PR introduces a new FMQ (Fast Message Queue) module built on top of ZeroMQ, providing an efficient IPC/TCP messaging layer for FastDeploy. It also includes benchmark utilities for evaluating latency, throughput, and shared-memory performance under different workloads.

Key motivations:

  • Provide a lightweight, high-performance messaging backend for internal components.
  • Support shared-memory acceleration for large payloads.
  • Enable flexible runtime configuration through environment variables.
  • Unify queue and pub/sub communication behind a consistent endpoint abstraction.
  • Supply benchmark tools for validating FMQ performance.

Modifications

1. ZeroMQ-based FMQ Implementation

  • Implemented queue semantics using PUSH/PULL sockets.
  • Implemented topic semantics using PUB/SUB sockets.
  • Added shared-memory fast path for large messages.
  • Integrated a message serialization layer using ForkingPickler for robust IPC payload handling.
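The shared-memory fast path described above can be sketched roughly as follows. This is a minimal illustration of the idea, not the FastDeploy implementation: payloads above a threshold are written to a `multiprocessing.shared_memory` segment and only a small descriptor travels over the socket; the consumer reads the segment by name and unlinks it. All names here (`ShmDescriptor`, `pack`, `unpack`, `SHM_THRESHOLD`) are hypothetical.

```python
import pickle
from dataclasses import dataclass
from multiprocessing import shared_memory

# Hypothetical threshold; the real FMQ exposes this as a `shm_threshold` argument.
SHM_THRESHOLD = 1 << 20  # 1 MB

@dataclass
class ShmDescriptor:
    shm_name: str
    size: int

    def read_and_unlink(self) -> bytes:
        # Consumer side: read the payload, then free the segment.
        shm = shared_memory.SharedMemory(name=self.shm_name)
        try:
            return bytes(shm.buf[: self.size])
        finally:
            shm.close()
            shm.unlink()

def pack(payload: bytes) -> bytes:
    """Producer side: large payloads go through shared memory."""
    if len(payload) >= SHM_THRESHOLD:
        shm = shared_memory.SharedMemory(create=True, size=len(payload))
        shm.buf[: len(payload)] = payload
        name, size = shm.name, len(payload)
        shm.close()  # segment stays alive until the consumer unlinks it
        return pickle.dumps(ShmDescriptor(name, size))
    return pickle.dumps(payload)

def unpack(raw: bytes) -> bytes:
    obj = pickle.loads(raw)
    if isinstance(obj, ShmDescriptor):
        return obj.read_and_unlink()
    return obj
```

Small messages stay inline in the socket frame; only the descriptor (a few dozen bytes) is serialized for large ones, which is what makes throughput largely independent of payload size.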

2. Unified Configuration System with Dynamic Endpoint Abstraction

  • Introduced the FMQ_CONFIG_JSON environment variable as the runtime configuration source.

  • Removed reliance on static configuration files.

  • Added a unified Endpoint class that encapsulates:

    • protocol (ipc / tcp)
    • address (path / host:port)
    • optional io_threads override
    • copy-mode override (shared-memory vs non-shm)
  • Updated EndpointManager to construct connection addresses using this unified abstraction.

  • Supports per-endpoint configuration to allow flexible and dynamic runtime setups.

FMQ_CONFIG_JSON example:

{
  "ipc_root": "/dev/shm",
  "io_threads": 1,
  "copy": false,
  "endpoints": {
    "test_queue": {
      "protocol": "ipc",
      "address": "/dev/shm/fmq_test_queue.ipc",
      "io_threads": 1,
      "copy": false
    },
    "test_topic": {
      "protocol": "ipc",
      "address": "/dev/shm/fmq_test_topic.ipc",
      "io_threads": 1,
      "copy": false
    }
  }
}
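A configuration in this shape might be parsed along the following lines. This is a sketch under assumptions: only the `FMQ_CONFIG_JSON` variable and the JSON keys above come from the PR; the `Endpoint` field layout and `load_endpoints` helper are illustrative, with per-endpoint values overriding the global defaults.

```python
import json
import os
from dataclasses import dataclass

@dataclass
class Endpoint:
    # Field names mirror the JSON example above; defaults are illustrative.
    protocol: str = "ipc"
    address: str = ""
    io_threads: int = 1
    copy: bool = False

    @property
    def zmq_address(self) -> str:
        # ipc endpoints bind to a filesystem path, tcp to host:port.
        return f"{self.protocol}://{self.address}"

def load_endpoints(env: str = "FMQ_CONFIG_JSON") -> dict:
    """Parse the env var into per-endpoint settings with global fallbacks."""
    cfg = json.loads(os.environ.get(env, "{}"))
    defaults = {
        "io_threads": cfg.get("io_threads", 1),
        "copy": cfg.get("copy", False),
    }
    endpoints = {}
    for name, ep in cfg.get("endpoints", {}).items():
        # Per-endpoint keys win over the top-level defaults.
        endpoints[name] = Endpoint(**{**defaults, **ep})
    return endpoints
```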

3. FMQ Benchmark Suite

  • Added micro-benchmarks covering:

    • message latency
    • throughput under various payload sizes
    • shared-memory vs non-shared-memory performance
  • Enables users to profile and validate FMQ performance across scenarios.
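The latency percentiles the benchmark reports (P50/P95/P99, as in the sample output below) can be computed with the standard library alone. A minimal sketch, not the benchmark's actual code; `latency_stats` is a hypothetical helper.

```python
import statistics

def latency_stats(samples_ms):
    """Summarize one-way latencies the way the benchmark reports them."""
    # quantiles(n=100) yields 99 cut points; index k-1 is the k-th percentile.
    q = statistics.quantiles(samples_ms, n=100)
    return {
        "avg": statistics.fmean(samples_ms),
        "p50": q[49],
        "p95": q[94],
        "p99": q[98],
    }
```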


Usage

Run FMQ Benchmarks

python benchmarks/benchmark_fmq.py

Benchmark Output Example

Below is an example output from the new FMQ benchmark tool:

FastDeploy Message Queue Benchmark, pid:33360
Producers: 1
Consumers: 1
Messages per producer: 1000
Total bytes: 0.98 GB
Total messages: 1,000
Payload per message: 1.00 MB
Producer-0: 100%|███████████████████████████████████| 1000/1000 [00:00<00:00, 5526.49it/s]
Consumer-0: 100%|███████████████████████████████████| 1000/1000 [00:00<00:00, 4840.92it/s]

Producer Stats:
[Producer-0] Sent 1,000 msgs in 0.180 s | Throughput: 5,545 msg/s | Bandwidth: 5544.68 MB/s

Consumer Latency Stats:
[Consumer-0] msgs= 1000 | avg=0.522 ms | P50=0.465 ms | P95=0.781 ms | P99=1.199 ms

Global Summary:
Total messages   : 1,000
Total data       : 1000.00 MB
Total time       : 0.178 s
Global throughput: 5,627 msg/s
Global bandwidth : 5626.57 MB/s
Latency (ms)     : avg=0.522 | min=0.358 | max=3.649 | P50=0.465 | P95=0.781 | P99=1.199

Example: Queue Usage

import asyncio

from fmq import FMQ

async def main():
    fmq = FMQ()

    producer = fmq.queue("test", role="producer")
    consumer = fmq.queue("test", role="consumer")

    await producer.put(b"hello")
    msg = await consumer.get()
    return msg

asyncio.run(main())

Configure FMQ via Environment Variable

export FMQ_CONFIG_JSON='{
  "default_protocol": "ipc",
  "endpoints": {
    "test": { "protocol": "ipc", "path": "/dev/shm/fmq_test.ipc" }
  }
}'

Accuracy

This PR does not modify model logic or inference kernels.
No accuracy changes are introduced.


Checklist

  • Added tag(s) in PR title, e.g. [Feature] [Benchmark]
  • Code formatted; pre-commit executed
  • Unit tests added
  • Accuracy validation not required
  • Not a cherry-pick PR

sunlei1024 and others added 10 commits December 6, 2025 14:25
Co-authored-by: freeliuzc <lzc842650834@gmail.com>
* [RL] Support Rollout Routing Replay

* add routing indices cache

* fix config bug and moe forward bug

* R3 Support GLM

* support eb4.5

* fix merge bug

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* add routing replay ci

* support glm topk

* support orther top_k

* fix ci bug

* pre-commit

* only support chatcmpl

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Yuanle Liu <yuanlehome@163.com>
…5374)

* fix multi-inputs

* fix threshold

* fix threshold

* fix
* Revert "Revert "[RL] Support Rollout Routing Replay (#5321)" (#5402)"

This reverts commit c45e064.

* Fix XPU and NPU bug

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Yuanle Liu <yuanlehome@163.com>
paddle-bot bot commented Dec 8, 2025

Thanks for your contribution!

Copilot AI (Contributor) left a comment

Pull request overview

This PR introduces a ZeroMQ-based Fast Message Queue (FMQ) implementation for FastDeploy, providing high-performance IPC/TCP messaging with shared-memory optimization for large payloads. It includes a factory pattern for standard queue communication between API server, engine, and worker components, along with comprehensive benchmarking tools.

Key Changes:

  • Implemented FMQ core with PUSH/PULL and PUB/SUB patterns, shared-memory fast path for large messages (>1MB), and async message handling
  • Added FMQFactory to standardize queue creation for inter-component communication (api↔engine↔worker)
  • Created benchmark suite to measure latency, throughput, and bandwidth under various workloads

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 27 comments.

Summary per file:

  • fastdeploy/inter_communicator/fmq.py: Core FMQ implementation with Queue/Topic classes, shared memory support, and ZeroMQ socket management
  • fastdeploy/inter_communicator/fmq_factory.py: Factory providing standardized queue endpoints for api-engine-worker communication flow
  • fastdeploy/envs.py: Added FMQ_CONFIG_JSON environment variable for runtime configuration
  • fastdeploy/utils.py: Added fmq_logger for FMQ-specific logging
  • benchmarks/benchmark_fmq.py: Multi-process benchmark tool measuring latency, throughput, and bandwidth with progress tracking
  • tests/inter_communicator/test_fmq.py: Unit tests for queue operations, shared memory transfers, and pub/sub functionality
  • tests/inter_communicator/test_fmq_factory.py: Unit tests for factory methods, message roundtrip, and context sharing

    if msg.descriptor:
        msg.payload = msg.descriptor.read_and_unlink()

    self._msg_id += 1

Copilot AI Dec 8, 2025

The _msg_id is incremented in the consumer's get() method, but this field represents messages sent by the producer. Incrementing it in the consumer is incorrect and serves no purpose. This line should be removed.

Suggested change: delete the `self._msg_id += 1` line.
    msg = await q.get()
    recv_ts = time.perf_counter()
    if msg is None:
        pbar.write("recv None")

Copilot AI Dec 8, 2025

Commented-out debug code should be removed before merging to keep the codebase clean.

Suggested change: delete the `pbar.write("recv None")` debug line.
Comment on lines +310 to +326

    class FMQ:
        _instance = None
        _context = None

        def __new__(cls, config_path="fmq_config.json"):
            if cls._instance is None:
                cls._instance = super().__new__(cls)
                EndpointManager.load_config()

                # Determine IO threads based on global defaults
                io_threads = 1
                if EndpointManager.config.endpoints:
                    # Use max io_threads among all endpoints
                    io_threads = max(ep.io_threads for ep in EndpointManager.config.endpoints.values())

                cls._context = zmq.asyncio.Context(io_threads=io_threads)
            return cls._instance

Copilot AI Dec 8, 2025
Using a singleton pattern with __new__ is not thread-safe. In a multi-threaded environment, multiple threads could create multiple instances. Consider using a thread-safe singleton pattern with a lock, or better yet, document that FMQ should be instantiated once and reused.
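The lock-guarded variant suggested here could look like the sketch below. It is an illustration of the standard double-checked-locking pattern, not code from this PR; the class name is hypothetical and the FMQ-specific initialization is omitted.

```python
import threading

class FMQSingleton:
    """Thread-safe singleton via double-checked locking (hypothetical name)."""
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # Fast path avoids the lock once the instance exists; the second
        # check inside the lock prevents two threads from both creating one.
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance
```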

Comment on lines +238 to +257

    async def get(self, timeout: int = None) -> Optional[Message]:
        # Receive data from queue
        if self.role != Role.CONSUMER:
            raise PermissionError("Only consumers can get messages.")

        try:
            if timeout:
                raw = await asyncio.wait_for(self.socket.recv(), timeout / 1000)
            else:
                raw = await self.socket.recv(copy=self.copy)
        except asyncio.TimeoutError:
            fmq_logger.error(f"Timeout receiving message on {self.name}")
            return None

        msg = Message.deserialize(raw)
        if msg.descriptor:
            msg.payload = msg.descriptor.read_and_unlink()

        self._msg_id += 1
        return msg

Copilot AI Dec 8, 2025

The get() method lacks proper error handling for deserialization failures. If Message.deserialize(raw) fails due to corrupted data, it will raise an uncaught exception. Consider wrapping this in a try-except block and logging the error:

    try:
        msg = Message.deserialize(raw)
    except Exception as e:
        fmq_logger.error(f"Failed to deserialize message on {self.name}: {e}")
        return None
Comment on lines +139 to +148

    def read_and_unlink(self) -> bytes:
        # Read and cleanup shared memory
        try:
            shm = shared_memory.SharedMemory(name=self.shm_name)
            data = bytes(shm.buf[: self.size])
            shm.close()
            shm.unlink()
            return data
        except FileNotFoundError:
            return b""

Copilot AI Dec 8, 2025
The error handling path in read_and_unlink() that catches FileNotFoundError lacks test coverage. Add a test case that simulates shared memory cleanup failure to ensure this error path is properly tested.
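Such a test could be sketched as follows. The `read_and_unlink` below is a standalone mirror of the quoted method (taking the name and size as arguments rather than reading them from a descriptor object), so the sketch is self-contained; the segment name is arbitrary.

```python
from multiprocessing import shared_memory

def read_and_unlink(shm_name: str, size: int) -> bytes:
    """Standalone mirror of the method under review, for illustration."""
    try:
        shm = shared_memory.SharedMemory(name=shm_name)
        data = bytes(shm.buf[:size])
        shm.close()
        shm.unlink()
        return data
    except FileNotFoundError:
        return b""

def test_missing_segment_returns_empty():
    # Attaching to a name that was never created (or already unlinked)
    # must hit the FileNotFoundError branch and return an empty payload.
    assert read_and_unlink("fmq_no_such_segment", 16) == b""
```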

    send_ts = time.perf_counter()
    await q.put(data={"pid": proc_id, "i": i, "send_ts": send_ts, "payload": payload}, shm_threshold=shm_threshold)
    pbar.update(1)
    # pbar.write(f"send {i}")

Copilot AI Dec 8, 2025

Commented-out debug code should be removed before merging to keep the codebase clean.

Suggested change: delete the `# pbar.write(f"send {i}")` line.
Comment on lines +248 to +250

    except asyncio.TimeoutError:
        fmq_logger.error(f"Timeout receiving message on {self.name}")
        return None

Copilot AI Dec 8, 2025
The timeout error handling path lacks test coverage. Add a test case that triggers a timeout to ensure the error logging and None return work as expected.

sunlei1024 and others added 3 commits December 8, 2025 13:26
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
@Jiang-Jia-Jun Jiang-Jia-Jun merged commit 5fb93d8 into develop Dec 8, 2025
21 of 26 checks passed
@Jiang-Jia-Jun Jiang-Jia-Jun deleted the feature/fmq_20251205 branch December 8, 2025 14:04