Skip to content
Merged
17 changes: 16 additions & 1 deletion src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,34 @@ protected function produceMessage(ProceedingJoinPoint $proceedingJoinPoint)
'topic.send',
sprintf('%s::%s()', $proceedingJoinPoint->className, $proceedingJoinPoint->methodName)
);

if (! $span) {
return $proceedingJoinPoint->process();
}

$messageId = uniqid('amqp_', true);
$destinationName = $producerMessage->getExchange();
$bodySize = strlen($producerMessage->payload());
$span->setData([
'messaging.system' => 'amqp',
'messaging.operation' => 'publish',
'messaging.message.id' => $messageId,
'messaging.message.body.size' => $bodySize,
'messaging.destination.name' => $destinationName,
// for amqp
'messaging.amqp.message.type' => $producerMessage->getTypeString(),
'messaging.amqp.message.routing_key' => $producerMessage->getRoutingKey(),
'messaging.amqp.message.exchange' => $producerMessage->getExchange(),
'messaging.amqp.message.pool_name' => $producerMessage->getPoolName(),
]);

$carrier = $this->packer->pack($span);
$carrier = $this->packer->pack($span, [
'publish_time' => microtime(true),
'message_id' => $messageId,
'destination_name' => $destinationName,
'body_size' => $bodySize,
]);

(function () use ($carrier) {
$this->properties['application_headers'] ??= new AMQPTable();
$this->properties['application_headers']->set(Constants::TRACE_CARRIER, $carrier);
Expand Down
11 changes: 6 additions & 5 deletions src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public function process(ProceedingJoinPoint $proceedingJoinPoint)

public function handleGet(ProceedingJoinPoint $proceedingJoinPoint)
{
$queue = $proceedingJoinPoint->arguments['keys']['name'] ?? 'default';
Context::set('sentry.async_queue.name', $queue);
$destinationName = $proceedingJoinPoint->arguments['keys']['name'] ?? 'default';
Context::set('sentry.messaging.destination.name', $destinationName);

return $proceedingJoinPoint->process();
}
Expand All @@ -83,13 +83,14 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint)
/** @var \Hyperf\AsyncQueue\Driver\Driver $driver */
$driver = $proceedingJoinPoint->getInstance();
$messageId = method_exists($job, 'getId') ? $job->getId() : uniqid('async_queue_', true);
$queueName = Context::get('sentry.async_queue.name', 'default');
$destinationName = Context::get('sentry.messaging.destination.name', 'default');
$bodySize = (fn ($job) => strlen($this->packer->pack($job)))->call($driver, $job);
$data = [
'messaging.system' => 'async_queue',
'messaging.operation' => 'publish',
'messaging.message.id' => $messageId,
'messaging.destination.name' => $queueName,
'messaging.message.body.size' => $bodySize,
'messaging.destination.name' => $destinationName,
];
$data += match (true) {
$driver instanceof RedisDriver => $this->buildSpanDataOfRedisDriver($driver),
Expand All @@ -100,7 +101,7 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint)
$carrier = $this->packer->pack($span, [
'publish_time' => microtime(true),
'message_id' => $messageId,
'queue_name' => $queueName,
'destination_name' => $destinationName,
'body_size' => $bodySize,
]);

Expand Down
33 changes: 25 additions & 8 deletions src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

/**
* @property array $headers
* @property string $name
* @mixin ProduceMessage
*/
class KafkaProducerAspect extends AbstractAspect
{
Expand Down Expand Up @@ -57,13 +59,24 @@ protected function sendAsync(ProceedingJoinPoint $proceedingJoinPoint)
return $proceedingJoinPoint->process();
}

$messageId = uniqid('kafka_', true);
$destinationName = $proceedingJoinPoint->arguments['keys']['topic'] ?? 'unknown';
$bodySize = strlen($proceedingJoinPoint->arguments['keys']['value'] ?? '');

$span->setData([
'messaging.system' => 'kafka',
'messaging.operation' => 'publish',
'messaging.destination.name' => $proceedingJoinPoint->arguments['keys']['topic'] ?? 'unknown',
'messaging.message.id' => $messageId,
'messaging.message.body.size' => $bodySize,
'messaging.destination.name' => $destinationName,
]);

$carrier = $this->packer->pack($span);
$carrier = $this->packer->pack($span, [
'publish_time' => microtime(true),
'message_id' => $messageId,
'destination_name' => $destinationName,
'body_size' => $bodySize,
]);
$headers = $proceedingJoinPoint->arguments['keys']['headers'] ?? [];
$headers[] = (new RecordHeader())
->setHeaderKey(Constants::TRACE_CARRIER)
Expand All @@ -86,14 +99,18 @@ protected function sendBatchAsync(ProceedingJoinPoint $proceedingJoinPoint)
return $proceedingJoinPoint->process();
}

$carrier = $this->packer->pack($span);
$packer = $this->packer;

