Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions src/sentry/src/Tracing/Aspect/AmqpProducerAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use FriendsOfHyperf\Sentry\Constants;
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use FriendsOfHyperf\Sentry\Util\Carrier;
use Hyperf\Amqp\Message\ProducerMessage;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
Expand All @@ -33,10 +33,8 @@ class AmqpProducerAspect extends AbstractAspect
'Hyperf\Amqp\Producer::produceMessage',
];

public function __construct(
protected Switcher $switcher,
protected CarrierPacker $packer
) {
public function __construct(protected Switcher $switcher)
{
}

public function process(ProceedingJoinPoint $proceedingJoinPoint)
Expand Down Expand Up @@ -84,8 +82,7 @@ protected function handleProduceMessage(ProceedingJoinPoint $proceedingJoinPoint
'messaging.amqp.message.exchange' => $producerMessage->getExchange(),
'messaging.amqp.message.pool_name' => $producerMessage->getPoolName(),
]);

$carrier = $this->packer->pack($span, [
$carrier = Carrier::fromSpan($span)->with([
'publish_time' => microtime(true),
'message_id' => $messageId,
'destination_name' => $destinationName,
Expand All @@ -94,7 +91,7 @@ protected function handleProduceMessage(ProceedingJoinPoint $proceedingJoinPoint

(function () use ($carrier) {
$this->properties['application_headers'] ??= new AMQPTable();
$this->properties['application_headers']->set(Constants::TRACE_CARRIER, $carrier);
$this->properties['application_headers']->set(Constants::TRACE_CARRIER, $carrier->toJson());
})->call($producerMessage);

return tap($proceedingJoinPoint->process(), fn () => $span->setOrigin('auto.amqp')->finish());
Expand Down
28 changes: 12 additions & 16 deletions src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use FriendsOfHyperf\Sentry\Constants;
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use FriendsOfHyperf\Sentry\Util\Carrier;
use Hyperf\AsyncQueue\Driver\RedisDriver;
use Hyperf\Context\Context;
use Hyperf\Di\Aop\AbstractAspect;
Expand All @@ -25,6 +25,7 @@
/**
* @property \Hyperf\AsyncQueue\Driver\ChannelConfig $channel
* @property \Hyperf\Redis\RedisProxy $redis
* @property \Hyperf\Contract\PackerInterface $packer
* @property string $poolName
*/
class AsyncQueueJobMessageAspect extends AbstractAspect
Expand All @@ -39,8 +40,7 @@ class AsyncQueueJobMessageAspect extends AbstractAspect
];

public function __construct(
protected Switcher $switcher,
protected CarrierPacker $packer
protected Switcher $switcher
) {
}

Expand Down Expand Up @@ -98,7 +98,7 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint)
};

$span->setData($data);
$carrier = $this->packer->pack($span, [
$carrier = Carrier::fromSpan($span)->with([
'publish_time' => microtime(true),
'message_id' => $messageId,
'destination_name' => $destinationName,
Expand Down Expand Up @@ -137,9 +137,9 @@ protected function handleSerialize(ProceedingJoinPoint $proceedingJoinPoint)
return with($proceedingJoinPoint->process(), function ($result) {
if (is_array($result) && $carrier = Context::get(Constants::TRACE_CARRIER)) {
if (array_is_list($result)) {
$result[] = $carrier;
$result[] = $carrier->toJson();
} elseif (isset($result['job'])) {
$result[Constants::TRACE_CARRIER] = $carrier;
$result[Constants::TRACE_CARRIER] = $carrier->toJson();
}
}

Expand All @@ -151,19 +151,15 @@ protected function handleUnserialize(ProceedingJoinPoint $proceedingJoinPoint)
{
/** @var array $data */
$data = $proceedingJoinPoint->arguments['keys']['data'] ?? [];
$carrier = null;

if (is_array($data)) {
if (array_is_list($data)) {
$carrier = array_last($data);
} elseif (isset($data['job'])) {
$carrier = $data[Constants::TRACE_CARRIER] ?? '';
}
}
$carrier = match (true) {
is_array($data) && array_is_list($data) => array_last($data),
isset($data['job']) => $data[Constants::TRACE_CARRIER] ?? '',
default => null,
};

/** @var string|null $carrier */
if ($carrier) {
Context::set(Constants::TRACE_CARRIER, $carrier);
Context::set(Constants::TRACE_CARRIER, Carrier::fromJson($carrier));
}

return $proceedingJoinPoint->process();
Expand Down
39 changes: 19 additions & 20 deletions src/sentry/src/Tracing/Aspect/KafkaProducerAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use FriendsOfHyperf\Sentry\Constants;
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use FriendsOfHyperf\Sentry\Util\Carrier;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use longlang\phpkafka\Producer\ProduceMessage;
Expand All @@ -37,8 +37,7 @@ class KafkaProducerAspect extends AbstractAspect
];

public function __construct(
protected Switcher $switcher,
protected CarrierPacker $packer
protected Switcher $switcher
) {
}

Expand Down Expand Up @@ -78,16 +77,17 @@ protected function sendAsync(ProceedingJoinPoint $proceedingJoinPoint)
'messaging.destination.name' => $destinationName,
]);

$carrier = $this->packer->pack($span, [
'publish_time' => microtime(true),
'message_id' => $messageId,
'destination_name' => $destinationName,
'body_size' => $bodySize,
]);
$carrier = Carrier::fromSpan($span)
->with([
'publish_time' => microtime(true),
'message_id' => $messageId,
'destination_name' => $destinationName,
'body_size' => $bodySize,
]);
$headers = $proceedingJoinPoint->arguments['keys']['headers'] ?? [];
$headers[] = (new RecordHeader())
->setHeaderKey(Constants::TRACE_CARRIER)
->setValue($carrier);
->setValue($carrier->toJson());
$proceedingJoinPoint->arguments['keys']['headers'] = $headers;

return tap($proceedingJoinPoint->process(), fn () => $span->setOrigin('auto.kafka')->finish());
Expand All @@ -106,17 +106,16 @@ protected function sendBatchAsync(ProceedingJoinPoint $proceedingJoinPoint)
return $proceedingJoinPoint->process();
}

$packer = $this->packer;

foreach ($messages as $message) {
(function () use ($span, $packer) {
$carrier = $packer->pack($span, [
'publish_time' => microtime(true),
'message_id' => uniqid('kafka_', true),
'destination_name' => $this->getTopic(),
'body_size' => strlen((string) $this->getValue()),
]);
$this->headers[] = (new RecordHeader())->setHeaderKey(Constants::TRACE_CARRIER)->setValue($carrier);
(function () use ($span) {
$carrier = Carrier::fromSpan($span)
->with([
'publish_time' => microtime(true),
'message_id' => uniqid('kafka_', true),
'destination_name' => $this->getTopic(),
'body_size' => strlen((string) $this->getValue()),
]);
$this->headers[] = (new RecordHeader())->setHeaderKey(Constants::TRACE_CARRIER)->setValue($carrier->toJson());
})->call($message);
}

Expand Down
7 changes: 3 additions & 4 deletions src/sentry/src/Tracing/Aspect/RpcAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use FriendsOfHyperf\Sentry\Constants;
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use FriendsOfHyperf\Sentry\Util\Carrier;
use Hyperf\Context\Context;
use Hyperf\Contract\ConfigInterface;
use Hyperf\Coroutine\Coroutine;
Expand Down Expand Up @@ -46,8 +46,7 @@ class RpcAspect extends AbstractAspect

public function __construct(
protected ContainerInterface $container,
protected Switcher $switcher,
protected CarrierPacker $packer
protected Switcher $switcher
) {
}

Expand Down Expand Up @@ -99,7 +98,7 @@ private function handleGenerateRpcPath(ProceedingJoinPoint $proceedingJoinPoint)
Context::set(static::SPAN, $span->setData($data));

if ($this->container->has(Rpc\Context::class)) {
$this->container->get(Rpc\Context::class)->set(Constants::TRACE_CARRIER, $this->packer->pack($span));
$this->container->get(Rpc\Context::class)->set(Constants::TRACE_CARRIER, Carrier::fromSpan($span)->toJson());
}

return $path;
Expand Down
29 changes: 13 additions & 16 deletions src/sentry/src/Tracing/Listener/TracingAmqpListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use FriendsOfHyperf\Sentry\Constants;
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use FriendsOfHyperf\Sentry\Util\Carrier;
use Hyperf\Amqp\Event\AfterConsume;
use Hyperf\Amqp\Event\BeforeConsume;
use Hyperf\Amqp\Event\FailToConsume;
Expand All @@ -32,8 +32,7 @@ class TracingAmqpListener implements ListenerInterface
use SpanStarter;

public function __construct(
protected Switcher $switcher,
protected CarrierPacker $packer
protected Switcher $switcher
) {
}

Expand Down Expand Up @@ -65,22 +64,22 @@ public function process(object $event): void
protected function startTransaction(BeforeConsume $event): void
{
$message = $event->getMessage();
$sentryTrace = $baggage = '';
$carrier = null;

if (method_exists($event, 'getAMQPMessage')) {
/** @var AMQPMessage $amqpMessage */
$amqpMessage = $event->getAMQPMessage();
/** @var AMQPTable|null $applicationHeaders */
$applicationHeaders = $amqpMessage->has('application_headers') ? $amqpMessage->get('application_headers') : null;
if ($applicationHeaders && isset($applicationHeaders[Constants::TRACE_CARRIER])) {
[$sentryTrace, $baggage] = $this->packer->unpack($applicationHeaders[Constants::TRACE_CARRIER]);
Context::set(Constants::TRACE_CARRIER, $applicationHeaders[Constants::TRACE_CARRIER]);
$carrier = Carrier::fromJson($applicationHeaders[Constants::TRACE_CARRIER]);
Context::set(Constants::TRACE_CARRIER, $carrier);
}
}

$this->continueTrace(
sentryTrace: $sentryTrace,
baggage: $baggage,
sentryTrace: $carrier?->getSentryTrace(),
baggage: $carrier?->getBaggage(),
name: $message::class,
op: 'queue.process',
description: $message::class,
Expand All @@ -97,21 +96,19 @@ protected function finishTransaction(AfterConsume|FailToConsume $event): void
return;
}

$payload = [];
if ($carrier = Context::get(Constants::TRACE_CARRIER)) {
$payload = json_decode((string) $carrier, true);
}
/** @var Carrier|null $carrier */
$carrier = Context::get(Constants::TRACE_CARRIER);

/** @var ConsumerMessage $message */
$message = $event->getMessage();
$data = [
'messaging.system' => 'amqp',
'messaging.operation' => 'process',
'messaging.message.id' => $payload['message_id'] ?? null,
'messaging.message.body.size' => $payload['body_size'] ?? null,
'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null,
'messaging.message.id' => $carrier?->get('message_id'),
'messaging.message.body.size' => $carrier?->get('body_size'),
'messaging.message.receive.latency' => $carrier?->has('publish_time') ? (microtime(true) - $carrier->get('publish_time')) : null,
'messaging.message.retry.count' => 0,
'messaging.destination.name' => $payload['destination_name'] ?? $message->getExchange(),
'messaging.destination.name' => $carrier?->get('destination_name') ?? $message->getExchange(),
// for amqp
'messaging.amqp.message.type' => $message->getTypeString(),
'messaging.amqp.message.routing_key' => $message->getRoutingKey(),
Expand Down
28 changes: 10 additions & 18 deletions src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use FriendsOfHyperf\Sentry\Constants;
use FriendsOfHyperf\Sentry\Switcher;
use FriendsOfHyperf\Sentry\Tracing\SpanStarter;
use FriendsOfHyperf\Sentry\Util\CarrierPacker;
use FriendsOfHyperf\Sentry\Util\Carrier;
use Hyperf\AsyncQueue\Event\AfterHandle;
use Hyperf\AsyncQueue\Event\BeforeHandle;
use Hyperf\AsyncQueue\Event\FailedHandle;
Expand All @@ -31,8 +31,7 @@ class TracingAsyncQueueListener implements ListenerInterface
use SpanStarter;

public function __construct(
protected Switcher $switcher,
protected CarrierPacker $packer
protected Switcher $switcher
) {
}

Expand Down Expand Up @@ -64,20 +63,14 @@ public function process(object $event): void

protected function startTransaction(BeforeHandle $event): void
{
$sentryTrace = $baggage = '';

/** @var string|null $carrier */
/** @var Carrier|null $carrier */
$carrier = Context::get(Constants::TRACE_CARRIER, null, Coroutine::parentId());

if ($carrier) {
[$sentryTrace, $baggage] = $this->packer->unpack($carrier);
}

$job = $event->getMessage()->job();

$this->continueTrace(
sentryTrace: $sentryTrace,
baggage: $baggage,
sentryTrace: $carrier?->getSentryTrace(),
baggage: $carrier?->getBaggage(),
name: $job::class,
op: 'queue.process',
description: 'async_queue: ' . $job::class,
Expand All @@ -94,17 +87,16 @@ protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event
return;
}

/** @var string|null $carrier */
/** @var Carrier|null $carrier */
$carrier = Context::get(Constants::TRACE_CARRIER, null, Coroutine::parentId());
$payload = json_decode((string) $carrier, true);
$data = [
'messaging.system' => 'async_queue',
'messaging.operation' => 'process',
'messaging.message.id' => $payload['message_id'] ?? null,
'messaging.message.body.size' => $payload['body_size'] ?? null,
'messaging.message.receive.latency' => isset($payload['publish_time']) ? (microtime(true) - $payload['publish_time']) : null,
'messaging.message.id' => $carrier?->get('message_id'),
'messaging.message.body.size' => $carrier?->get('body_size'),
'messaging.message.receive.latency' => $carrier?->has('publish_time') ? (microtime(true) - $carrier->get('publish_time')) : null,
'messaging.message.retry.count' => $event->getMessage()->getAttempts(),
'messaging.destination.name' => $payload['destination_name'] ?? null,
'messaging.destination.name' => $carrier?->get('destination_name'),
];
$tags = [];

Expand Down
Loading