From 50f7c3374fab823d32bf67a36a29ecb7bdce28c9 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Sep 2025 14:21:27 +0800 Subject: [PATCH 1/2] feat: improve async queue tracing with detailed messaging metrics - Add queue name tracking through driver factory aspect interception - Include message ID, queue name, and body size in tracing spans - Track message latency between publish and process - Enhance carrier packing to support extra metadata - Remove unused Throwable import - Set proper start timestamps for queue processing transactions This enhancement provides better observability for async queue operations by capturing comprehensive messaging metrics including latency, message size, and proper queue identification. --- .../Aspect/AsyncQueueJobMessageAspect.php | 27 ++++++++++++++++--- .../Listener/TracingAsyncQueueListener.php | 13 +++++++-- src/sentry/src/Util/CarrierPacker.php | 3 ++- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php b/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php index 8b537ca76..d01d4f9f5 100644 --- a/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php +++ b/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php @@ -19,7 +19,6 @@ use Hyperf\Context\Context; use Hyperf\Di\Aop\AbstractAspect; use Hyperf\Di\Aop\ProceedingJoinPoint; -use Throwable; use function Hyperf\Support\with; @@ -33,6 +32,7 @@ class AsyncQueueJobMessageAspect extends AbstractAspect use SpanStarter; public array $classes = [ + 'Hyperf\AsyncQueue\Driver\DriverFactory::get', 'Hyperf\AsyncQueue\Driver\*Driver::push', 'Hyperf\AsyncQueue\JobMessage::__serialize', 'Hyperf\AsyncQueue\JobMessage::__unserialize', @@ -51,6 +51,7 @@ public function process(ProceedingJoinPoint $proceedingJoinPoint) } return match ($proceedingJoinPoint->methodName) { + 'get' => $this->handleGet($proceedingJoinPoint), 'push' => $this->handlePush($proceedingJoinPoint), '__serialize' => $this->handleSerialize($proceedingJoinPoint), '__unserialize' => $this->handleUnserialize($proceedingJoinPoint), @@ -58,11 +59,18 @@ public function process(ProceedingJoinPoint $proceedingJoinPoint) }; } + public function handleGet(ProceedingJoinPoint $proceedingJoinPoint) + { + $queue = $proceedingJoinPoint->arguments['keys']['name'] ?? 'default'; + Context::set('sentry.async_queue.name', $queue); + } + public function handlePush(ProceedingJoinPoint $proceedingJoinPoint) { + $job = $proceedingJoinPoint->arguments['keys']['job'] ?? null; $span = $this->startSpan( 'queue.publish', - $proceedingJoinPoint->arguments['keys']['job']::class + $job::class ); if (! $span) { @@ -70,8 +78,14 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint) } try { + $messageId = method_exists($job, 'getId') ? $job->getId() : uniqid('async_queue_', true); + $queueName = Context::get('sentry.async_queue.name', 'default'); + $bodySize = (fn ($job) => strlen($this->packer->pack($job)))->call($this, $job); $data = [ 'messaging.system' => 'async_queue', + 'messaging.message.id' => $messageId, + 'messaging.destination.name' => $queueName, + 'messaging.message.body.size' => $bodySize, ]; /** @var \Hyperf\AsyncQueue\Driver\Driver $driver */ @@ -82,11 +96,16 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint) }; $span->setData($data); - $carrier = $this->packer->pack($span); + $carrier = $this->packer->pack($span, [ + 'publish_time' => microtime(true), + 'message_id' => $messageId, + 'queue_name' => $queueName, + 'body_size' => $bodySize, + ]); + Context::set(Constants::TRACE_CARRIER, $carrier); return $proceedingJoinPoint->process(); - } catch (Throwable) { } finally { $span->setOrigin('auto.queue')->finish(); } diff --git a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php index 2ee432aa8..edfe0e984 100644 --- a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php @@ -82,7 +82,7 @@ protected function startTransaction(BeforeHandle $event): void op: 'queue.process', description: 'async_queue: ' . $job::class, source: TransactionSource::custom() - ); + )->setStartTimestamp(microtime(true)); } protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event): void @@ -94,7 +94,16 @@ protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event return; } - $data = []; + /** @var string|null $carrier */ + $carrier = Context::get(Constants::TRACE_CARRIER, null, Coroutine::parentId()); + $payload = json_decode((string) $carrier, true); + $data = [ + 'messaging.message.id' => $payload['message_id'] ?? null, + 'messaging.destination.name' => $payload['queue_name'] ?? null, + 'messaging.message.body.size' => $payload['body_size'] ?? null, + 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), + 'messaging.message.retry.count' => 0, + ]; $tags = []; if (method_exists($event, 'getThrowable') && $exception = $event->getThrowable()) { diff --git a/src/sentry/src/Util/CarrierPacker.php b/src/sentry/src/Util/CarrierPacker.php index 132834b3b..5eb8fbe98 100644 --- a/src/sentry/src/Util/CarrierPacker.php +++ b/src/sentry/src/Util/CarrierPacker.php @@ -34,12 +34,13 @@ public function unpack(string $data): array } } - public function pack(Span $span): string + public function pack(Span $span, array $extra = []): string { return json_encode([ 'sentry-trace' => $span->toTraceparent(), 'baggage' => $span->toBaggage(), 'traceparent' => $span->toW3CTraceparent(), + ...$extra, ]); } } From ca4cc5b967176bb4844685d14a2f491469413942 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Sep 2025 14:21:56 +0800 Subject: [PATCH 2/2] feat: add messaging system metadata to async queue tracing --- src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php index edfe0e984..9fa2bb4aa 100644 --- a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php @@ -98,6 +98,7 @@ protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event $carrier = Context::get(Constants::TRACE_CARRIER, null, Coroutine::parentId()); $payload = json_decode((string) $carrier, true); $data = [ + 'messaging.system' => 'async_queue', 'messaging.message.id' => $payload['message_id'] ?? null, 'messaging.destination.name' => $payload['queue_name'] ?? null, 'messaging.message.body.size' => $payload['body_size'] ?? null,