From 7e0e1269e289468983f6828826d309d64846d154 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Fri, 14 Nov 2025 07:50:09 +0800 Subject: [PATCH 1/3] feat: introduce fluent API for dispatch() helper function This commit introduces a new fluent dispatch API to provide a more expressive and flexible way to dispatch jobs, AMQP messages, and Kafka messages. Key changes: - Add new `FriendsOfHyperf\Support\dispatch()` helper function that returns pending dispatch objects - Implement `PendingAsyncQueueDispatch` for async queue jobs with fluent methods: `onPool()`, `delay()`, `setMaxAttempts()` - Implement `PendingAmqpProducerMessageDispatch` for AMQP messages with fluent methods: `onPool()`, `setPayload()`, `withHeader()`, `setConfirm()`, `setTimeout()` - Implement `PendingKafkaProducerMessageDispatch` for Kafka messages with fluent methods: `onPool()`, `setKey()`, `setValue()`, `withHeader()` - All pending dispatch classes support Hyperf's `Conditionable` trait for conditional chaining - Automatically wrap closures in `CallQueuedClosure` when dispatched - Deprecate global `dispatch()` helper in favor of namespace-qualified version - Add comprehensive test coverage for all dispatch scenarios Benefits: - Cleaner, more readable code with method chaining - Type-safe dispatch configuration - Consistent API across different message broker types - Backward compatible with existing code Example usage: ```php use function FriendsOfHyperf\Support\dispatch; // Async queue with fluent API dispatch(new ProcessPodcast($podcast)) ->onPool('high-priority') ->delay(60) ->setMaxAttempts(3); // AMQP message with fluent API dispatch($amqpMessage) ->onPool('notifications') ->withHeader('trace-id', $traceId) ->setConfirm(true) ->setTimeout(10); // Kafka message with fluent API dispatch($kafkaMessage) ->onPool('events') ->withHeader('user-id', $userId) ->setKey($partitionKey); // Closure support dispatch(function () { // Job logic here })->onPool('default')->delay(30); ``` --- src/helpers/src/Functions.php | 1 + .../PendingAmqpProducerMessageDispatch.php | 76 +++ .../src/Bus/PendingAsyncQueueDispatch.php | 56 ++ .../PendingKafkaProducerMessageDispatch.php | 67 +++ src/support/src/Functions.php | 28 + tests/Support/DispatchTest.php | 548 ++++++++++++++++++ 6 files changed, 776 insertions(+) create mode 100644 src/support/src/Bus/PendingAmqpProducerMessageDispatch.php create mode 100644 src/support/src/Bus/PendingAsyncQueueDispatch.php create mode 100644 src/support/src/Bus/PendingKafkaProducerMessageDispatch.php create mode 100644 tests/Support/DispatchTest.php diff --git a/src/helpers/src/Functions.php b/src/helpers/src/Functions.php index 32580e43d..972b84151 100644 --- a/src/helpers/src/Functions.php +++ b/src/helpers/src/Functions.php @@ -190,6 +190,7 @@ function di(?string $abstract = null, array $parameters = []) } /** + * @deprecated since v3.1, will be removed in v3.2, use `FriendsOfHyperf\Support\dispatch()` instead * @param AsyncTaskInterface|Closure|JobInterface|ProduceMessage|ProducerMessageInterface|object $job * @return bool */ diff --git a/src/support/src/Bus/PendingAmqpProducerMessageDispatch.php b/src/support/src/Bus/PendingAmqpProducerMessageDispatch.php new file mode 100644 index 000000000..df74e679e --- /dev/null +++ b/src/support/src/Bus/PendingAmqpProducerMessageDispatch.php @@ -0,0 +1,76 @@ +pool && $this->message->setPoolName($this->pool); + ApplicationContext::getContainer() + ->get(Producer::class) + ->produce($this->message, $this->confirm, $this->timeout); + } + + public function onPool(string $pool): static + { + $this->pool = $pool; + return $this; + } + + public function setPayload(mixed $data): static + { + $this->message->setPayload($data); + return $this; + } + + public function withHeader(string $key, mixed $value, ?int $ttl = null): static + { + (function () use ($key, $value, $ttl) { + $this->properties['application_headers'] ??= new \PhpAmqpLib\Wire\AMQPTable(); // @phpstan-ignore-line + $this->properties['application_headers']->set($key, $value, $ttl); + })->call($this->message); + return $this; + } + + public function setConfirm(bool $confirm): static + { + $this->confirm = $confirm; + return $this; + } + + public function setTimeout(int $timeout): static + { + $this->timeout = $timeout; + return $this; + } +} diff --git a/src/support/src/Bus/PendingAsyncQueueDispatch.php b/src/support/src/Bus/PendingAsyncQueueDispatch.php new file mode 100644 index 000000000..50aa8cf8e --- /dev/null +++ b/src/support/src/Bus/PendingAsyncQueueDispatch.php @@ -0,0 +1,56 @@ +get(DriverFactory::class) + ->get($this->pool) + ->push($this->job, $this->delay); + } + + public function setMaxAttempts(int $maxAttempts): static + { + $this->job->setMaxAttempts($maxAttempts); + return $this; + } + + public function onPool(string $pool): static + { + $this->pool = $pool; + return $this; + } + + public function delay(int $delay): static + { + $this->delay = $delay; + return $this; + } +} diff --git a/src/support/src/Bus/PendingKafkaProducerMessageDispatch.php b/src/support/src/Bus/PendingKafkaProducerMessageDispatch.php new file mode 100644 index 000000000..8ce23e4d9 --- /dev/null +++ b/src/support/src/Bus/PendingKafkaProducerMessageDispatch.php @@ -0,0 +1,67 @@ +get(ProducerManager::class) + ->getProducer($this->pool) + ->sendBatch([$this->message]); + } + + public function onPool(string $pool): static + { + $this->pool = $pool; + return $this; + } + + public function setKey(string $key): static + { + (fn () => $this->key = $key)->call($this->message); + return $this; + } + + public function setValue(string $value): static + { + (fn () => $this->value = $value)->call($this->message); + return $this; + } + + public function withHeader(string $key, string $value): static + { + $header = (new RecordHeader())->setHeaderKey($key)->setValue($value); + (fn () => $this->headers[] = $header)->call($this->message); + return $this; + } +} diff --git a/src/support/src/Functions.php b/src/support/src/Functions.php index cc9f43026..e5ab28da5 100644 --- a/src/support/src/Functions.php +++ b/src/support/src/Functions.php @@ -13,9 +13,37 @@ use Closure; use Exception; +use FriendsOfHyperf\AsyncQueueClosureJob\CallQueuedClosure; +use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch; +use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch; +use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch; +use Hyperf\Amqp\Message\ProducerMessageInterface; +use Hyperf\AsyncQueue\JobInterface; +use InvalidArgumentException; +use longlang\phpkafka\Producer\ProduceMessage; use function Hyperf\Support\value; +/** + * Do not assign a value to the return value of this function unless you are very clear about the consequences of doing so. + * @param Closure|JobInterface|ProduceMessage|ProducerMessageInterface|mixed $job + * @return ($job is Closure ? PendingAsyncQueueDispatch : ($job is JobInterface ? PendingAsyncQueueDispatch : ($job is ProducerMessageInterface ? PendingAmqpProducerMessageDispatch : PendingKafkaProducerMessageDispatch))) + * @throws InvalidArgumentException + */ +function dispatch($job) +{ + if ($job instanceof Closure) { + $job = CallQueuedClosure::create($job); + } + + return match (true) { + interface_exists(ProducerMessageInterface::class) && $job instanceof ProducerMessageInterface => new PendingAmqpProducerMessageDispatch($job), + class_exists(ProduceMessage::class) && $job instanceof ProduceMessage => new PendingKafkaProducerMessageDispatch($job), + interface_exists(JobInterface::class) && $job instanceof JobInterface => new PendingAsyncQueueDispatch($job), + default => throw new InvalidArgumentException('Unsupported job type.') + }; +} + /** * Retry an operation a given number of times. * @template TReturn diff --git a/tests/Support/DispatchTest.php b/tests/Support/DispatchTest.php new file mode 100644 index 000000000..b2d615ebd --- /dev/null +++ b/tests/Support/DispatchTest.php @@ -0,0 +1,548 @@ +setupDefaultContainerMock(); + } + + protected function tearDown(): void + { + parent::tearDown(); + m::close(); + } + + public function testDispatchWithClosure() + { + $closure = function () { + return 'test'; + }; + + $result = dispatch($closure); + + $this->assertInstanceOf(PendingAsyncQueueDispatch::class, $result); + + // Verify the underlying job is CallQueuedClosure + $job = $this->getProperty($result, 'job'); + $this->assertInstanceOf(CallQueuedClosure::class, $job); + } + + public function testDispatchWithJobInterface() + { + $job = m::mock(JobInterface::class); + + $result = dispatch($job); + + $this->assertInstanceOf(PendingAsyncQueueDispatch::class, $result); + $this->assertSame($job, $this->getProperty($result, 'job')); + } + + public function testDispatchWithProducerMessage() + { + $message = m::mock(ProducerMessage::class); + + $result = dispatch($message); + + $this->assertInstanceOf(PendingAmqpProducerMessageDispatch::class, $result); + $this->assertSame($message, $this->getProperty($result, 'message')); + } + + public function testDispatchWithKafkaProduceMessage() + { + $message = new ProduceMessage('test-topic', 'test-value'); + + $result = dispatch($message); + + $this->assertInstanceOf(PendingKafkaProducerMessageDispatch::class, $result); + $this->assertSame($message, $this->getProperty($result, 'message')); + } + + public function testDispatchWithInvalidType() + { + $this->expectException(InvalidArgumentException::class); + $this->expectExceptionMessage('Unsupported job type.'); + + dispatch(new stdClass()); + } + + public function testPendingAsyncQueueDispatchOnPool() + { + $job = m::mock(JobInterface::class); + $pending = dispatch($job); + + $result = $pending->onPool('custom-pool'); + + $this->assertSame($pending, $result); + $this->assertEquals('custom-pool', $this->getProperty($pending, 'pool')); + } + + public function testPendingAsyncQueueDispatchDelay() + { + $job = m::mock(JobInterface::class); + $pending = dispatch($job); + + $result = $pending->delay(60); + + $this->assertSame($pending, $result); + $this->assertEquals(60, $this->getProperty($pending, 'delay')); + } + + public function testPendingAsyncQueueDispatchSetMaxAttempts() + { + $job = m::mock(JobInterface::class); + $job->shouldReceive('setMaxAttempts') + ->with(5) + ->once() + ->andReturnSelf(); + + $pending = dispatch($job); + + $result = $pending->setMaxAttempts(5); + + $this->assertSame($pending, $result); + } + + public function testPendingAsyncQueueDispatchFluentChaining() + { + $job = m::mock(JobInterface::class); + $job->shouldReceive('setMaxAttempts') + ->with(3) + ->once() + ->andReturnSelf(); + + $pending = dispatch($job) + ->onPool('high-priority') + ->delay(30) + ->setMaxAttempts(3); + + $this->assertEquals('high-priority', $this->getProperty($pending, 'pool')); + $this->assertEquals(30, $this->getProperty($pending, 'delay')); + } + + public function testPendingAsyncQueueDispatchExecutesOnDestruct() + { + $job = m::mock(JobInterface::class); + $pushed = false; + + $container = m::mock(ContainerInterface::class); + $driverFactory = m::mock(DriverFactory::class); + $driver = m::mock(Driver::class); + + $container->shouldReceive('get') + ->with(DriverFactory::class) + ->once() + ->andReturn($driverFactory); + + $driverFactory->shouldReceive('get') + ->with('test-pool') + ->once() + ->andReturn($driver); + + $driver->shouldReceive('push') + ->with($job, 10) + ->once() + ->andReturnUsing(function () use (&$pushed) { + $pushed = true; + return true; + }); + + ApplicationContext::setContainer($container); + + $pending = dispatch($job) + ->onPool('test-pool') + ->delay(10); + + // Trigger destruct + unset($pending); + + $this->assertTrue($pushed, 'Job should have been pushed to queue'); + } + + public function testPendingAsyncQueueDispatchWithConditionable() + { + $job = m::mock(JobInterface::class); + + $pending = dispatch($job) + ->when(true, function ($dispatch) { + $dispatch->onPool('conditional-pool'); + }) + ->unless(false, function ($dispatch) { + $dispatch->delay(20); + }); + + $this->assertEquals('conditional-pool', $this->getProperty($pending, 'pool')); + $this->assertEquals(20, $this->getProperty($pending, 'delay')); + } + + public function testPendingAmqpProducerMessageDispatchOnPool() + { + $message = m::mock(ProducerMessage::class); + $message->shouldReceive('setPoolName') + ->with('amqp-custom') + ->once() + ->andReturnSelf(); + + $pending = dispatch($message); + + $result = $pending->onPool('amqp-custom'); + + $this->assertSame($pending, $result); + $this->assertEquals('amqp-custom', $this->getProperty($pending, 'pool')); + } + + public function testPendingAmqpProducerMessageDispatchSetConfirm() + { + $message = m::mock(ProducerMessage::class); + $pending = dispatch($message); + + $result = $pending->setConfirm(true); + + $this->assertSame($pending, $result); + $this->assertTrue($this->getProperty($pending, 'confirm')); + } + + public function testPendingAmqpProducerMessageDispatchSetTimeout() + { + $message = m::mock(ProducerMessage::class); + $pending = dispatch($message); + + $result = $pending->setTimeout(10); + + $this->assertSame($pending, $result); + $this->assertEquals(10, $this->getProperty($pending, 'timeout')); + } + + public function testPendingAmqpProducerMessageDispatchFluentChaining() + { + $message = m::mock(ProducerMessage::class); + $message->shouldReceive('setPoolName') + ->with('amqp-pool') + ->once() + ->andReturnSelf(); + + $pending = dispatch($message) + ->onPool('amqp-pool') + ->setConfirm(true) + ->setTimeout(15); + + $this->assertEquals('amqp-pool', $this->getProperty($pending, 'pool')); + $this->assertTrue($this->getProperty($pending, 'confirm')); + $this->assertEquals(15, $this->getProperty($pending, 'timeout')); + } + + public function testPendingAmqpProducerMessageDispatchExecutesOnDestruct() + { + $message = m::mock(ProducerMessage::class); + $produced = false; + + $container = m::mock(ContainerInterface::class); + $producer = m::mock(Producer::class); + + $container->shouldReceive('get') + ->with(Producer::class) + ->once() + ->andReturn($producer); + + $producer->shouldReceive('produce') + ->with($message, true, 10) + ->once() + ->andReturnUsing(function () use (&$produced) { + $produced = true; + return true; + }); + + ApplicationContext::setContainer($container); + + $pending = dispatch($message) + ->setConfirm(true) + ->setTimeout(10); + + // Trigger destruct + unset($pending); + + $this->assertTrue($produced, 'Message should have been produced'); + } + + public function testPendingAmqpProducerMessageDispatchWithConditionable() + { + $message = m::mock(ProducerMessage::class); + + $pending = dispatch($message) + ->when(true, function ($dispatch) { + $dispatch->setConfirm(true); + }) + ->unless(false, function ($dispatch) { + $dispatch->setTimeout(20); + }); + + $this->assertTrue($this->getProperty($pending, 'confirm')); + $this->assertEquals(20, $this->getProperty($pending, 'timeout')); + } + + public function testPendingKafkaProducerMessageDispatchOnPool() + { + $message = new ProduceMessage('test-topic', 'test-value'); + $pending = dispatch($message); + + $result = $pending->onPool('kafka-custom'); + + $this->assertSame($pending, $result); + $this->assertEquals('kafka-custom', $this->getProperty($pending, 'pool')); + } + + public function testPendingKafkaProducerMessageDispatchWithHeader() + { + $message = new ProduceMessage('test-topic', 'test-value'); + $pending = dispatch($message); + + $result = $pending->withHeader('trace-id', '12345'); + + $this->assertSame($pending, $result); + + // Verify header was added to the message + $headers = $this->getProperty($message, 'headers'); + $this->assertIsArray($headers); + $this->assertNotEmpty($headers); + } + + public function testPendingKafkaProducerMessageDispatchFluentChaining() + { + $message = new ProduceMessage('test-topic', 'test-value'); + + $pending = dispatch($message) + ->onPool('kafka-pool') + ->withHeader('user-id', '123') + ->withHeader('request-id', 'abc'); + + $this->assertEquals('kafka-pool', $this->getProperty($pending, 'pool')); + + // Verify both headers were added + $headers = $this->getProperty($message, 'headers'); + $this->assertIsArray($headers); + $this->assertCount(2, $headers); + } + + public function testPendingKafkaProducerMessageDispatchExecutesOnDestruct() + { + $message = new ProduceMessage('test-topic', 'test-value'); + $sent = false; + + $container = m::mock(ContainerInterface::class); + $producerManager = m::mock(ProducerManager::class); + $kafkaProducer = m::mock(HyperfKafkaProducer::class); + + $container->shouldReceive('get') + ->with(ProducerManager::class) + ->once() + ->andReturn($producerManager); + + $producerManager->shouldReceive('getProducer') + ->with('kafka-pool') + ->once() + ->andReturn($kafkaProducer); + + $kafkaProducer->shouldReceive('sendBatch') + ->with([$message]) + ->once() + ->andReturnUsing(function () use (&$sent) { + $sent = true; + return true; + }); + + ApplicationContext::setContainer($container); + + $pending = dispatch($message) + ->onPool('kafka-pool'); + + // Trigger destruct + unset($pending); + + $this->assertTrue($sent, 'Message should have been sent'); + } + + public function testPendingKafkaProducerMessageDispatchWithConditionable() + { + $message = new ProduceMessage('test-topic', 'test-value'); + + $pending = dispatch($message) + ->when(true, function ($dispatch) { + $dispatch->withHeader('conditional', 'true'); + }) + ->unless(false, function ($dispatch) { + $dispatch->onPool('conditional-pool'); + }); + + $this->assertEquals('conditional-pool', $this->getProperty($pending, 'pool')); + + $headers = $this->getProperty($message, 'headers'); + $this->assertNotEmpty($headers); + } + + public function testBackwardCompatibilityWithBasicDispatch() + { + $job = m::mock(JobInterface::class); + + $container = m::mock(ContainerInterface::class); + $driverFactory = m::mock(DriverFactory::class); + $driver = m::mock(Driver::class); + + $container->shouldReceive('get') + ->with(DriverFactory::class) + ->once() + ->andReturn($driverFactory); + + $driverFactory->shouldReceive('get') + ->with('default') + ->once() + ->andReturn($driver); + + $driver->shouldReceive('push') + ->with($job, 0) + ->once() + ->andReturn(true); + + ApplicationContext::setContainer($container); + + // Test basic dispatch without any configuration + $pending = dispatch($job); + + // Verify defaults + $this->assertEquals('default', $this->getProperty($pending, 'pool')); + $this->assertEquals(0, $this->getProperty($pending, 'delay')); + + // Trigger destruct + unset($pending); + } + + public function testDispatchWithErrorHandling() + { + $job = m::mock(JobInterface::class); + + $container = m::mock(ContainerInterface::class); + $driverFactory = m::mock(DriverFactory::class); + + $container->shouldReceive('get') + ->with(DriverFactory::class) + ->once() + ->andReturn($driverFactory); + + $driverFactory->shouldReceive('get') + ->with('default') + ->once() + ->andThrow(new Exception('Driver not found')); + + ApplicationContext::setContainer($container); + + $this->expectException(Exception::class); + $this->expectExceptionMessage('Driver not found'); + + $pending = dispatch($job); + + // Trigger destruct which should throw + unset($pending); + } + + protected function setupDefaultContainerMock(): void + { + $container = m::mock(ContainerInterface::class); + $driverFactory = m::mock(DriverFactory::class); + $driver = m::mock(Driver::class); + $producer = m::mock(Producer::class); + $producerManager = m::mock(ProducerManager::class); + $kafkaProducer = m::mock(HyperfKafkaProducer::class); + + // Setup for AsyncQueue dispatch + $container->shouldReceive('get') + ->with(DriverFactory::class) + ->zeroOrMoreTimes() + ->andReturn($driverFactory); + + $driverFactory->shouldReceive('get') + ->withAnyArgs() + ->zeroOrMoreTimes() + ->andReturn($driver); + + $driver->shouldReceive('push') + ->zeroOrMoreTimes() + ->andReturnTrue(); + + // Setup for AMQP dispatch + $container->shouldReceive('get') + ->with(Producer::class) + ->zeroOrMoreTimes() + ->andReturn($producer); + + $producer->shouldReceive('produce') + ->zeroOrMoreTimes() + ->andReturnTrue(); + + // Setup for Kafka dispatch + $container->shouldReceive('get') + ->with(ProducerManager::class) + ->zeroOrMoreTimes() + ->andReturn($producerManager); + + $producerManager->shouldReceive('getProducer') + ->withAnyArgs() + ->zeroOrMoreTimes() + ->andReturn($kafkaProducer); + + $kafkaProducer->shouldReceive('sendBatch') + ->zeroOrMoreTimes() + ->andReturnTrue(); + + ApplicationContext::setContainer($container); + } + + /** + * Helper method to get protected/private property value. + */ + protected function getProperty(object $object, string $property): mixed + { + $reflection = new ReflectionClass($object); + $prop = $reflection->getProperty($property); + $prop->setAccessible(true); + + return $prop->getValue($object); + } +} From 95b541d684abe6138e790ffd680af215d74bd1f3 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Fri, 14 Nov 2025 07:54:29 +0800 Subject: [PATCH 2/3] feat: add dispatch() function tests in Support and Functions helpers --- types/Helpers/Functions.php | 25 +------------------------ types/Support/Support.php | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/types/Helpers/Functions.php b/types/Helpers/Functions.php index 61de7da8e..70f833645 100644 --- a/types/Helpers/Functions.php +++ b/types/Helpers/Functions.php @@ -20,7 +20,6 @@ use function FriendsOfHyperf\Helpers\Command\call; use function FriendsOfHyperf\Helpers\cookie; use function FriendsOfHyperf\Helpers\di; -use function FriendsOfHyperf\Helpers\dispatch; use function FriendsOfHyperf\Helpers\enum_value; use function FriendsOfHyperf\Helpers\environment; use function FriendsOfHyperf\Helpers\event; @@ -80,7 +79,7 @@ // event() tests $testEvent = new class {}; -assertType('AnonymousClass7af6ae4c28737d2b2877adcdeb4da107', event($testEvent)); +assertType('AnonymousClass876f9ee1df9515d53da4de3de3377dd1', event($testEvent)); // filled() tests assertType('bool', filled(null)); @@ -152,28 +151,6 @@ assertType('Hyperf\HttpMessage\Cookie\CookieJarInterface', cookie()); assertType('Hyperf\HttpMessage\Cookie\Cookie', cookie('name', 'value')); -// dispatch() tests - returns bool -// Note: dispatch() has complex return types based on job type, testing the common case -assertType('bool', dispatch(new class implements Hyperf\AsyncQueue\JobInterface { - public function handle(): void - { - } - - public function fail(Throwable $e): void - { - } - - public function getMaxAttempts(): int - { - return 0; - } - - public function setMaxAttempts(int $maxAttempts): static - { - return $this; - } -})); - // environment() tests assertType('bool|FriendsOfHyperf\Support\Environment', environment()); assertType('bool|FriendsOfHyperf\Support\Environment', environment('production')); diff --git a/types/Support/Support.php b/types/Support/Support.php index 021536ce6..fa9c1b160 100644 --- a/types/Support/Support.php +++ b/types/Support/Support.php @@ -18,6 +18,7 @@ use FriendsOfHyperf\Support\Sleep; use FriendsOfHyperf\Support\Timebox; +use function FriendsOfHyperf\Support\dispatch; use function FriendsOfHyperf\Support\once; use function PHPStan\Testing\assertType; @@ -61,3 +62,25 @@ $command = new RedisCommand('SET', ['key', 'value']); assertType('string', (string) $command); + +assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(new class implements Hyperf\AsyncQueue\JobInterface { + public function handle(): void + { + } + + public function fail(Throwable $e): void + { + } + + public function getMaxAttempts(): int + { + return 0; + } + + public function setMaxAttempts(int $maxAttempts): static + { + return $this; + } +})); + +assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn () => null)); From beb8d274413ce388e176a9c718751b7f086e4099 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Fri, 14 Nov 2025 07:57:45 +0800 Subject: [PATCH 3/3] fix: update docblock for dispatch() function to clarify parameter type --- src/support/src/Functions.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/support/src/Functions.php b/src/support/src/Functions.php index e5ab28da5..0ab295a0b 100644 --- a/src/support/src/Functions.php +++ b/src/support/src/Functions.php @@ -27,6 +27,7 @@ /** * Do not assign a value to the return value of this function unless you are very clear about the consequences of doing so. * @param Closure|JobInterface|ProduceMessage|ProducerMessageInterface|mixed $job + * @param-closure-this ($job is Closure ? CallQueuedClosure : mixed) $job * @return ($job is Closure ? PendingAsyncQueueDispatch : ($job is JobInterface ? PendingAsyncQueueDispatch : ($job is ProducerMessageInterface ? PendingAmqpProducerMessageDispatch : PendingKafkaProducerMessageDispatch))) * @throws InvalidArgumentException */