diff --git a/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php b/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php index fb1a04595..ffca5d667 100644 --- a/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php @@ -57,19 +57,34 @@ protected function produceMessage(ProceedingJoinPoint $proceedingJoinPoint) 'topic.send', sprintf('%s::%s()', $proceedingJoinPoint->className, $proceedingJoinPoint->methodName) ); + if (! $span) { return $proceedingJoinPoint->process(); } + + $messageId = uniqid('amqp_', true); + $destinationName = $producerMessage->getExchange(); + $bodySize = strlen($producerMessage->payload()); $span->setData([ 'messaging.system' => 'amqp', 'messaging.operation' => 'publish', + 'messaging.message.id' => $messageId, + 'messaging.message.body.size' => $bodySize, + 'messaging.destination.name' => $destinationName, + // for amqp 'messaging.amqp.message.type' => $producerMessage->getTypeString(), 'messaging.amqp.message.routing_key' => $producerMessage->getRoutingKey(), 'messaging.amqp.message.exchange' => $producerMessage->getExchange(), 'messaging.amqp.message.pool_name' => $producerMessage->getPoolName(), ]); - $carrier = $this->packer->pack($span); + $carrier = $this->packer->pack($span, [ + 'publish_time' => microtime(true), + 'message_id' => $messageId, + 'destination_name' => $destinationName, + 'body_size' => $bodySize, + ]); + (function () use ($carrier) { $this->properties['application_headers'] ??= new AMQPTable(); $this->properties['application_headers']->set(Constants::TRACE_CARRIER, $carrier); diff --git a/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php b/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php index c84d9577d..69d3836b9 100644 --- a/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php +++ b/src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php @@ -61,8 +61,8 @@ public function process(ProceedingJoinPoint $proceedingJoinPoint) public function handleGet(ProceedingJoinPoint $proceedingJoinPoint) { - $queue = $proceedingJoinPoint->arguments['keys']['name'] ?? 'default'; - Context::set('sentry.async_queue.name', $queue); + $destinationName = $proceedingJoinPoint->arguments['keys']['name'] ?? 'default'; + Context::set('sentry.messaging.destination.name', $destinationName); return $proceedingJoinPoint->process(); } @@ -83,13 +83,14 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint) /** @var \Hyperf\AsyncQueue\Driver\Driver $driver */ $driver = $proceedingJoinPoint->getInstance(); $messageId = method_exists($job, 'getId') ? $job->getId() : uniqid('async_queue_', true); - $queueName = Context::get('sentry.async_queue.name', 'default'); + $destinationName = Context::get('sentry.messaging.destination.name', 'default'); $bodySize = (fn ($job) => strlen($this->packer->pack($job)))->call($driver, $job); $data = [ 'messaging.system' => 'async_queue', + 'messaging.operation' => 'publish', 'messaging.message.id' => $messageId, - 'messaging.destination.name' => $queueName, 'messaging.message.body.size' => $bodySize, + 'messaging.destination.name' => $destinationName, ]; $data += match (true) { $driver instanceof RedisDriver => $this->buildSpanDataOfRedisDriver($driver), @@ -100,7 +101,7 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint) $carrier = $this->packer->pack($span, [ 'publish_time' => microtime(true), 'message_id' => $messageId, - 'queue_name' => $queueName, + 'destination_name' => $destinationName, 'body_size' => $bodySize, ]); diff --git a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php index 41a544c8f..3e7e1f42a 100644 --- a/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php +++ b/src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php @@ -23,6 +23,8 @@ /** * @property array $headers + * @property string $name + * @mixin ProduceMessage */ class KafkaProducerAspect extends AbstractAspect { @@ -57,13 +59,24 @@ protected function sendAsync(ProceedingJoinPoint $proceedingJoinPoint) return $proceedingJoinPoint->process(); } + $messageId = uniqid('kafka_', true); + $destinationName = $proceedingJoinPoint->arguments['keys']['topic'] ?? 'unknown'; + $bodySize = strlen($proceedingJoinPoint->arguments['keys']['value'] ?? ''); + $span->setData([ 'messaging.system' => 'kafka', 'messaging.operation' => 'publish', - 'messaging.destination.name' => $proceedingJoinPoint->arguments['keys']['topic'] ?? 'unknown', + 'messaging.message.id' => $messageId, + 'messaging.message.body.size' => $bodySize, + 'messaging.destination.name' => $destinationName, ]); - $carrier = $this->packer->pack($span); + $carrier = $this->packer->pack($span, [ + 'publish_time' => microtime(true), + 'message_id' => $messageId, + 'destination_name' => $destinationName, + 'body_size' => $bodySize, + ]); $headers = $proceedingJoinPoint->arguments['keys']['headers'] ?? []; $headers[] = (new RecordHeader()) ->setHeaderKey(Constants::TRACE_CARRIER) @@ -86,14 +99,18 @@ protected function sendBatchAsync(ProceedingJoinPoint $proceedingJoinPoint) return $proceedingJoinPoint->process(); } - $carrier = $this->packer->pack($span); + $packer = $this->packer; foreach ($messages as $message) { - ( - fn () => $this->headers[] = (new RecordHeader()) - ->setHeaderKey(Constants::TRACE_CARRIER) - ->setValue($carrier) - )->call($message); + (function () use ($span, $packer) { + $carrier = $packer->pack($span, [ + 'publish_time' => microtime(true), + 'message_id' => uniqid('kafka_', true), + 'destination_name' => $this->getTopic(), + 'body_size' => strlen((string) $this->getValue()), + ]); + $this->headers[] = (new RecordHeader())->setHeaderKey(Constants::TRACE_CARRIER)->setValue($carrier); + })->call($message); } return tap($proceedingJoinPoint->process(), fn () => $span->setOrigin('auto.kafka')->finish()); diff --git a/src/sentry/src/Tracing/Listener/TracingAmqpListener.php b/src/sentry/src/Tracing/Listener/TracingAmqpListener.php index b2ff1aa9f..4e2b8d6bb 100644 --- a/src/sentry/src/Tracing/Listener/TracingAmqpListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAmqpListener.php @@ -19,6 +19,7 @@ use Hyperf\Amqp\Event\BeforeConsume; use Hyperf\Amqp\Event\FailToConsume; use Hyperf\Amqp\Message\ConsumerMessage; +use Hyperf\Context\Context; use Hyperf\Event\Contract\ListenerInterface; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Wire\AMQPTable; @@ -73,6 +74,7 @@ protected function startTransaction(BeforeConsume $event): void $applicationHeaders = $amqpMessage->has('application_headers') ? $amqpMessage->get('application_headers') : null; if ($applicationHeaders && isset($applicationHeaders[Constants::TRACE_CARRIER])) { [$sentryTrace, $baggage] = $this->packer->unpack($applicationHeaders[Constants::TRACE_CARRIER]); + Context::set(Constants::TRACE_CARRIER, $applicationHeaders[Constants::TRACE_CARRIER]); } } @@ -83,7 +85,7 @@ protected function startTransaction(BeforeConsume $event): void op: 'topic.process', description: $message::class, source: TransactionSource::custom() - ); + )->setStartTimestamp(microtime(true)); } protected function finishTransaction(AfterConsume|FailToConsume $event): void @@ -95,11 +97,22 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void return; } + $payload = []; + if ($carrier = Context::get(Constants::TRACE_CARRIER)) { + $payload = json_decode((string) $carrier, true); + } + /** @var ConsumerMessage $message */ $message = $event->getMessage(); $data = [ 'messaging.system' => 'amqp', 'messaging.operation' => 'process', + 'messaging.message.id' => $payload['message_id'] ?? null, + 'messaging.message.body.size' => $payload['body_size'] ?? null, + 'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null, + 'messaging.message.retry.count' => 0, + 'messaging.destination.name' => $payload['destination_name'] ?? null, + // for amqp 'messaging.amqp.message.type' => $message->getTypeString(), 'messaging.amqp.message.routing_key' => $message->getRoutingKey(), 'messaging.amqp.message.exchange' => $message->getExchange(), diff --git a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php index 3c573e3d1..2f0af2bff 100644 --- a/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php +++ b/src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php @@ -99,11 +99,12 @@ protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event $payload = json_decode((string) $carrier, true); $data = [ 'messaging.system' => 'async_queue', + 'messaging.operation' => 'process', 'messaging.message.id' => $payload['message_id'] ?? null, - 'messaging.destination.name' => $payload['queue_name'] ?? null, 'messaging.message.body.size' => $payload['body_size'] ?? null, - 'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0), + 'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null, 'messaging.message.retry.count' => $event->getMessage()->getAttempts(), + 'messaging.destination.name' => $payload['destination_name'] ?? null, ]; $tags = []; diff --git a/src/sentry/src/Tracing/Listener/TracingKafkaListener.php b/src/sentry/src/Tracing/Listener/TracingKafkaListener.php index e7ad48118..baa280719 100644 --- a/src/sentry/src/Tracing/Listener/TracingKafkaListener.php +++ b/src/sentry/src/Tracing/Listener/TracingKafkaListener.php @@ -15,6 +15,7 @@ use FriendsOfHyperf\Sentry\Switcher; use FriendsOfHyperf\Sentry\Tracing\SpanStarter; use FriendsOfHyperf\Sentry\Util\CarrierPacker; +use Hyperf\Context\Context; use Hyperf\Event\Contract\ListenerInterface; use Hyperf\Kafka\Event\AfterConsume; use Hyperf\Kafka\Event\BeforeConsume; @@ -69,6 +70,7 @@ protected function startTransaction(BeforeConsume $event): void foreach ($message->getHeaders() as $header) { if ($header->getHeaderKey() === Constants::TRACE_CARRIER) { [$sentryTrace, $baggage] = $this->packer->unpack($header->getValue()); + Context::set(Constants::TRACE_CARRIER, $header->getValue()); break; } } @@ -81,7 +83,7 @@ protected function startTransaction(BeforeConsume $event): void op: $consumer->getTopic() . ' process', description: $consumer::class, source: TransactionSource::custom() - ); + )->setStartTimestamp(microtime(true)); } protected function finishTransaction(AfterConsume|FailToConsume $event): void @@ -93,12 +95,22 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void return; } + $payload = []; + if ($carrier = (string) Context::get(Constants::TRACE_CARRIER)) { + $payload = json_decode($carrier, true); + } + $consumer = $event->getConsumer(); $tags = []; $data = [ 'messaging.system' => 'kafka', 'messaging.operation' => 'process', + 'messaging.message.id' => $payload['message_id'] ?? null, + 'messaging.message.body.size' => $payload['body_size'] ?? null, + 'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null, + 'messaging.message.retry.count' => 0, 'messaging.destination.name' => $consumer->getTopic(), + // for kafka 'messaging.kafka.consumer.group' => $consumer->getGroupId(), 'messaging.kafka.consumer.pool' => $consumer->getPool(), ];