foreach ($messages as $message) {
(
fn () => $this->headers[] = (new RecordHeader())
->setHeaderKey(Constants::TRACE_CARRIER)
->setValue($carrier)
)->call($message);
(function () use ($span, $packer) {
$carrier = $packer->pack($span, [
'publish_time' => microtime(true),
'message_id' => uniqid('kafka_', true),
'destination_name' => $this->getTopic(),
'body_size' => strlen((string) $this->getValue()),
]);
$this->headers[] = (new RecordHeader())->setHeaderKey(Constants::TRACE_CARRIER)->setValue($carrier);
})->call($message);
}

return tap($proceedingJoinPoint->process(), fn () => $span->setOrigin('auto.kafka')->finish());
Expand Down
15 changes: 14 additions & 1 deletion src/sentry/src/Tracing/Listener/TracingAmqpListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Hyperf\Amqp\Event\BeforeConsume;
use Hyperf\Amqp\Event\FailToConsume;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Context\Context;
use Hyperf\Event\Contract\ListenerInterface;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
Expand Down Expand Up @@ -73,6 +74,7 @@ protected function startTransaction(BeforeConsume $event): void
$applicationHeaders = $amqpMessage->has('application_headers') ? $amqpMessage->get('application_headers') : null;
if ($applicationHeaders && isset($applicationHeaders[Constants::TRACE_CARRIER])) {
[$sentryTrace, $baggage] = $this->packer->unpack($applicationHeaders[Constants::TRACE_CARRIER]);
Context::set(Constants::TRACE_CARRIER, $applicationHeaders[Constants::TRACE_CARRIER]);
}
}

Expand All @@ -83,7 +85,7 @@ protected function startTransaction(BeforeConsume $event): void
op: 'topic.process',
description: $message::class,
source: TransactionSource::custom()
);
)->setStartTimestamp(microtime(true));
}

protected function finishTransaction(AfterConsume|FailToConsume $event): void
Expand All @@ -95,11 +97,22 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void
return;
}

$payload = [];
if ($carrier = Context::get(Constants::TRACE_CARRIER)) {
$payload = json_decode((string) $carrier, true);
}

/** @var ConsumerMessage $message */
$message = $event->getMessage();
$data = [
'messaging.system' => 'amqp',
'messaging.operation' => 'process',
'messaging.message.id' => $payload['message_id'] ?? null,
'messaging.message.body.size' => $payload['body_size'] ?? null,
'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null,
'messaging.message.retry.count' => 0,
'messaging.destination.name' => $payload['destination_name'] ?? null,
// for amqp
'messaging.amqp.message.type' => $message->getTypeString(),
'messaging.amqp.message.routing_key' => $message->getRoutingKey(),
'messaging.amqp.message.exchange' => $message->getExchange(),
Expand Down
5 changes: 3 additions & 2 deletions src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event
$payload = json_decode((string) $carrier, true);
$data = [
'messaging.system' => 'async_queue',
'messaging.operation' => 'process',
'messaging.message.id' => $payload['message_id'] ?? null,
'messaging.destination.name' => $payload['queue_name'] ?? null,
'messaging.message.body.size' => $payload['body_size'] ?? null,
'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0),
'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null,
'messaging.message.retry.count' => $event->getMessage()->getAttempts(),
'messaging.destination.name' => $payload['destination_name'] ?? null,
];
$tags = [];

Expand Down
14 changes: 13 additions & 1 deletion src/sentry/src/Tracing/Listener/TracingKafkaListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use Hyperf\Context\Context;
use Hyperf\Event\Contract\ListenerInterface;
use Hyperf\Kafka\Event\AfterConsume;
use Hyperf\Kafka\Event\BeforeConsume;
Expand Down Expand Up @@ -69,6 +70,7 @@ protected function startTransaction(BeforeConsume $event): void
foreach ($message->getHeaders() as $header) {
if ($header->getHeaderKey() === Constants::TRACE_CARRIER) {
[$sentryTrace, $baggage] = $this->packer->unpack($header->getValue());
Context::set(Constants::TRACE_CARRIER, $header->getValue());
break;
}
}
Expand All @@ -81,7 +83,7 @@ protected function startTransaction(BeforeConsume $event): void
op: $consumer->getTopic() . ' process',
description: $consumer::class,
source: TransactionSource::custom()
);
)->setStartTimestamp(microtime(true));
}

protected function finishTransaction(AfterConsume|FailToConsume $event): void
Expand All @@ -93,12 +95,22 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void
return;
}

$payload = [];
if ($carrier = (string) Context::get(Constants::TRACE_CARRIER)) {
$payload = json_decode($carrier, true);
}

$consumer = $event->getConsumer();
$tags = [];
$data = [
'messaging.system' => 'kafka',
'messaging.operation' => 'process',
'messaging.message.id' => $payload['message_id'] ?? null,
'messaging.message.body.size' => $payload['body_size'] ?? null,
'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null,
'messaging.message.retry.count' => 0,
'messaging.destination.name' => $consumer->getTopic(),
// for kafka
'messaging.kafka.consumer.group' => $consumer->getGroupId(),
'messaging.kafka.consumer.pool' => $consumer->getPool(),
];
Expand Down