-
-
Notifications
You must be signed in to change notification settings - Fork 27
feat: introduce fluent API for dispatch() helper function #999
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This commit introduces a new fluent dispatch API to provide a more expressive and flexible way to dispatch jobs, AMQP messages, and Kafka messages.
Key changes:
- Add new `FriendsOfHyperf\Support\dispatch()` helper function that returns pending dispatch objects
- Implement `PendingAsyncQueueDispatch` for async queue jobs with fluent methods: `onPool()`, `delay()`, `setMaxAttempts()`
- Implement `PendingAmqpProducerMessageDispatch` for AMQP messages with fluent methods: `onPool()`, `setPayload()`, `withHeader()`, `setConfirm()`, `setTimeout()`
- Implement `PendingKafkaProducerMessageDispatch` for Kafka messages with fluent methods: `onPool()`, `setKey()`, `setValue()`, `withHeader()`
- All pending dispatch classes support Hyperf's `Conditionable` trait for conditional chaining
- Automatically wrap closures in `CallQueuedClosure` when dispatched
- Deprecate global `dispatch()` helper in favor of namespace-qualified version
- Add comprehensive test coverage for all dispatch scenarios
Benefits:
- Cleaner, more readable code with method chaining
- Type-safe dispatch configuration
- Consistent API across different message broker types
- Backward compatible with existing code
Example usage:
```php
use function FriendsOfHyperf\Support\dispatch;
// Async queue with fluent API
dispatch(new ProcessPodcast($podcast))
->onPool('high-priority')
->delay(60)
->setMaxAttempts(3);
// AMQP message with fluent API
dispatch($amqpMessage)
->onPool('notifications')
->withHeader('trace-id', $traceId)
->setConfirm(true)
->setTimeout(10);
// Kafka message with fluent API
dispatch($kafkaMessage)
->onPool('events')
->withHeader('user-id', $userId)
->setKey($partitionKey);
// Closure support
dispatch(function () {
// Job logic here
})->onPool('default')->delay(30);
```
|
Caution Review failedThe pull request is closed. Walkthrough新增命名空间下的 Changes
Sequence Diagram(s)sequenceDiagram
participant User as 用户代码
participant SupportFns as FriendsOfHyperf\Support\dispatch()
participant Pending as Pending*Dispatch
participant Container as ApplicationContext
participant Driver as 驱动/生产者
User->>SupportFns: dispatch($job)
activate SupportFns
SupportFns->>SupportFns: 匹配类型 (Closure / AMQP / Kafka / Job)
alt AMQP
SupportFns-->>Pending: 返回 PendingAmqpProducerMessageDispatch
else Kafka
SupportFns-->>Pending: 返回 PendingKafkaProducerMessageDispatch
else AsyncQueue
SupportFns-->>Pending: 返回 PendingAsyncQueueDispatch
else Closure
SupportFns-->>Pending: 包装为 CallQueuedClosure -> PendingAsyncQueueDispatch
else Unsupported
SupportFns-->>User: 抛出 InvalidArgumentException
end
deactivate SupportFns
User->>Pending: 链式配置 (onPool/setTimeout/delay/withHeader...)
activate Pending
Pending-->>User: 返回 this
deactivate Pending
Note over User,Pending: Pending 对象销毁时触发实际发送
User->>Pending: __destruct()
activate Pending
Pending->>Container: getContainer()->get(Driver)
Pending->>Driver: produce()/sendBatch()/push()
Driver-->>Pending: 返回/抛错
deactivate Pending
Estimated code review effort🎯 3 (中等) | ⏱️ ~25 分钟
Possibly related PRs
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
📜 Recent review detailsConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro 📒 Files selected for processing (3)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 PHPStan (2.1.31)At least one path must be specified to analyse. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (1)
src/support/src/Functions.php (1)
39-44: 简化类型检查逻辑。
interface_exists()和class_exists()检查是多余的。如果instanceof检查成功,则该接口或类必然已存在。这些防御性检查增加了不必要的运行时开销。可以简化为:
return match (true) { - interface_exists(ProducerMessageInterface::class) && $job instanceof ProducerMessageInterface => new PendingAmqpProducerMessageDispatch($job), - class_exists(ProduceMessage::class) && $job instanceof ProduceMessage => new PendingKafkaProducerMessageDispatch($job), - interface_exists(JobInterface::class) && $job instanceof JobInterface => new PendingAsyncQueueDispatch($job), + $job instanceof ProducerMessageInterface => new PendingAmqpProducerMessageDispatch($job), + $job instanceof ProduceMessage => new PendingKafkaProducerMessageDispatch($job), + $job instanceof JobInterface => new PendingAsyncQueueDispatch($job), default => throw new InvalidArgumentException('Unsupported job type.') };
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
src/helpers/src/Functions.php(1 hunks)src/support/src/Bus/PendingAmqpProducerMessageDispatch.php(1 hunks)src/support/src/Bus/PendingAsyncQueueDispatch.php(1 hunks)src/support/src/Bus/PendingKafkaProducerMessageDispatch.php(1 hunks)src/support/src/Functions.php(1 hunks)tests/Support/DispatchTest.php(1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
src/*/src/**/*.php
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
All component PHP code must use the namespace pattern FriendsOfHyperf{ComponentName}
Files:
src/support/src/Functions.phpsrc/support/src/Bus/PendingAmqpProducerMessageDispatch.phpsrc/helpers/src/Functions.phpsrc/support/src/Bus/PendingKafkaProducerMessageDispatch.phpsrc/support/src/Bus/PendingAsyncQueueDispatch.php
**/*.php
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
**/*.php: Code style must follow PSR-12 and be formatted by PHP-CS-Fixer per .php-cs-fixer.php
Run PHPStan and keep the codebase passing per phpstan.neon.dist
Files:
src/support/src/Functions.phpsrc/support/src/Bus/PendingAmqpProducerMessageDispatch.phpsrc/helpers/src/Functions.phptests/Support/DispatchTest.phpsrc/support/src/Bus/PendingKafkaProducerMessageDispatch.phpsrc/support/src/Bus/PendingAsyncQueueDispatch.php
tests/**/*.php
📄 CodeRabbit inference engine (.github/copilot-instructions.md)
All tests should reside under tests/ and use PHPUnit following Hyperf testing patterns
Files:
tests/Support/DispatchTest.php
🧬 Code graph analysis (5)
src/support/src/Functions.php (5)
src/async-queue-closure-job/src/CallQueuedClosure.php (1)
CallQueuedClosure(21-55)src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (1)
PendingAmqpProducerMessageDispatch(22-76)src/support/src/Bus/PendingAsyncQueueDispatch.php (1)
PendingAsyncQueueDispatch(19-56)src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (1)
PendingKafkaProducerMessageDispatch(25-67)src/helpers/src/Functions.php (1)
dispatch(197-224)
src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (3)
src/http-client/src/PendingRequest.php (2)
pool(841-852)timeout(585-590)src/support/src/Bus/PendingAsyncQueueDispatch.php (3)
__construct(27-29)__destruct(31-37)onPool(45-49)src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (4)
__construct(31-33)__destruct(35-41)onPool(43-47)withHeader(61-66)
tests/Support/DispatchTest.php (5)
src/async-queue-closure-job/src/CallQueuedClosure.php (1)
CallQueuedClosure(21-55)src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (5)
PendingAmqpProducerMessageDispatch(22-76)onPool(44-48)setConfirm(65-69)setTimeout(71-75)withHeader(56-63)src/support/src/Bus/PendingAsyncQueueDispatch.php (4)
PendingAsyncQueueDispatch(19-56)onPool(45-49)delay(51-55)setMaxAttempts(39-43)src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (3)
PendingKafkaProducerMessageDispatch(25-67)onPool(43-47)withHeader(61-66)src/support/src/Functions.php (1)
dispatch(33-45)
src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (3)
src/facade/src/Kafka.php (2)
Kafka(22-54)sendBatch(33-47)src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (4)
__construct(32-34)__destruct(36-42)onPool(44-48)withHeader(56-63)src/support/src/Bus/PendingAsyncQueueDispatch.php (3)
__construct(27-29)__destruct(31-37)onPool(45-49)
src/support/src/Bus/PendingAsyncQueueDispatch.php (3)
src/facade/src/AsyncQueue.php (1)
AsyncQueue(22-40)src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (3)
__construct(32-34)__destruct(36-42)onPool(44-48)src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (3)
__construct(31-33)__destruct(35-41)onPool(43-47)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: Test on PHP 8.3 with Swoole 5.1.8
- GitHub Check: Test on PHP 8.1 with Swoole 6.1.2
- GitHub Check: Test on PHP 8.2 with Swoole 5.1.8
- GitHub Check: Test on PHP 8.1 with Swoole 5.1.8
- GitHub Check: Test on PHP 8.1 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.2 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.2 with Swoole 6.1.2
- GitHub Check: Test on PHP 8.3 with Swoole 6.1.2
- GitHub Check: Test on PHP 8.3 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.1 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.1 with Swoole 5.1.8
- GitHub Check: Test on PHP 8.3 with Swoole 6.1.2
- GitHub Check: Test on PHP 8.1 with Swoole 6.1.2
- GitHub Check: Test on PHP 8.2 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.2 with Swoole 5.1.8
- GitHub Check: Test on PHP 8.2 with Swoole 6.1.2
- GitHub Check: Test on PHP 8.3 with Swoole 5.1.8
- GitHub Check: Test on PHP 8.3 with Swoole 6.0.2
🔇 Additional comments (8)
src/helpers/src/Functions.php (1)
193-193: 文档更新正确,提供了清晰的迁移路径。弃用注解明确说明了版本时间线和推荐的替代方案,有助于用户平滑迁移。
src/support/src/Functions.php (1)
35-37: 闭包包装逻辑正确。将闭包自动包装为
CallQueuedClosure使得队列可以序列化和执行闭包,这是队列系统的常见模式。src/support/src/Bus/PendingAsyncQueueDispatch.php (1)
19-56: 实现清晰且符合预期。该类通过析构函数模式实现延迟执行,提供流式 API 用于配置队列、延迟和最大重试次数。使用
Conditionabletrait 支持条件链式调用,整体设计合理。src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (2)
36-42: 析构函数逻辑正确,短路求值使用得当。Line 38 使用
&&短路求值来条件性地设置 pool name,简洁高效。析构函数确保在对象销毁时自动发送消息。
56-63: 验证外部库 Hyperf AMQP 的 ProducerMessageInterface 公开 API根据代码分析和 web 搜索:
ProducerMessageInterface来自外部库Hyperf\Amqp\Message,而非本地实现- 当前代码通过闭包的
call()方法访问$this->message的私有属性$properties来设置application_headers- 相同的闭包模式也被使用在
src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php中- web 搜索结果显示 ProducerMessageInterface 可能不提供便捷的公开方法来动态添加 headers
通过闭包访问私有属性是在与受限外部库交互时的合理适配方案,尤其是:
- 代码有明确的
@property类型提示声明意图- 代码有
@phpstan-ignore-line注释- 此模式在代码库中被一致使用
- 功能由测试覆盖
请确认 Hyperf AMQP 库的
ProducerMessageInterface文档,验证是否真的不存在公开的 header 设置方法(如setHeader()、addHeader()等)。如果公开方法存在,应使用之;如果不存在,当前实现是合理的。src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (2)
35-41: 析构函数实现正确。通过
ProducerManager获取生产者并使用sendBatch发送消息,符合 Hyperf Kafka 的使用模式。
49-66: 闭包模式是必要的实现方案,而非可避免的脆弱性。验证发现,
longlang/phpkafkav1.2.5 中的ProduceMessage类不提供公共的 setter 方法(如setKey()、setValue()等)来修改已构建对象的属性。属性key、value和headers都是私有的。因此,使用闭包访问这些私有属性是修改已构建消息对象的唯一可行方案。这个实现方式:
- 与现有的
PendingAmqpProducerMessageDispatch模式保持一致- 由测试用例验证其正确性(如
testPendingKafkaProducerMessageDispatchFluentChaining)- 是外部库 API 设计的必然结果,而非代码设计问题
无需修改。
tests/Support/DispatchTest.php (1)
1-548: 测试覆盖全面且结构良好。测试套件包含 55 个断言,覆盖了:
- 不同任务类型的调度(闭包、JobInterface、AMQP、Kafka)
- 流式 API 的链式调用
- 条件配置(when/unless)
- 析构时的执行行为
- 错误处理和向后兼容性
使用 Mockery 进行依赖隔离,通过反射验证内部状态,测试设计合理。
* feat: introduce fluent API for dispatch() helper function
This commit introduces a new fluent dispatch API to provide a more expressive and flexible way to dispatch jobs, AMQP messages, and Kafka messages.
Key changes:
- Add new `FriendsOfHyperf\Support\dispatch()` helper function that returns pending dispatch objects
- Implement `PendingAsyncQueueDispatch` for async queue jobs with fluent methods: `onPool()`, `delay()`, `setMaxAttempts()`
- Implement `PendingAmqpProducerMessageDispatch` for AMQP messages with fluent methods: `onPool()`, `setPayload()`, `withHeader()`, `setConfirm()`, `setTimeout()`
- Implement `PendingKafkaProducerMessageDispatch` for Kafka messages with fluent methods: `onPool()`, `setKey()`, `setValue()`, `withHeader()`
- All pending dispatch classes support Hyperf's `Conditionable` trait for conditional chaining
- Automatically wrap closures in `CallQueuedClosure` when dispatched
- Deprecate global `dispatch()` helper in favor of namespace-qualified version
- Add comprehensive test coverage for all dispatch scenarios
Benefits:
- Cleaner, more readable code with method chaining
- Type-safe dispatch configuration
- Consistent API across different message broker types
- Backward compatible with existing code
Example usage:
```php
use function FriendsOfHyperf\Support\dispatch;
// Async queue with fluent API
dispatch(new ProcessPodcast($podcast))
->onPool('high-priority')
->delay(60)
->setMaxAttempts(3);
// AMQP message with fluent API
dispatch($amqpMessage)
->onPool('notifications')
->withHeader('trace-id', $traceId)
->setConfirm(true)
->setTimeout(10);
// Kafka message with fluent API
dispatch($kafkaMessage)
->onPool('events')
->withHeader('user-id', $userId)
->setKey($partitionKey);
// Closure support
dispatch(function () {
// Job logic here
})->onPool('default')->delay(30);
```
* feat: add dispatch() function tests in Support and Functions helpers
* fix: update docblock for dispatch() function to clarify parameter type
---------
Co-Authored-By: Deeka Wong <8337659+huangdijia@users.noreply.github.com>
Summary
This PR introduces a new fluent dispatch API that provides a more expressive and flexible way to dispatch jobs, AMQP messages, and Kafka messages. The implementation follows Laravel's dispatch pattern while adapting it for Hyperf's architecture.
Key Features
New dispatch() Helper Function
FriendsOfHyperf\Support\dispatch()CallQueuedClosurePending Dispatch Classes
PendingAsyncQueueDispatch (for async queue jobs)
onPool(string $pool)- Set the queue pooldelay(int $delay)- Set delay in secondssetMaxAttempts(int $maxAttempts)- Set maximum retry attemptsPendingAmqpProducerMessageDispatch (for AMQP messages)
onPool(string $pool)- Set the AMQP connection poolsetPayload(mixed $data)- Set message payloadwithHeader(string $key, mixed $value, ?int $ttl)- Add custom headerssetConfirm(bool $confirm)- Enable publisher confirmssetTimeout(int $timeout)- Set timeout in secondsPendingKafkaProducerMessageDispatch (for Kafka messages)
onPool(string $pool)- Set the Kafka producer poolsetKey(string $key)- Set partition keysetValue(string $value)- Set message valuewithHeader(string $key, string $value)- Add custom headersAdditional Features
Conditionabletrait for conditional method chainingExample Usage
Migration Notes
Deprecation Notice
The global
dispatch()helper infriendsofhyperf/helpersis now deprecated and will be removed in v3.2. Users should migrate to the namespace-qualified version:Breaking Changes
None. This PR is fully backward compatible.
Test Plan
Benefits
Related Issues
Closes any related issues if applicable.
Checklist
Summary by CodeRabbit
新功能
文档
测试