From c3d7dd09e1e7ee453a7062e26e02cf583e1c813c Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Sep 2025 17:38:09 +0800 Subject: [PATCH 1/8] feat(tracing): enhance AMQP and Kafka messaging tracing with detailed metadata - Add message ID generation for trace correlation - Include body size metrics for message payloads - Track publish/receive latency timing - Add pool/queue name context to traces - Store carrier payload in context for consumer tracing - Set accurate transaction start timestamps --- .../src/Tracing/Aspect/AmqpProducerAspect.php | 18 ++++++++++++++++-- .../src/Tracing/Aspect/KafkaProducerAspect.php | 15 ++++++++++++++- .../Tracing/Listener/TracingAmqpListener.php | 15 ++++++++++++++- .../Tracing/Listener/TracingKafkaListener.php | 15 ++++++++++++++- 4 files changed, 58 insertions(+), 5 deletions(-) diff --git a/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php b/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php index fb1a04595..8cb861a79 100644 --- a/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php @@ -57,19 +57,33 @@ protected function produceMessage(ProceedingJoinPoint $proceedingJoinPoint) 'topic.send', sprintf('%s::%s()', $proceedingJoinPoint->className, $proceedingJoinPoint->methodName) ); + if (! $span) { return $proceedingJoinPoint->process(); } + + $messageId = uniqid('amqp_', true); + $poolName = $producerMessage->getPoolName() ?: 'default'; + $bodySize = strlen($producerMessage->payload()); $span->setData([ 'messaging.system' => 'amqp', + 'messaging.message.id' => $messageId, + 'messaging.destination.name' => $poolName, + 'messaging.message.body.size' => $bodySize, 'messaging.operation' => 'publish', + // for amqp 'messaging.amqp.message.type' => $producerMessage->getTypeString(), 'messaging.amqp.message.routing_key' => $producerMessage->getRoutingKey(), 'messaging.amqp.message.exchange' => $producerMessage->getExchange(), - 'messaging.amqp.message.pool_name' => $producerMessage->getPoolName(), + 'messaging.amqp.message.pool_name' => $poolName, + ]); + + $carrier = $this->packer->pack($span, ['publish_time' => microtime(true), + 'message_id' => $messageId, + 'queue_name' => $poolName, + 'body_size' => $bodySize, ]); - $carrier = $this->packer->pack($span); (function () use ($carrier) { $this->properties['application_headers'] ??= new AMQPTable(); $this->properties['application_headers']->set(Constants::TRACE_CARRIER, $carrier); diff --git a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php index 41a544c8f..8d0504182 100644 --- a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php @@ -23,6 +23,7 @@ /** * @property array $headers + * @property string $name */ class KafkaProducerAspect extends AbstractAspect { @@ -57,13 +58,25 @@ protected function sendAsync(ProceedingJoinPoint $proceedingJoinPoint) return $proceedingJoinPoint->process(); } + $messageId = uniqid('kafka_', true); + $poolName = (fn () => $this->name)->call($proceedingJoinPoint->getInstance()); + $bodySize = strlen($proceedingJoinPoint->arguments['keys']['value'] ?? ''); + $span->setData([ 'messaging.system' => 'kafka', + 'messaging.message.id' => $messageId, + // 'messaging.destination.name' => $poolName, + 'messaging.message.body.size' => $bodySize, 'messaging.operation' => 'publish', 'messaging.destination.name' => $proceedingJoinPoint->arguments['keys']['topic'] ?? 'unknown', ]); - $carrier = $this->packer->pack($span); + $carrier = $this->packer->pack($span, [ + 'publish_time' => microtime(true), + 'message_id' => $messageId, + 'queue_name' => $poolName, + 'body_size' => $bodySize, + ]); $headers = $proceedingJoinPoint->arguments['keys']['headers'] ?? []; $headers[] = (new RecordHeader()) ->setHeaderKey(Constants::TRACE_CARRIER) diff --git a/src/sentry/src/Tracing/Listener/TracingAmqpListener.php b/src/sentry/src/Tracing/Listener/TracingAmqpListener.php index b2ff1aa9f..b102d1388 100644 --- a/src/sentry/src/Tracing/Listener/TracingAmqpListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAmqpListener.php @@ -19,6 +19,7 @@ use Hyperf\Amqp\Event\BeforeConsume; use Hyperf\Amqp\Event\FailToConsume; use Hyperf\Amqp\Message\ConsumerMessage; +use Hyperf\Context\Context; use Hyperf\Event\Contract\ListenerInterface; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -73,6 +74,7 @@ protected function startTransaction(BeforeConsume $event): void $applicationHeaders = $amqpMessage->has('application_headers') ? $amqpMessage->get('application_headers') : null; if ($applicationHeaders && isset($applicationHeaders[Constants::TRACE_CARRIER])) { [$sentryTrace, $baggage] = $this->packer->unpack($applicationHeaders[Constants::TRACE_CARRIER]); + Context::set(Constants::TRACE_CARRIER, $applicationHeaders[Constants::TRACE_CARRIER]); } } @@ -83,7 +85,7 @@ protected function startTransaction(BeforeConsume $event): void op: 'topic.process', description: $message::class, source: TransactionSource::custom() - ); + )->setStartTimestamp(microtime(true)); } protected function finishTransaction(AfterConsume|FailToConsume $event): void @@ -95,11 +97,22 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void return; } + $payload = []; + if ($carrier = Context::get(Constants::TRACE_CARRIER)) { + $payload = json_decode((string) $carrier, true); + } + /** @var ConsumerMessage $message */ $message = $event->getMessage(); $data = [ 'messaging.system' => 'amqp', + 'messaging.message.id' => $payload['message_id'] ?? null, + 'messaging.destination.name' => $payload['queue_name'] ?? null, + 'messaging.message.body.size' => $payload['body_size'] ?? null, + 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), + 'messaging.message.retry.count' => 0, 'messaging.operation' => 'process', + // for amqp 'messaging.amqp.message.type' => $message->getTypeString(), 'messaging.amqp.message.routing_key' => $message->getRoutingKey(), 'messaging.amqp.message.exchange' => $message->getExchange(), diff --git a/src/sentry/src/Tracing/Listener/TracingKafkaListener.php b/src/sentry/src/Tracing/Listener/TracingKafkaListener.php index e7ad48118..8541d9180 100644 --- a/src/sentry/src/Tracing/Listener/TracingKafkaListener.php +++ b/src/sentry/src/Tracing/Listener/TracingKafkaListener.php @@ -15,6 +15,7 @@ use FriendsOfHyperf\Sentry\Switcher; use FriendsOfHyperf\Sentry\Tracing\SpanStarter; use FriendsOfHyperf\Sentry\Util\CarrierPacker; +use Hyperf\Context\Context; use Hyperf\Event\Contract\ListenerInterface; use Hyperf\Kafka\Event\AfterConsume; use Hyperf\Kafka\Event\BeforeConsume; @@ -69,6 +70,7 @@ protected function startTransaction(BeforeConsume $event): void foreach ($message->getHeaders() as $header) { if ($header->getHeaderKey() === Constants::TRACE_CARRIER) { [$sentryTrace, $baggage] = $this->packer->unpack($header->getValue()); + Context::set(Constants::TRACE_CARRIER, $header->getValue()); break; } } @@ -81,7 +83,7 @@ protected function startTransaction(BeforeConsume $event): void op: $consumer->getTopic() . ' process', description: $consumer::class, source: TransactionSource::custom() - ); + )->setStartTimestamp(microtime(true)); } protected function finishTransaction(AfterConsume|FailToConsume $event): void @@ -93,12 +95,23 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void return; } + $payload = []; + if ($carrier = (string) Context::get(Constants::TRACE_CARRIER)) { + $payload = json_decode($carrier, true); + } + $consumer = $event->getConsumer(); $tags = []; $data = [ 'messaging.system' => 'kafka', 'messaging.operation' => 'process', 'messaging.destination.name' => $consumer->getTopic(), + 'messaging.message.id' => $payload['message_id'] ?? null, + // 'messaging.destination.name' => $payload['queue_name'] ?? null, + 'messaging.message.body.size' => $payload['body_size'] ?? null, + 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), + 'messaging.message.retry.count' => 0, + // for kafka 'messaging.kafka.consumer.group' => $consumer->getGroupId(), 'messaging.kafka.consumer.pool' => $consumer->getPool(), ]; From d51cdfb41c108c80189a28e2e49b2f7d26b572a0 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Sep 2025 17:41:18 +0800 Subject: [PATCH 2/8] fix(tracing): standardize messaging operation and destination name in tracing aspects --- src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php | 4 ++-- src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php | 3 ++- src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php | 4 ++-- src/sentry/src/Tracing/Listener/TracingAmqpListener.php | 4 ++-- src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php | 3 ++- src/sentry/src/Tracing/Listener/TracingKafkaListener.php | 4 ++-- 6 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php b/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php index 8cb861a79..53c8879a1 100644 --- a/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php @@ -67,10 +67,10 @@ protected function produceMessage(ProceedingJoinPoint $proceedingJoinPoint) $bodySize = strlen($producerMessage->payload()); $span->setData([ 'messaging.system' => 'amqp', + 'messaging.operation' => 'publish', 'messaging.message.id' => $messageId, - 'messaging.destination.name' => $poolName, 'messaging.message.body.size' => $bodySize, - 'messaging.operation' => 'publish', + 'messaging.destination.name' => $poolName, // for amqp 'messaging.amqp.message.type' => $producerMessage->getTypeString(), 'messaging.amqp.message.routing_key' => $producerMessage->getRoutingKey(), diff --git a/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php b/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php index c84d9577d..1ce96f837 100644 --- a/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php +++ b/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php @@ -87,9 +87,10 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint) $bodySize = (fn ($job) => strlen($this->packer->pack($job)))->call($driver, $job); $data = [ 'messaging.system' => 'async_queue', + 'messaging.operation' => 'publish', 'messaging.message.id' => $messageId, - 'messaging.destination.name' => $queueName, 'messaging.message.body.size' => $bodySize, + 'messaging.destination.name' => $queueName, ]; $data += match (true) { $driver instanceof RedisDriver => $this->buildSpanDataOfRedisDriver($driver), diff --git a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php index 8d0504182..6c8c105c0 100644 --- a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php @@ -64,11 +64,11 @@ protected function sendAsync(ProceedingJoinPoint $proceedingJoinPoint) $span->setData([ 'messaging.system' => 'kafka', + 'messaging.operation' => 'publish', 'messaging.message.id' => $messageId, - // 'messaging.destination.name' => $poolName, 'messaging.message.body.size' => $bodySize, - 'messaging.operation' => 'publish', 'messaging.destination.name' => $proceedingJoinPoint->arguments['keys']['topic'] ?? 'unknown', + // 'messaging.destination.name' => $poolName, ]); $carrier = $this->packer->pack($span, [ diff --git a/src/sentry/src/Tracing/Listener/TracingAmqpListener.php b/src/sentry/src/Tracing/Listener/TracingAmqpListener.php index b102d1388..a5a20f608 100644 --- a/src/sentry/src/Tracing/Listener/TracingAmqpListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAmqpListener.php @@ -106,12 +106,12 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void $message = $event->getMessage(); $data = [ 'messaging.system' => 'amqp', + 'messaging.operation' => 'process', 'messaging.message.id' => $payload['message_id'] ?? null, - 'messaging.destination.name' => $payload['queue_name'] ?? null, 'messaging.message.body.size' => $payload['body_size'] ?? null, 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), 'messaging.message.retry.count' => 0, - 'messaging.operation' => 'process', + 'messaging.destination.name' => $payload['queue_name'] ?? null, // for amqp 'messaging.amqp.message.type' => $message->getTypeString(), 'messaging.amqp.message.routing_key' => $message->getRoutingKey(), diff --git a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php index 3c573e3d1..6bc6bde03 100644 --- a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php @@ -99,11 +99,12 @@ protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event $payload = json_decode((string) $carrier, true); $data = [ 'messaging.system' => 'async_queue', + 'messaging.operation' => 'process', 'messaging.message.id' => $payload['message_id'] ?? null, - 'messaging.destination.name' => $payload['queue_name'] ?? null, 'messaging.message.body.size' => $payload['body_size'] ?? null, 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), 'messaging.message.retry.count' => $event->getMessage()->getAttempts(), + 'messaging.destination.name' => $payload['queue_name'] ?? null, ]; $tags = []; diff --git a/src/sentry/src/Tracing/Listener/TracingKafkaListener.php b/src/sentry/src/Tracing/Listener/TracingKafkaListener.php index 8541d9180..a2a803028 100644 --- a/src/sentry/src/Tracing/Listener/TracingKafkaListener.php +++ b/src/sentry/src/Tracing/Listener/TracingKafkaListener.php @@ -105,12 +105,12 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void $data = [ 'messaging.system' => 'kafka', 'messaging.operation' => 'process', - 'messaging.destination.name' => $consumer->getTopic(), 'messaging.message.id' => $payload['message_id'] ?? null, - // 'messaging.destination.name' => $payload['queue_name'] ?? null, 'messaging.message.body.size' => $payload['body_size'] ?? null, 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), 'messaging.message.retry.count' => 0, + 'messaging.destination.name' => $consumer->getTopic(), + // 'messaging.destination.name' => $payload['queue_name'] ?? null, // for kafka 'messaging.kafka.consumer.group' => $consumer->getGroupId(), 'messaging.kafka.consumer.pool' => $consumer->getPool(), From c939c3fbb34347ec9fa30b30f553766d94694382 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Sep 2025 18:01:21 +0800 Subject: [PATCH 3/8] fix(tracing): update destination name references in messaging aspects for consistency --- src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php | 8 ++++---- .../src/Tracing/Aspect/AsyncQueueJobMessageAspect.php | 8 ++++---- src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php | 7 +++---- src/sentry/src/Tracing/Listener/TracingAmqpListener.php | 2 +- .../src/Tracing/Listener/TracingAsyncQueueListener.php | 2 +- src/sentry/src/Tracing/Listener/TracingKafkaListener.php | 1 - 6 files changed, 13 insertions(+), 15 deletions(-) diff --git a/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php b/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php index 53c8879a1..b7c4b55ac 100644 --- a/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php @@ -63,24 +63,24 @@ protected function produceMessage(ProceedingJoinPoint $proceedingJoinPoint) } $messageId = uniqid('amqp_', true); - $poolName = $producerMessage->getPoolName() ?: 'default'; + $destinationName = $producerMessage->getExchange(); $bodySize = strlen($producerMessage->payload()); $span->setData([ 'messaging.system' => 'amqp', 'messaging.operation' => 'publish', 'messaging.message.id' => $messageId, 'messaging.message.body.size' => $bodySize, - 'messaging.destination.name' => $poolName, + 'messaging.destination.name' => $destinationName, // for amqp 'messaging.amqp.message.type' => $producerMessage->getTypeString(), 'messaging.amqp.message.routing_key' => $producerMessage->getRoutingKey(), 'messaging.amqp.message.exchange' => $producerMessage->getExchange(), - 'messaging.amqp.message.pool_name' => $poolName, + 'messaging.amqp.message.pool_name' => $producerMessage->getPoolName(), ]); $carrier = $this->packer->pack($span, ['publish_time' => microtime(true), 'message_id' => $messageId, - 'queue_name' => $poolName, + 'destination_name' => $destinationName, 'body_size' => $bodySize, ]); diff --git a/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php b/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php index 1ce96f837..ca68ae59f 100644 --- a/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php +++ b/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php @@ -62,7 +62,7 @@ public function process(ProceedingJoinPoint $proceedingJoinPoint) public function handleGet(ProceedingJoinPoint $proceedingJoinPoint) { $queue = $proceedingJoinPoint->arguments['keys']['name'] ?? 'default'; - Context::set('sentry.async_queue.name', $queue); + Context::set('sentry.messaging.destination.name', $queue); return $proceedingJoinPoint->process(); } @@ -83,14 +83,14 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint) /** @var \Hyperf\AsyncQueue\Driver\Driver $driver */ $driver = $proceedingJoinPoint->getInstance(); $messageId = method_exists($job, 'getId') ? $job->getId() : uniqid('async_queue_', true); - $queueName = Context::get('sentry.async_queue.name', 'default'); + $destinationName = Context::get('sentry.messaging.destination.name', 'default'); $bodySize = (fn ($job) => strlen($this->packer->pack($job)))->call($driver, $job); $data = [ 'messaging.system' => 'async_queue', 'messaging.operation' => 'publish', 'messaging.message.id' => $messageId, 'messaging.message.body.size' => $bodySize, - 'messaging.destination.name' => $queueName, + 'messaging.destination.name' => $destinationName, ]; $data += match (true) { $driver instanceof RedisDriver => $this->buildSpanDataOfRedisDriver($driver), @@ -101,7 +101,7 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint) $carrier = $this->packer->pack($span, [ 'publish_time' => microtime(true), 'message_id' => $messageId, - 'queue_name' => $queueName, + 'destination_name' => $destinationName, 'body_size' => $bodySize, ]); diff --git a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php index 6c8c105c0..ac4d5aaa4 100644 --- a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php @@ -59,7 +59,7 @@ protected function sendAsync(ProceedingJoinPoint $proceedingJoinPoint) } $messageId = uniqid('kafka_', true); - $poolName = (fn () => $this->name)->call($proceedingJoinPoint->getInstance()); + $destinationName = $proceedingJoinPoint->arguments['keys']['topic'] ?? 'unknown'; $bodySize = strlen($proceedingJoinPoint->arguments['keys']['value'] ?? ''); $span->setData([ @@ -67,14 +67,13 @@ protected function sendAsync(ProceedingJoinPoint $proceedingJoinPoint) 'messaging.operation' => 'publish', 'messaging.message.id' => $messageId, 'messaging.message.body.size' => $bodySize, - 'messaging.destination.name' => $proceedingJoinPoint->arguments['keys']['topic'] ?? 'unknown', - // 'messaging.destination.name' => $poolName, + 'messaging.destination.name' => $destinationName, ]); $carrier = $this->packer->pack($span, [ 'publish_time' => microtime(true), 'message_id' => $messageId, - 'queue_name' => $poolName, + 'destination_name' => $destinationName, 'body_size' => $bodySize, ]); $headers = $proceedingJoinPoint->arguments['keys']['headers'] ?? []; diff --git a/src/sentry/src/Tracing/Listener/TracingAmqpListener.php b/src/sentry/src/Tracing/Listener/TracingAmqpListener.php index a5a20f608..b97b6ea01 100644 --- a/src/sentry/src/Tracing/Listener/TracingAmqpListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAmqpListener.php @@ -111,7 +111,7 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void 'messaging.message.body.size' => $payload['body_size'] ?? null, 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), 'messaging.message.retry.count' => 0, - 'messaging.destination.name' => $payload['queue_name'] ?? null, + 'messaging.destination.name' => $payload['destination_name'] ?? null, // for amqp 'messaging.amqp.message.type' => $message->getTypeString(), 'messaging.amqp.message.routing_key' => $message->getRoutingKey(), diff --git a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php index 6bc6bde03..f2ac35861 100644 --- a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php @@ -104,7 +104,7 @@ protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event 'messaging.message.body.size' => $payload['body_size'] ?? null, 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), 'messaging.message.retry.count' => $event->getMessage()->getAttempts(), - 'messaging.destination.name' => $payload['queue_name'] ?? null, + 'messaging.destination.name' => $payload['destination_name'] ?? null, ]; $tags = []; diff --git a/src/sentry/src/Tracing/Listener/TracingKafkaListener.php b/src/sentry/src/Tracing/Listener/TracingKafkaListener.php index a2a803028..47cc2b3ab 100644 --- a/src/sentry/src/Tracing/Listener/TracingKafkaListener.php +++ b/src/sentry/src/Tracing/Listener/TracingKafkaListener.php @@ -110,7 +110,6 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), 'messaging.message.retry.count' => 0, 'messaging.destination.name' => $consumer->getTopic(), - // 'messaging.destination.name' => $payload['queue_name'] ?? null, // for kafka 'messaging.kafka.consumer.group' => $consumer->getGroupId(), 'messaging.kafka.consumer.pool' => $consumer->getPool(), From dd6a8b4b94f276672d599a57f7432e97f7078fd8 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Sep 2025 18:06:54 +0800 Subject: [PATCH 4/8] fix(tracing): handle missing publish time in message receive latency calculation --- src/sentry/src/Tracing/Listener/TracingAmqpListener.php | 2 +- src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php | 2 +- src/sentry/src/Tracing/Listener/TracingKafkaListener.php | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/sentry/src/Tracing/Listener/TracingAmqpListener.php b/src/sentry/src/Tracing/Listener/TracingAmqpListener.php index b97b6ea01..4e2b8d6bb 100644 --- a/src/sentry/src/Tracing/Listener/TracingAmqpListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAmqpListener.php @@ -109,7 +109,7 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void 'messaging.operation' => 'process', 'messaging.message.id' => $payload['message_id'] ?? null, 'messaging.message.body.size' => $payload['body_size'] ?? null, - 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), + 'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null, 'messaging.message.retry.count' => 0, 'messaging.destination.name' => $payload['destination_name'] ?? null, // for amqp diff --git a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php index f2ac35861..2f0af2bff 100644 --- a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php @@ -102,7 +102,7 @@ protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event 'messaging.operation' => 'process', 'messaging.message.id' => $payload['message_id'] ?? null, 'messaging.message.body.size' => $payload['body_size'] ?? null, - 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), + 'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null, 'messaging.message.retry.count' => $event->getMessage()->getAttempts(), 'messaging.destination.name' => $payload['destination_name'] ?? null, ]; diff --git a/src/sentry/src/Tracing/Listener/TracingKafkaListener.php b/src/sentry/src/Tracing/Listener/TracingKafkaListener.php index 47cc2b3ab..baa280719 100644 --- a/src/sentry/src/Tracing/Listener/TracingKafkaListener.php +++ b/src/sentry/src/Tracing/Listener/TracingKafkaListener.php @@ -107,7 +107,7 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void 'messaging.operation' => 'process', 'messaging.message.id' => $payload['message_id'] ?? null, 'messaging.message.body.size' => $payload['body_size'] ?? null, - 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), + 'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null, 'messaging.message.retry.count' => 0, 'messaging.destination.name' => $consumer->getTopic(), // for kafka From bc5c66ea91af5ed1c2b636c6ab3e3720a0fa6f4e Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Sep 2025 18:07:33 +0800 Subject: [PATCH 5/8] fix(tracing): include publish time in AMQP message packing for improved tracing --- src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php b/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php index b7c4b55ac..ffca5d667 100644 --- a/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php @@ -78,7 +78,8 @@ protected function produceMessage(ProceedingJoinPoint $proceedingJoinPoint) 'messaging.amqp.message.pool_name' => $producerMessage->getPoolName(), ]); - $carrier = $this->packer->pack($span, ['publish_time' => microtime(true), + $carrier = $this->packer->pack($span, [ + 'publish_time' => microtime(true), 'message_id' => $messageId, 'destination_name' => $destinationName, 'body_size' => $bodySize, From 467ec2dcd51333019eaaddd259b3c5b10b6a21d5 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Sep 2025 18:13:11 +0800 Subject: [PATCH 6/8] fix(tracing): include publish time and message metadata in Kafka batch message packing --- .../src/Tracing/Aspect/KafkaProducerAspect.php | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php index ac4d5aaa4..da7435008 100644 --- a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php @@ -24,6 +24,7 @@ /** * @property array $headers * @property string $name + * @mixin ProduceMessage */ class KafkaProducerAspect extends AbstractAspect { @@ -98,13 +99,19 @@ protected function sendBatchAsync(ProceedingJoinPoint $proceedingJoinPoint) return $proceedingJoinPoint->process(); } - $carrier = $this->packer->pack($span); + $packer = $this->packer; foreach ($messages as $message) { ( - fn () => $this->headers[] = (new RecordHeader()) - ->setHeaderKey(Constants::TRACE_CARRIER) - ->setValue($carrier) + function () use ($span, $packer) { + $carrier = $packer->pack($span, [ + 'publish_time' => microtime(true), + 'message_id' => uniqid('kafka_', true), + 'destination_name' => $this->getTopic(), + 'body_size' => strlen((string) $this->getValue()), + ]); + $this->headers[] = (new RecordHeader())->setHeaderKey(Constants::TRACE_CARRIER)->setValue($carrier); + } )->call($message); } From 0dfc50bcba90feb7af614e9b63687c7014c9a81d Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Sep 2025 18:14:03 +0800 Subject: [PATCH 7/8] fix(tracing): streamline message packing in sendBatchAsync for improved performance --- .../Tracing/Aspect/KafkaProducerAspect.php | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php index da7435008..3e7e1f42a 100644 --- a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php @@ -102,17 +102,15 @@ protected function sendBatchAsync(ProceedingJoinPoint $proceedingJoinPoint) $packer = $this->packer; foreach ($messages as $message) { - ( - function () use ($span, $packer) { - $carrier = $packer->pack($span, [ - 'publish_time' => microtime(true), - 'message_id' => uniqid('kafka_', true), - 'destination_name' => $this->getTopic(), - 'body_size' => strlen((string) $this->getValue()), - ]); - $this->headers[] = (new RecordHeader())->setHeaderKey(Constants::TRACE_CARRIER)->setValue($carrier); - } - )->call($message); + (function () use ($span, $packer) { + $carrier = $packer->pack($span, [ + 'publish_time' => microtime(true), + 'message_id' => uniqid('kafka_', true), + 'destination_name' => $this->getTopic(), + 'body_size' => strlen((string) $this->getValue()), + ]); + $this->headers[] = (new RecordHeader())->setHeaderKey(Constants::TRACE_CARRIER)->setValue($carrier); + })->call($message); } return tap($proceedingJoinPoint->process(), fn () => $span->setOrigin('auto.kafka')->finish()); From 37aef69cc9b173dda3e6f55e1e7c193d8ad56627 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Wed, 3 Sep 2025 18:15:24 +0800 Subject: [PATCH 8/8] fix(tracing): rename variable for clarity in handleGet method --- src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php b/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php index ca68ae59f..69d3836b9 100644 --- a/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php +++ b/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php @@ -61,8 +61,8 @@ public function process(ProceedingJoinPoint $proceedingJoinPoint) public function handleGet(ProceedingJoinPoint $proceedingJoinPoint) { - $queue = $proceedingJoinPoint->arguments['keys']['name'] ?? 'default'; - Context::set('sentry.messaging.destination.name', $queue); + $destinationName = $proceedingJoinPoint->arguments['keys']['name'] ?? 'default'; + Context::set('sentry.messaging.destination.name', $destinationName); return $proceedingJoinPoint->process(); }