Skip to content

Conversation

@huangdijia
Copy link
Contributor

@huangdijia huangdijia commented Nov 19, 2025

Summary

This PR refactors the dispatch classes to extract common functionality into an abstract base class.

Changes

  • Created AbstractPendingDispatch - New abstract base class that:

    • Includes the Conditionable trait
    • Implements __destruct() method with defer support
    • Provides abstract dispatch() method for concrete implementations
    • Adds defer() method for deferred execution
  • Refactored dispatch classes to extend AbstractPendingDispatch:

    • PendingAmqpProducerMessageDispatch
    • PendingAsyncQueueDispatch
    • PendingKafkaProducerMessageDispatch

Each concrete class now:

  • Extends AbstractPendingDispatch instead of using Conditionable trait
  • Implements dispatch() method with their specific logic
  • Inherits defer functionality from the base class

Benefits

  • Code Reusability: Common dispatch logic is now shared
  • Consistency: All dispatch classes follow the same pattern
  • Maintainability: Changes to dispatch behavior can be made in one place
  • Extensibility: New dispatch types can easily extend the base class

Testing

The refactoring maintains backward compatibility - existing usage patterns remain unchanged.

Summary by CodeRabbit

  • 新功能

    • 新增 defer() 方法,支持延迟执行待处理的消息与队列分发操作。
  • 改进

    • 统一了异步队列、AMQP 与 Kafka 的分发行为,简化使用并提升一致性与可靠性。
    • 若未调用 defer(),实例生命周期结束前仍会触发分发,调用后改为延后执行。
  • 维护

    • 在建议项中新增可选引擎说明以支持协程运行时(可选)。

- Create AbstractPendingDispatch base class with Conditionable trait
- Move __destruct logic to abstract dispatch() method
- Add defer() method for deferred dispatch execution
- Refactor PendingAmqpProducerMessageDispatch, PendingAsyncQueueDispatch,
  and PendingKafkaProducerMessageDispatch to extend abstract class

This improves code reusability and maintains consistency across
different message dispatch implementations.
@coderabbitai
Copy link

coderabbitai bot commented Nov 19, 2025

Walkthrough

新增抽象基类 AbstractPendingDispatch,将多个待分发类重构为继承该基类;子类的析构发送逻辑被移入抽象的 dispatch(),并新增 defer() 以支持延迟调度;同时在 Sentry 的 composer.json 中添加了对 hyperf/engine 的 suggest 条目。

Changes

群组 / 文件 变更摘要
新增抽象基类
src/support/src/Bus/AbstractPendingDispatch.php
新增抽象基类,使用 Conditionable trait,包含私有 defer 标志;实现 __destruct() 在未 defer 时自动调用抽象 dispatch();提供 final public function defer(): void 来设置延迟并注册延迟回调;声明 abstract protected function dispatch(): void 由子类实现
AMQP 分发类
src/support/src/Bus/PendingAmqpProducerMessageDispatch.php
改为 extends AbstractPendingDispatch;移除 Conditionable trait 与原有 __destruct();添加 protected function dispatch(): void,将原析构中的消息准备与 Producer->produce() 逻辑迁移到此方法
异步队列分发类
src/support/src/Bus/PendingAsyncQueueDispatch.php
改为 extends AbstractPendingDispatch;移除 Conditionable trait 与原 __destruct();新增 protected function dispatch(): void,在该方法内根据 pool/ delay 将任务入队;保留原有配置方法(如 setMaxAttempts/onPool/delay)
Kafka 分发类
src/support/src/Bus/PendingKafkaProducerMessageDispatch.php
改为 extends AbstractPendingDispatch;移除 Conditionable trait 与析构逻辑;新增 protected function dispatch(): void 执行通过 ProducerManager 的发送;onPool(string $pool) 改为返回 static(链式)
Sentry composer
src/sentry/composer.json
suggest 中新增一条:hyperf/engine(说明:用于协程引擎 ^2.15.0),仅为建议依赖

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Pending as 子类 PendingDispatch
    participant Abstract as AbstractPendingDispatch
    participant Scheduler as 延迟调度器

    rect rgb(200,220,240)
    Note over Client,Abstract: 场景A — 未调用 defer(析构触发)
    Client->>Pending: 创建并配置实例
    Client->>Client: 超出作用域 / 销毁
    Pending->>Abstract: __destruct()
    Abstract->>Pending: 调用 dispatch()
    Pending->>Pending: 执行发送/入队/produce 等逻辑
    end

    rect rgb(240,220,200)
    Note over Client,Scheduler: 场景B — 调用 defer(延迟调度)
    Client->>Pending: 创建并配置实例
    Client->>Abstract: 调用 defer()
    Abstract->>Abstract: 设置 defer 标志
    Abstract->>Scheduler: 注册延迟回调(稍后触发)
    Client->>Client: 超出作用域 / 销毁
    Pending->>Abstract: __destruct()
    Abstract->>Abstract: 检查 defer 标志,跳过 dispatch
    Scheduler->>Pending: 延迟回调触发 dispatch()
    Pending->>Pending: 执行发送/入队/produce 等逻辑
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

  • 需要重点检查的文件/区域:
    • src/support/src/Bus/AbstractPendingDispatch.php:析构与延迟回调注册的实现细节(竞争条件、重复调度、异常处理)
    • src/support/src/Bus/PendingAmqpProducerMessageDispatch.phpPendingKafkaProducerMessageDispatch.php:确认原析构逻辑完整迁移到 dispatch(),包括确认、超时与连接/池的使用
    • src/support/src/Bus/PendingAsyncQueueDispatch.php:延迟与 pool 选择逻辑在新 dispatch() 中的行为一致性
    • 边界情况:多次调用 defer()、对象序列化/克隆、在析构过程中抛出异常的处理

