feat(tracing): enhance AMQP and Kafka messaging tracing with detailed metadata #896
… metadata
- Add message ID generation for trace correlation
- Include body size metrics for message payloads
- Track publish/receive latency timing
- Add pool/queue name context to traces
- Store carrier payload in context for consumer tracing
- Set accurate transaction start timestamps
Walkthrough
On the publishing side, message metadata (message_id, body_size, destination_name, publish_time, etc.) is generated and injected for AMQP, Kafka, and the async queue. On the consuming side, the carrier is propagated and read through the Hyperf Context, the transaction start time is set, and receive-side metadata (ID, body size, latency, retry count, destination.name, etc.) is written to the span/transaction.
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
actor App as Producer application
participant Aspect as Publish aspect (AMQP/Kafka/AsyncQueue)
participant Broker as Broker/queue
participant Listener as Consumer listener
participant Sentry as Sentry Tracing
App->>Aspect: publish(message)
rect rgba(200,230,255,0.25)
note right of Aspect: Generate message_id, body_size, destination_name\npack the carrier {publish_time,message_id,destination_name,body_size}
Aspect->>Broker: publish(message + headers[TRACE_CARRIER])
end
Broker-->>Listener: deliver(message + headers)
rect rgba(220,255,220,0.25)
Listener->>Listener: Context::set(TRACE_CARRIER, ...)\ncontinueTrace(...).setStartTimestamp(microtime(true))
Listener->>Sentry: On finish, write message.id/body.size/receive.latency/retry.count/destination.name
alt success
Sentry-->>Listener: Finish the transaction
else error
Sentry-->>Listener: Record the error and finish
end
end
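The publish/consume flow in the diagram can be sketched in plain PHP. This is only an illustration under stated assumptions: `packCarrier` and `unpackCarrier` are hypothetical stand-ins for the project's carrier packer (whose real wire format is not shown in this PR page), and the trace header value and `default` destination are made-up sample data.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stand-in for the carrier packer: merges trace headers with
 * extra metadata and serializes the result to JSON.
 */
function packCarrier(array $traceHeaders, array $extra = []): string
{
    return json_encode(array_merge($traceHeaders, $extra), JSON_THROW_ON_ERROR);
}

/** Decodes a carrier; returns an empty array for missing or malformed input. */
function unpackCarrier(?string $carrier): array
{
    $payload = json_decode((string) $carrier, true);

    return is_array($payload) ? $payload : [];
}

// Publishing side: attach metadata next to the trace headers.
$body = '{"order_id":42}';
$carrier = packCarrier(
    ['sentry-trace' => '0123456789abcdef0123456789abcdef-0123456789abcdef-1'],
    [
        'publish_time' => microtime(true),
        'message_id' => 'amqp_' . bin2hex(random_bytes(16)),
        'destination_name' => 'default',
        'body_size' => strlen($body),
    ]
);

// Consuming side: read the metadata back before finishing the transaction.
$payload = unpackCarrier($carrier);
```

Returning an empty array for a missing carrier keeps the consumer-side `$payload['...'] ?? null` lookups safe even when a message was published without tracing.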
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php (1)
97-108: Fix: $payload may be null, which triggers an array-offset warning; also avoid a bogus, very large latency value

- When json_decode returns null, the subsequent $payload['xxx'] access triggers "Trying to access array offset on value of type null".
- When publish_time is missing or invalid, the current implementation produces a huge latency value (roughly the current Unix timestamp), which pollutes the metrics.

Apply the following patch so that $payload is always an array and the latency calculation is robust. It also reads the current coroutine's Context first and falls back to the parent coroutine for compatibility:

    - /** @var string|null $carrier */
    - $carrier = Context::get(Constants::TRACE_CARRIER, null, Coroutine::parentId());
    - $payload = json_decode((string) $carrier, true);
    + /** @var string|null $carrier */
    + $carrier = Context::get(Constants::TRACE_CARRIER) ?? Context::get(Constants::TRACE_CARRIER, null, Coroutine::parentId());
    + $payload = json_decode((string) $carrier, true) ?: [];
    + $receiveLatency = isset($payload['publish_time']) && is_numeric($payload['publish_time']) && $payload['publish_time'] > 0
    +     ? max(0, microtime(true) - (float) $payload['publish_time'])
    +     : null;
    @@
    - 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0),
    + 'messaging.message.receive.latency' => $receiveLatency,

src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php (1)
156-161: Confirm and replace the array_last call to avoid a runtime fatal error

Ripgrep found no global or custom definition of array_last. It is only called at the following locations, which would cause "Fatal error: undefined function":

- src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php:158
- src/support/src/Number.php:277
- src/compoships/src/Database/Eloquent/Relations/HasOneOrMany.php:41

Suggest replacing all array_last calls with a native implementation based on array_key_last, for example:

    - if (array_is_list($data)) {
    -     $carrier = array_last($data);
    + if (array_is_list($data)) {
    +     $lastKey = array_key_last($data);
    +     $carrier = $lastKey !== null ? ($data[$lastKey] ?? null) : null;

src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php (1)
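102-102: Batch sending lacks the extended metadata

The carrier in the sendBatchAsync method does not include the newly added metadata (publish_time, message_id, etc.), which is inconsistent with the behavior of single-message sending.

Suggest adding the corresponding metadata for batch sending as well:

    -$carrier = $this->packer->pack($span);
    +$carrier = $this->packer->pack($span, [
    +    'publish_time' => microtime(true),
    +    'batch_size' => count($messages),
    +]);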
🧹 Nitpick comments (12)
src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php (1)
126-129: Suggestion: clear TRACE_CARRIER after the transaction finishes to avoid leaking across messages

In long-lived coroutines, leftover Context values may affect subsequent events. Suggest setting the key to null after finish.

      SentrySdk::getCurrentHub()->setSpan($transaction);
    - $transaction->finish(microtime(true));
    + $transaction->finish(microtime(true));
    + Context::set(Constants::TRACE_CARRIER, null);

src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php (2)
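85-88: Improve readability: avoid calling the hidden packer through a rebound $this

The current usage relies on closure binding to point $this->packer at the Driver's internal property, which is hard to follow. Suggest explicitly extracting the Driver's packer and then computing the size.

    - $bodySize = (fn ($job) => strlen($this->packer->pack($job)))->call($driver, $job);
    + // Use the Driver's own packer to compute the size, avoiding the readability problem of a rebound $this
    + $driverPacker = (fn () => $this->packer)->call($driver);
    + $bodySize = strlen($driverPacker->pack($job));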
85-86: Optional: more robust messageId generation

uniqid is not strongly random. If getId is not available, consider random bytes, falling back to uniqid on failure.

    - $messageId = method_exists($job, 'getId') ? $job->getId() : uniqid('async_queue_', true);
    + $messageId = method_exists($job, 'getId') ? $job->getId() : ('async_queue_' . (
    +     function (): string {
    +         try { return bin2hex(random_bytes(8)); } catch (\Throwable) { return uniqid('', true); }
    +     }
    + )());

src/sentry/src/Tracing/Listener/TracingKafkaListener.php (2)
132-139: Suggestion: clear TRACE_CARRIER after the transaction finishes

Consistent with AsyncQueue, this avoids leftover Context values across messages.

      $transaction->setOrigin('auto.kafka')
          ->setData($data)
          ->setTags($tags);
      SentrySdk::getCurrentHub()->setSpan($transaction);
    - $transaction->finish(microtime(true));
    + $transaction->finish(microtime(true));
    + Context::set(Constants::TRACE_CARRIER, null);
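79-85: Consistency suggestion: use a stable value for the op field

Currently op embeds the topic name, which creates too many dimensions and hampers aggregation. Suggest standardizing on kafka.process (the name can still include the topic).

src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php (2)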
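65-67: Consider a stronger message ID generation strategy

Currently uniqid('amqp_', true) generates the message ID. This is adequate on a single machine, but carries a collision risk in distributed systems.

Suggest using a UUID or a method with more entropy:

    -$messageId = uniqid('amqp_', true);
    +$messageId = 'amqp_' . bin2hex(random_bytes(16));

Or use a UUID library (if the project already includes one):

    use Ramsey\Uuid\Uuid;
    // ...
    $messageId = 'amqp_' . Uuid::uuid4()->toString();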
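The two ID suggestions in this review (random bytes, with uniqid as a fallback) could be combined into one small helper. This is a sketch only: `generateMessageId` is a hypothetical name and is not part of the PR.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: builds a prefixed message ID from 16 random bytes
 * (32 hex characters), falling back to uniqid() if no entropy source is
 * available.
 */
function generateMessageId(string $prefix): string
{
    try {
        return $prefix . bin2hex(random_bytes(16));
    } catch (\Throwable) {
        // random_bytes() throws when no suitable randomness source exists.
        return uniqid($prefix, true);
    }
}

$id = generateMessageId('amqp_');
```

Sharing one helper between the AMQP, Kafka, and async-queue aspects would keep the ID format identical apart from the prefix.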
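81-85: Improve the consistency of the carrier data structure

The carrier stores $poolName in the queue_name field, which may cause semantic confusion. Pool name and queue name are different concepts in the AMQP context.

Suggest using a more accurate field name:

      $carrier = $this->packer->pack($span, [
          'publish_time' => microtime(true),
          'message_id' => $messageId,
    -     'queue_name' => $poolName,
    +     'pool_name' => $poolName,
          'body_size' => $bodySize,
      ]);

Correspondingly, the consumer-side code in TracingAmqpListener.php needs to be updated.

src/sentry/src/Tracing/Listener/TracingAmqpListener.php (2)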
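112-112: The receive latency calculation may yield a negative value

When the publisher's and consumer's clocks are out of sync, microtime(true) - ($payload['publish_time'] ?? 0) may produce a negative value.

Suggest adding a guard so the latency is never negative:

    -'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0),
    +'messaging.message.receive.latency' => max(0, microtime(true) - ($payload['publish_time'] ?? 0)),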
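Both latency concerns raised in this review (a missing publish_time yielding a huge value, and clock skew yielding a negative one) can be handled in one place. The following is a minimal sketch, assuming the carrier has already been decoded into an array; `receiveLatency` and its `$now` parameter are hypothetical and exist only to make the logic testable.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns the receive latency in seconds, or null when
 * publish_time is missing, non-numeric, or not positive. Negative results
 * caused by clock skew are clamped to 0.0.
 */
function receiveLatency(array $payload, ?float $now = null): ?float
{
    $publishTime = $payload['publish_time'] ?? null;
    if (! is_numeric($publishTime) || (float) $publishTime <= 0) {
        return null;
    }

    $now ??= microtime(true);

    return max(0.0, $now - (float) $publishTime);
}

$latency = receiveLatency(['publish_time' => 100.0], 100.5);
```

Returning null rather than 0 for a missing timestamp lets dashboards tell "no data" apart from "delivered instantly".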
110-114: Inconsistent field name

As noted above, if the field is renamed to pool_name in AmqpProducerAspect, this code needs to be adjusted accordingly.

     'messaging.message.id' => $payload['message_id'] ?? null,
     'messaging.message.body.size' => $payload['body_size'] ?? null,
     'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0),
     'messaging.message.retry.count' => 0,
    -'messaging.destination.name' => $payload['queue_name'] ?? null,
    +'messaging.destination.name' => $payload['pool_name'] ?? null,

src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php (3)
61-61: Keep the message ID generation strategy consistent with AMQP

As with AmqpProducerAspect, suggest using a stronger ID generation strategy.

    -$messageId = uniqid('kafka_', true);
    +$messageId = 'kafka_' . bin2hex(random_bytes(16));
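70-71: Inconsistent destination name field

A commented-out line in the code shows that $poolName was considered for messaging.destination.name, but the topic is what is actually used. This inconsistency may cause confusion.

Suggest making the strategy explicit and removing the commented-out code:

     'messaging.destination.name' => $proceedingJoinPoint->arguments['keys']['topic'] ?? 'unknown',
    -// 'messaging.destination.name' => $poolName,
    +'messaging.kafka.pool.name' => $poolName,

This keeps the topic as the destination name (matching Kafka semantics) while additionally recording the pool name.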
74-79: Semantic inconsistency in the carrier data

Using queue_name to store Kafka's $poolName is semantically inaccurate, because Kafka uses topics rather than queues.

Suggest adjusting the field names to better reflect the actual meaning:

      $carrier = $this->packer->pack($span, [
          'publish_time' => microtime(true),
          'message_id' => $messageId,
    -     'queue_name' => $poolName,
    +     'topic_name' => $proceedingJoinPoint->arguments['keys']['topic'] ?? 'unknown',
    +     'pool_name' => $poolName,
          'body_size' => $bodySize,
      ]);
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (6)
- src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php (1 hunks)
- src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php (1 hunks)
- src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php (2 hunks)
- src/sentry/src/Tracing/Listener/TracingAmqpListener.php (4 hunks)
- src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php (1 hunks)
- src/sentry/src/Tracing/Listener/TracingKafkaListener.php (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php (1)
src/sentry/src/Util/CarrierPacker.php (1)
pack(37-45)
src/sentry/src/Tracing/Listener/TracingAmqpListener.php (1)
src/sentry/src/Constants.php (1)
Constants(14-21)
src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php (1)
src/sentry/src/Util/CarrierPacker.php (1)
pack(37-45)
src/sentry/src/Tracing/Listener/TracingKafkaListener.php (1)
src/sentry/src/Constants.php (1)
Constants(14-21)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: Test on PHP 8.1 with Swoole 5.1.7
- GitHub Check: Test on PHP 8.2 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.1 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.3 with Swoole 5.1.7
- GitHub Check: Test on PHP 8.2 with Swoole 5.1.7
- GitHub Check: Test on PHP 8.3 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.1 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.1 with Swoole 5.1.7
- GitHub Check: Test on PHP 8.2 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.2 with Swoole 5.1.7
- GitHub Check: Test on PHP 8.3 with Swoole 5.1.7
🔇 Additional comments (5)
src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php (1)
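101-108: Adding messaging.operation=process is reasonable and consistent

This matches the publishing side; adding operation on the consuming side helps unify search and aggregation. LGTM.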
src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php (1)
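89-95: Adding messaging.operation=publish is reasonable and consistent

Filling in the operation field in the publish-side instrumentation makes querying and aggregation easier. LGTM.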
src/sentry/src/Tracing/Listener/TracingKafkaListener.php (2)
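73-74: Persisting TRACE_CARRIER in the Context is reasonable

After the Kafka headers are read, the carrier is written to the Context so that it can be unpacked uniformly in the finish stage. LGTM.

82-87: Explicitly setting the transaction start time

Setting startTimestamp after continueTrace helps compute end-to-end latency more accurately. LGTM.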
src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php (1)
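62-62: Confirm whether the dynamic $name property of Hyperf\Kafka\Producer is always available

The closure accesses $this->name, which may not exist on the instance and would then cause a runtime error. Please verify the property definition, or a corresponding __get implementation, in the vendor source or official documentation, and make sure it is available in all usage scenarios.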
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php (1)
72-76: Validate $job defensively to avoid a fatal error when $job is null

$job defaults to null, but $job::class is used immediately afterwards. If the AOP argument key changes, or an abnormal scenario leaves it missing, a fatal error is triggered. Suggest passing straight through when the value obtained is not an object.

    - $job = $proceedingJoinPoint->arguments['keys']['job'] ?? null;
    - $span = $this->startSpan(
    -     'queue.publish',
    -     $job::class
    - );
    + $job = $proceedingJoinPoint->arguments['keys']['job'] ?? null;
    + if (! is_object($job)) {
    +     return $proceedingJoinPoint->process();
    + }
    + $span = $this->startSpan(
    +     'queue.publish',
    +     $job::class
    + );
🧹 Nitpick comments (2)
src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php (2)
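112-112: Suggest a more specific origin: auto.async_queue

The origin is currently auto.queue. To distinguish it from other systems (amqp/kafka/async_queue), suggest using auto.async_queue, which makes origin-based search and alert routing easier.

    - $span->setOrigin('auto.queue')->finish();
    + $span->setOrigin('auto.async_queue')->finish();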
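137-147: The __serialize/__unserialize branches are symmetric; tests should be added

The branch logic is aligned: the list branch appends the carrier to the end, and __unserialize then takes the last item; the associative-array branch, when a job key is present, writes and reads Constants::TRACE_CARRIER respectively. Suggest adding unit tests that simulate both the list and the associative-array return cases and verify that TRACE_CARRIER is passed correctly through serialization and deserialization.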
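The symmetry described above can be exercised without the framework. The sketch below models the two branches as plain functions; `attachCarrier`, `extractCarrier`, and the `TRACE_CARRIER` value are hypothetical stand-ins (the real key lives in Constants::TRACE_CARRIER and the real logic in the aspect), so treat it as a starting point for the suggested unit tests rather than the project's implementation.

```php
<?php

declare(strict_types=1);

// Hypothetical key; the real value is defined by Constants::TRACE_CARRIER.
const TRACE_CARRIER = 'sentry.tracing.carrier';

/** Attaches the carrier: appended for list payloads, keyed when a 'job' key exists. */
function attachCarrier(array $data, string $carrier): array
{
    if (array_is_list($data)) {
        $data[] = $carrier;
    } elseif (isset($data['job'])) {
        $data[TRACE_CARRIER] = $carrier;
    }

    return $data;
}

/** Reads the carrier back using the mirror-image rule of attachCarrier(). */
function extractCarrier(array $data): ?string
{
    if (array_is_list($data)) {
        $lastKey = array_key_last($data);
        $value = $lastKey !== null ? $data[$lastKey] : null;
    } else {
        $value = isset($data['job']) ? ($data[TRACE_CARRIER] ?? null) : null;
    }

    // Anything that is not a string cannot be a carrier.
    return is_string($value) ? $value : null;
}

$fromList = extractCarrier(attachCarrier(['App\\Job\\Demo', 0], 'carrier-1'));
$fromAssoc = extractCarrier(attachCarrier(['job' => 'App\\Job\\Demo', 'attempts' => 0], 'carrier-2'));
```

A unit test for the real aspect would assert the same two round trips, plus the case where an associative payload carries no carrier at all.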
📒 Files selected for processing (6)
- src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php (1 hunks)
- src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php (3 hunks)
- src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php (2 hunks)
- src/sentry/src/Tracing/Listener/TracingAmqpListener.php (4 hunks)
- src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php (1 hunks)
- src/sentry/src/Tracing/Listener/TracingKafkaListener.php (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php
- src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php
- src/sentry/src/Tracing/Listener/TracingAmqpListener.php
- src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php
- src/sentry/src/Tracing/Listener/TracingKafkaListener.php
🧰 Additional context used
🧬 Code graph analysis (1)
src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php (1)
src/sentry/src/Util/CarrierPacker.php (1)
pack(37-45)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Test on PHP 8.2 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.2 with Swoole 5.1.7
- GitHub Check: Test on PHP 8.1 with Swoole 5.1.7
- GitHub Check: Test on PHP 8.3 with Swoole 5.1.7
- GitHub Check: Test on PHP 8.3 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.1 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.2 with Swoole 5.1.7
- GitHub Check: Test on PHP 8.1 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.3 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.2 with Swoole 6.0.2
- GitHub Check: Test on PHP 8.1 with Swoole 5.1.7
- GitHub Check: Test on PHP 8.3 with Swoole 5.1.7
🔇 Additional comments (2)
src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php (2)
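90-90: messaging.operation = publish aligns with the convention, LGTM
- It mirrors process on the consuming side, which makes filtering and statistics easier. No change needed.
65-65: Suggestion: remove the redundant logic for legacy-key compatibility

No references to sentry.async_queue.name were found in the repository (there are only two reads/writes of the new key sentry.messaging.destination.name), which indicates the old key is not used in this project. Unless an external component depends on it, the fallback read and the dual write of the old key can be omitted.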
… metadata (#896)

* feat(tracing): enhance AMQP and Kafka messaging tracing with detailed metadata
  - Add message ID generation for trace correlation
  - Include body size metrics for message payloads
  - Track publish/receive latency timing
  - Add pool/queue name context to traces
  - Store carrier payload in context for consumer tracing
  - Set accurate transaction start timestamps
* fix(tracing): standardize messaging operation and destination name in tracing aspects
* fix(tracing): update destination name references in messaging aspects for consistency
* fix(tracing): handle missing publish time in message receive latency calculation
* fix(tracing): include publish time in AMQP message packing for improved tracing
* fix(tracing): include publish time and message metadata in Kafka batch message packing
* fix(tracing): streamline message packing in sendBatchAsync for improved performance
* fix(tracing): rename variable for clarity in handleGet method

---------

Co-authored-by: Deeka Wong <8337659+huangdijia@users.noreply.github.com>
Summary
Enhances AMQP and Kafka messaging tracing with comprehensive metadata for better observability:
Changes
- AmqpProducerAspect.php: Added message ID, body size, pool name, and enhanced carrier data
- KafkaProducerAspect.php: Added message ID, body size, and enhanced carrier data
- TracingAmqpListener.php: Enhanced consumer tracing with carrier payload data and latency metrics
- TracingKafkaListener.php: Enhanced consumer tracing with carrier payload data and latency metrics
Benefits
Summary by CodeRabbit