Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions src/sentry/src/Tracing/Aspect/AsyncQueueJobMessageAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
use Hyperf\Context\Context;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use Throwable;

use function Hyperf\Support\with;

Expand All @@ -33,6 +32,7 @@ class AsyncQueueJobMessageAspect extends AbstractAspect
use SpanStarter;

public array $classes = [
'Hyperf\AsyncQueue\Driver\DriverFactory::get',
'Hyperf\AsyncQueue\Driver\*Driver::push',
'Hyperf\AsyncQueue\JobMessage::__serialize',
'Hyperf\AsyncQueue\JobMessage::__unserialize',
Expand All @@ -51,27 +51,41 @@ public function process(ProceedingJoinPoint $proceedingJoinPoint)
}

return match ($proceedingJoinPoint->methodName) {
'get' => $this->handleGet($proceedingJoinPoint),
'push' => $this->handlePush($proceedingJoinPoint),
'__serialize' => $this->handleSerialize($proceedingJoinPoint),
'__unserialize' => $this->handleUnserialize($proceedingJoinPoint),
default => $proceedingJoinPoint->process()
};
}

public function handleGet(ProceedingJoinPoint $proceedingJoinPoint)
{
$queue = $proceedingJoinPoint->arguments['keys']['name'] ?? 'default';
Context::set('sentry.async_queue.name', $queue);
}

public function handlePush(ProceedingJoinPoint $proceedingJoinPoint)
{
$job = $proceedingJoinPoint->arguments['keys']['job'] ?? null;
$span = $this->startSpan(
'queue.publish',
$proceedingJoinPoint->arguments['keys']['job']::class
$job::class
);

if (! $span) {
return $proceedingJoinPoint->process();
}

try {
$messageId = method_exists($job, 'getId') ? $job->getId() : uniqid('async_queue_', true);
$queueName = Context::get('sentry.async_queue.name', 'default');
$bodySize = (fn ($job) => strlen($this->packer->pack($job)))->call($this, $job);
$data = [
'messaging.system' => 'async_queue',
'messaging.message.id' => $messageId,
'messaging.destination.name' => $queueName,
'messaging.message.body.size' => $bodySize,
];

/** @var \Hyperf\AsyncQueue\Driver\Driver $driver */
Expand All @@ -82,11 +96,16 @@ public function handlePush(ProceedingJoinPoint $proceedingJoinPoint)
};

$span->setData($data);
$carrier = $this->packer->pack($span);
$carrier = $this->packer->pack($span, [
'publish_time' => microtime(true),
'message_id' => $messageId,
'queue_name' => $queueName,
'body_size' => $bodySize,
]);

Context::set(Constants::TRACE_CARRIER, $carrier);

return $proceedingJoinPoint->process();
} catch (Throwable) {
} finally {
$span->setOrigin('auto.queue')->finish();
}
Expand Down
14 changes: 12 additions & 2 deletions src/sentry/src/Tracing/Listener/TracingAsyncQueueListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected function startTransaction(BeforeHandle $event): void
op: 'queue.process',
description: 'async_queue: ' . $job::class,
source: TransactionSource::custom()
);
)->setStartTimestamp(microtime(true));
}

protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event): void
Expand All @@ -94,7 +94,17 @@ protected function finishTransaction(AfterHandle|RetryHandle|FailedHandle $event
return;
}

$data = [];
/** @var string|null $carrier */
$carrier = Context::get(Constants::TRACE_CARRIER, null, Coroutine::parentId());
$payload = json_decode((string) $carrier, true);
$data = [
'messaging.system' => 'async_queue',
'messaging.message.id' => $payload['message_id'] ?? null,
'messaging.destination.name' => $payload['queue_name'] ?? null,
'messaging.message.body.size' => $payload['body_size'] ?? null,
'messaging.message.receive.latency' => microtime(true) - ($payload['publish_time'] ?? 0),
'messaging.message.retry.count' => 0,
];
$tags = [];

if (method_exists($event, 'getThrowable') && $exception = $event->getThrowable()) {
Expand Down
3 changes: 2 additions & 1 deletion src/sentry/src/Util/CarrierPacker.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ public function unpack(string $data): array
}
}

public function pack(Span $span): string
public function pack(Span $span, array $extra = []): string
{
return json_encode([
'sentry-trace' => $span->toTraceparent(),
'baggage' => $span->toBaggage(),
'traceparent' => $span->toW3CTraceparent(),
...$extra,
]);
}
}