Possibly related PRs

Poem

🐰 兔子颂

我在树下码字忙,消息分发终成行,
抽象基类搭桥梁,defer 轻点再启航,
析构变清爽,逻辑不慌张,代码跳跃唱一曲芳香。

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed 标题准确地总结了拉取请求的主要变更:提取公共调度逻辑到抽象基类 AbstractPendingDispatch。变更涉及创建新的抽象基类并重构三个具体的调度类继承它,这与标题的意图完全一致。
✨ Finishing touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch refactor/abstract-pending-dispatch

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@huangdijia huangdijia marked this pull request as draft November 19, 2025 05:36
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (3)
src/support/src/Bus/PendingAsyncQueueDispatch.php (1)

18-18: 继承抽象基类并集中 dispatch 逻辑是合理的

  • 通过继承 AbstractPendingDispatch 把析构触发的派发逻辑收敛到了统一模板,dispatch() 内只关心具体的 async-queue 实现,结构清晰、易维护。
  • ApplicationContext::getContainer()->get(DriverFactory::class)->get($this->pool ?? 'default')->push($this->job, $this->delay) 与既有 async-queue 使用方式一致,默认 pool 为 default 也与其它地方约定统一,看不到行为上的回归风险。

可以后续视情况考虑的小优化(非必须):

  • 如果希望 dispatch() 的返回值语义更明确,可以在抽象类和所有子类上加上 : void 返回类型声明,以便静态分析工具更友好(前提是确认没有其他子类会返回值)。

Also applies to: 46-52

src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (1)

24-24: Kafka PendingDispatch 与抽象基类的整合基本合理

  • 改为继承 AbstractPendingDispatch 后,通过统一的 dispatch() 模板在析构时发送 Kafka 消息,链式配置方法(onPool、headers、key/value)保持不变,整体对外 API 看起来是向后兼容的。
  • ApplicationContext::getContainer()->get(ProducerManager::class)->getProducer($this->pool ?? 'default')->sendBatch([$this->message]) 的默认 pool 行为与 helpers 里 Kafka dispatch 的默认值一致,符合预期。

可选的小提升(非必须):

  • 如需完全对齐 Kafka::sendBatch() 中对消息 pool/queue 的推断($pool ?? $this->pool ?? $this->queue ?? 'default'),可以考虑把这段 pool 选择逻辑抽成一个共享方法以复用,避免未来两处逻辑渐行渐远。

Also applies to: 57-62

src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (1)

