diff --git a/src/sentry/src/HttpClient/HttpClient.php b/src/sentry/src/HttpClient/HttpClient.php index c89f2afd3..b97970644 100644 --- a/src/sentry/src/HttpClient/HttpClient.php +++ b/src/sentry/src/HttpClient/HttpClient.php @@ -11,7 +11,6 @@ namespace FriendsOfHyperf\Sentry\HttpClient; -use Closure; use Hyperf\Coordinator\Constants; use Hyperf\Coordinator\CoordinatorManager; use Hyperf\Coroutine\Concurrent; @@ -22,6 +21,8 @@ use Sentry\Options; use Throwable; +use function Hyperf\Tappable\tap; + class HttpClient extends \Sentry\HttpClient\HttpClient { protected ?Channel $chan = null; @@ -47,62 +48,66 @@ public function __construct( public function sendRequest(Request $request, Options $options): Response { + // Start the loop if not started yet $this->loop(); + // Push the request to the channel $chan = $this->chan; - $chan->push(fn () => parent::sendRequest($request, $options)); + $chan?->push([$request, $options]); return new Response(202, ['X-Sentry-Request-Status' => ['Queued']], ''); } - public function close(): void - { - $chan = $this->chan; - $this->chan = null; - - $chan?->close(); - } - protected function loop(): void { - if ($this->chan != null) { - return; - } - + // The worker already exited if ($this->workerExited) { return; } - $this->chan = new Channel($this->channelSize); - - Coroutine::create(function () { - try { - while (true) { + // Initialize the channel and start the loop + $this->chan ??= tap(new Channel($this->channelSize), function () { + // Dump memory usage and channel size + // Coroutine::create(function () { + // while (! $this->chan?->isClosing()) { + // dump('Memory Usage(MB): ' . memory_get_usage(true) / 1024 / 1024); + // dump('Channel Size: ' . $this->chan?->getLength()); + // sleep(1); + // } + // }); + + // Start the loop + Coroutine::create(function () { + try { while (true) { - /** @var Closure|null $closure */ - $closure = $this->chan?->pop(); - if (! $closure) { - break 2; - } - try { - if ($this->concurrent) { - $this->concurrent->create($closure); - } else { - Coroutine::create($closure); + while (true) { + // If the channel is closing or pop failed, exit the loop + if (! $args = $this->chan?->pop()) { + break 2; + } + try { + $callable = fn () => parent::sendRequest(...$args); + if ($this->concurrent) { + $this->concurrent->create($callable); + } else { + Coroutine::create($callable); + } + } catch (Throwable) { + break; + } finally { + $callable = null; + $args = null; } - } catch (Throwable) { - break; - } finally { - $closure = null; } } + } catch (Throwable $e) { + } finally { + $this->close(); } - } catch (Throwable $e) { - } finally { - $this->close(); - } + }); }); + // Wait for the worker exit event $this->waitingWorkerExit ??= Coroutine::create(function () { try { CoordinatorManager::until(Constants::WORKER_EXIT)->yield(); @@ -112,4 +117,14 @@ protected function loop(): void } }); } + + protected function close(): void + { + $chan = $this->chan; + $chan?->close(); + + if ($this->chan === $chan) { + $this->chan = null; + } + } }