21-21: AMQP PendingDispatch 的重构与模板方法模式契合良好

  • 通过继承 AbstractPendingDispatch 并把 AMQP 发送逻辑移动到 dispatch(),将原本散落在析构中的行为集中到一个可测试的地方,同时保持了对外链式配置 API 的兼容性。
  • dispatch() 中按需设置 pool / routingKey / exchange,然后通过容器获取 Producer 调用 produce($this->message, $this->confirm, $this->timeout),和既有 AMQP 使用模式一致,行为上看不到明显回归风险。

可选的细节优化(非必须):

  • 当前使用的短路写法:
    • $this->pool && $this->message->setPoolName($this->pool);
    • $this->routingKey && $this->message->setRoutingKey($this->routingKey);
    • $this->exchange && $this->message->setExchange($this->exchange);
      会在值为 ''[] 等“假值”时跳过设置。若业务上这些值也被视为有效配置,可以考虑改成显式判 null,例如:
    if ($this->pool !== null) {
        $this->message->setPoolName($this->pool);
    }
    以避免Truthy/Falsy 语义带来的歧义。

Also applies to: 82-90

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4f23a57 and d1801c9.

📒 Files selected for processing (4)
  • src/support/src/Bus/AbstractPendingDispatch.php (1 hunks)
  • src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (2 hunks)
  • src/support/src/Bus/PendingAsyncQueueDispatch.php (2 hunks)
  • src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
src/support/src/Bus/AbstractPendingDispatch.php (3)
src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (1)
  • dispatch (82-90)
src/support/src/Bus/PendingAsyncQueueDispatch.php (1)
  • dispatch (46-52)
src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (1)
  • dispatch (57-63)
src/support/src/Bus/PendingAsyncQueueDispatch.php (2)
src/support/src/Bus/AbstractPendingDispatch.php (2)
  • AbstractPendingDispatch (16-39)
  • dispatch (38-38)
src/support/src/Functions.php (1)
  • dispatch (33-45)
src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (6)
src/support/src/Bus/AbstractPendingDispatch.php (2)
  • AbstractPendingDispatch (16-39)
  • dispatch (38-38)
src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (1)
  • dispatch (82-90)
src/support/src/Bus/PendingAsyncQueueDispatch.php (1)
  • dispatch (46-52)
src/support/src/Functions.php (1)
  • dispatch (33-45)
src/helpers/src/Functions.php (1)
  • dispatch (197-224)
src/facade/src/Kafka.php (1)
  • sendBatch (40-53)
src/support/src/Bus/PendingAmqpProducerMessageDispatch.php (4)
src/support/src/Bus/AbstractPendingDispatch.php (2)
  • AbstractPendingDispatch (16-39)
  • dispatch (38-38)
src/support/src/Bus/PendingAsyncQueueDispatch.php (1)
  • dispatch (46-52)
src/support/src/Bus/PendingKafkaProducerMessageDispatch.php (1)
  • dispatch (57-63)
src/support/src/Functions.php (1)
  • dispatch (33-45)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
  • GitHub Check: Test on PHP 8.3 with Swoole 6.0.2
  • GitHub Check: Test on PHP 8.2 with Swoole 6.1.2
  • GitHub Check: Test on PHP 8.3 with Swoole 5.1.8
  • GitHub Check: Test on PHP 8.1 with Swoole 6.1.2
  • GitHub Check: Test on PHP 8.2 with Swoole 5.1.8
  • GitHub Check: Test on PHP 8.2 with Swoole 6.0.2
  • GitHub Check: Test on PHP 8.1 with Swoole 6.0.2
  • GitHub Check: Test on PHP 8.1 with Swoole 5.1.8
  • GitHub Check: Test on PHP 8.3 with Swoole 6.1.2
  • GitHub Check: Test on PHP 8.2 with Swoole 6.1.2
  • GitHub Check: Test on PHP 8.3 with Swoole 6.0.2
  • GitHub Check: Test on PHP 8.3 with Swoole 6.1.2
  • GitHub Check: Test on PHP 8.1 with Swoole 5.1.8
  • GitHub Check: Test on PHP 8.3 with Swoole 5.1.8
  • GitHub Check: Test on PHP 8.1 with Swoole 6.1.2
  • GitHub Check: Test on PHP 8.2 with Swoole 5.1.8
🔇 Additional comments (1)
src/support/src/Bus/AbstractPendingDispatch.php (1)

16-38: 需要进一步验证 defer() 函数的来源。让我检查文件头部的 use 语句和 composer 依赖。

现在检查文件头部的导入情况和命名空间:

验证完成:defer() 函数来源确认,代码无问题

经过验证,defer() 是 Hyperf 框架提供的全局函数,用于在协程结束时执行延迟代码,代码中第 33 行的使用是正确的。该抽象类的生命周期设计清晰、逻辑简洁,当前实现不存在功能缺陷。

建议(可选):

  • 如果要严格保护生命周期语义,可将 __destruct() 标记为 final
  • 可在 abstract protected function dispatch(); 上添加显式返回类型 : void

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d1801c9 and 3a50bf8.

📒 Files selected for processing (1)
  • src/sentry/composer.json (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: Test on PHP 8.1 with Swoole 6.0.2
  • GitHub Check: Test on PHP 8.2 with Swoole 6.0.2
  • GitHub Check: Test on PHP 8.1 with Swoole 6.1.2
  • GitHub Check: Test on PHP 8.2 with Swoole 6.1.2
  • GitHub Check: Test on PHP 8.1 with Swoole 5.1.8
  • GitHub Check: Test on PHP 8.3 with Swoole 6.1.2
  • GitHub Check: Test on PHP 8.2 with Swoole 5.1.8
  • GitHub Check: Test on PHP 8.1 with Swoole 6.0.2
  • GitHub Check: Test on PHP 8.2 with Swoole 6.0.2
  • GitHub Check: Test on PHP 8.3 with Swoole 5.1.8
  • GitHub Check: Test on PHP 8.3 with Swoole 6.0.2
🔇 Additional comments (1)
src/sentry/composer.json (1)

39-47: 确认 hyperf/engine 是否应为可选依赖还是必需依赖。

该条目被添加到 "suggest" 部分作为可选依赖。请确认:

  1. 新的 AbstractPendingDispatch 实现是否真的只在可选情况下需要 hyperf/engine?
  2. 如果使用了延迟调度功能是否一定需要此依赖?

如果没有 hyperf/engine 会导致功能故障,应考虑将其移至 "require" 部分。

"hyperf/amqp": "Required to use the amqp event (~3.1.9).",
"hyperf/crontab": "Required to use the crontab event (~3.1.7).",
"hyperf/database": "Required to use the crontab event (~3.1.0).",
"hyperf/engine": "Required to use the coroutine engine (^2.15.0).",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

❓ Verification inconclusive

验证 hyperf/engine 包版本约束的有效性。

新增的 "hyperf/engine" 可选依赖版本约束为 "^2.15.0"。考虑到此更改与 PR 中引入的 AbstractPendingDispatch 抽象基类的延迟调度功能相关,请确认该版本号是否有效且存在。

建议运行以下命令以验证该依赖的可用性和版本信息:


🏁 Script executed:

#!/bin/bash
# Description: Verify hyperf/engine package existence and latest versions

# Check PackageST/Packagist for hyperf/engine details
curl -s https://repo.packagist.org/p/hyperf/engine.json | jq '.packages["hyperf/engine"] | keys' | head -20

Length of output: 57


版本约束 "^2.15.0" 指向不存在的包版本

hyperf/engine 的最新可用版本是 v2.14.0(2025年4月13日发布)。composer.json 中建议的版本约束 "^2.15.0" 引用的是尚不存在的版本,这将导致 composer 安装时失败。

建议改为 "^2.14.0" 或移除此约束,直到版本 2.15.0 正式发布。

🤖 Prompt for AI Agents
In src/sentry/composer.json around line 44, the version constraint "^2.15.0" for
hyperf/engine references a non-existent release and will break composer
installs; change the constraint to a valid published version (for example
"^2.14.0") or remove the hyperf/engine requirement until v2.15.0 is officially
released, then run composer validate and composer update to confirm the manifest
and dependency resolution succeed.

@huangdijia huangdijia closed this Nov 25, 2025
@huangdijia huangdijia deleted the refactor/abstract-pending-dispatch branch November 25, 2025 23:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants