From 78eb9480e66795a5239cb65078fd4163745763e4 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Sat, 15 Nov 2025 08:50:08 +0800 Subject: [PATCH 1/3] refactor: consolidate CallQueuedClosure to support package This refactoring consolidates the async queue closure functionality into the support package, eliminating circular dependencies and simplifying the package structure. Changes: - Move CallQueuedClosure and ClosureParameterInjection from async-queue-closure-job to support package - Update async-queue-closure-job to depend on support package instead - Remove duplicate dependencies from async-queue-closure-job - Add laravel/serializable-closure dependency to support package - Update all references to use the new location - Remove tests from async-queue-closure-job (tests will be maintained in support package) - Add setExchange() and setRoutingKey() methods to PendingAmqpProducerMessageDispatch - Change property visibility from public to protected in dispatch classes for better encapsulation --- src/async-queue-closure-job/composer.json | 5 - .../src/ConfigProvider.php | 11 +- src/async-queue-closure-job/src/Functions.php | 7 +- src/helpers/src/Functions.php | 2 +- src/support/composer.json | 2 +- src/support/src/AsyncQueue/ClosureJob.php | 3 +- .../PendingAmqpProducerMessageDispatch.php | 24 +- .../PendingKafkaProducerMessageDispatch.php | 2 +- .../src/CallQueuedClosure.php | 2 +- src/support/src/Functions.php | 1 - .../src/Traits/ClosureParameterInjection.php | 2 +- .../BasicFunctionalityTest.php | 240 --------------- .../CallQueuedClosureCreateTest.php | 279 ------------------ tests/AsyncQueueClosureJob/ClosureJobTest.php | 261 ---------------- types/AsyncQueueClosureJob/Functions.php | 64 ---- 15 files changed, 33 insertions(+), 872 deletions(-) rename src/{async-queue-closure-job => support}/src/CallQueuedClosure.php (96%) rename src/{async-queue-closure-job => support}/src/Traits/ClosureParameterInjection.php (97%) delete mode 100644 tests/AsyncQueueClosureJob/BasicFunctionalityTest.php delete mode 100644 tests/AsyncQueueClosureJob/CallQueuedClosureCreateTest.php delete mode 100644 tests/AsyncQueueClosureJob/ClosureJobTest.php delete mode 100644 types/AsyncQueueClosureJob/Functions.php diff --git a/src/async-queue-closure-job/composer.json b/src/async-queue-closure-job/composer.json index b1db19303..09950d2af 100644 --- a/src/async-queue-closure-job/composer.json +++ b/src/async-queue-closure-job/composer.json @@ -20,12 +20,7 @@ "pull-request": "https://github.com/friendsofhyperf/components/pulls" }, "require": { - "hyperf/async-queue": "~3.1.0", - "hyperf/conditionable": "~3.1.0", - "hyperf/context": "~3.1.0", - "hyperf/di": "~3.1.0", "hyperf/support": "~3.1.72", - "laravel/serializable-closure": "^1.0 || ^2.0" }, "minimum-stability": "dev", "autoload": { diff --git a/src/async-queue-closure-job/src/ConfigProvider.php b/src/async-queue-closure-job/src/ConfigProvider.php index 9a8d4444c..84a276160 100644 --- a/src/async-queue-closure-job/src/ConfigProvider.php +++ b/src/async-queue-closure-job/src/ConfigProvider.php @@ -15,15 +15,6 @@ final class ConfigProvider { public function __invoke() { - return [ - 'dependencies' => [], - 'annotations' => [ - 'scan' => [ - 'paths' => [ - __DIR__, - ], - ], - ], - ]; + return []; } } diff --git a/src/async-queue-closure-job/src/Functions.php b/src/async-queue-closure-job/src/Functions.php index 2c8084fc9..5f5b82359 100644 --- a/src/async-queue-closure-job/src/Functions.php +++ b/src/async-queue-closure-job/src/Functions.php @@ -13,6 +13,9 @@ use Closure; use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch; +use FriendsOfHyperf\Support\CallQueuedClosure; + +use function FriendsOfHyperf\Support\dispatch as base_dispatch; /** * Dispatch a closure as an async queue job. @@ -22,7 +25,5 @@ */ function dispatch(Closure $closure): PendingAsyncQueueDispatch { - return new PendingAsyncQueueDispatch( - CallQueuedClosure::create($closure) - ); + return base_dispatch($closure); } diff --git a/src/helpers/src/Functions.php b/src/helpers/src/Functions.php index 972b84151..bdeeec175 100644 --- a/src/helpers/src/Functions.php +++ b/src/helpers/src/Functions.php @@ -17,9 +17,9 @@ use Countable; use DateTimeZone; use Exception; -use FriendsOfHyperf\AsyncQueueClosureJob\CallQueuedClosure; use FriendsOfHyperf\AsyncTask\Task as AsyncTask; use FriendsOfHyperf\AsyncTask\TaskInterface as AsyncTaskInterface; +use FriendsOfHyperf\Support\CallQueuedClosure; use FriendsOfHyperf\Support\Environment; use Hyperf\Amqp\Message\ProducerMessageInterface; use Hyperf\Amqp\Producer; diff --git a/src/support/composer.json b/src/support/composer.json index df784ad43..056d2c3c4 100644 --- a/src/support/composer.json +++ b/src/support/composer.json @@ -20,7 +20,6 @@ "pull-request": "https://github.com/friendsofhyperf/components/pulls" }, "require": { - "friendsofhyperf/async-queue-closure-job": "~3.1.0", "hyperf/collection": "~3.1.0", "hyperf/context": "~3.1.0", "hyperf/di": "~3.1.0", @@ -29,6 +28,7 @@ "hyperf/stringable": "~3.1.0", "hyperf/support": "~3.1.0", "hyperf/tappable": "~3.1.0", + "laravel/serializable-closure": "^1.0 || ^2.0", "nesbot/carbon": "^2.0 || ^3.0", "symfony/polyfill-php85": "^1.33" }, diff --git a/src/support/src/AsyncQueue/ClosureJob.php b/src/support/src/AsyncQueue/ClosureJob.php index 1921fe770..3add347ef 100644 --- a/src/support/src/AsyncQueue/ClosureJob.php +++ b/src/support/src/AsyncQueue/ClosureJob.php @@ -12,12 +12,13 @@ namespace FriendsOfHyperf\Support\AsyncQueue; use Closure; +use FriendsOfHyperf\Support\CallQueuedClosure; use Laravel\SerializableClosure\SerializableClosure; /** * @deprecated since v3.2, will be removed in v3.2, use `FriendsOfHyperf\AsyncQueueClosureJob\ClosureJob` instead. */ -class ClosureJob extends \FriendsOfHyperf\AsyncQueueClosureJob\CallQueuedClosure +class ClosureJob extends CallQueuedClosure { public function __construct(Closure $closure, int $maxAttempts = 0) { diff --git a/src/support/src/Bus/PendingAmqpProducerMessageDispatch.php b/src/support/src/Bus/PendingAmqpProducerMessageDispatch.php index df74e679e..d2eeb6c50 100644 --- a/src/support/src/Bus/PendingAmqpProducerMessageDispatch.php +++ b/src/support/src/Bus/PendingAmqpProducerMessageDispatch.php @@ -23,11 +23,15 @@ class PendingAmqpProducerMessageDispatch { use Conditionable; - public ?string $pool = null; + protected ?string $pool = null; - public int $timeout = 5; + protected int $timeout = 5; - public bool $confirm = false; + protected bool $confirm = false; + + protected null|array|string $routingKey = null; + + protected ?string $exchange = null; public function __construct(protected ProducerMessageInterface $message) { @@ -36,6 +40,8 @@ public function __construct(protected ProducerMessageInterface $message) public function __destruct() { $this->pool && $this->message->setPoolName($this->pool); + $this->routingKey && $this->message->setRoutingKey($this->routingKey); + $this->exchange && $this->message->setExchange($this->exchange); ApplicationContext::getContainer() ->get(Producer::class) ->produce($this->message, $this->confirm, $this->timeout); @@ -62,6 +68,18 @@ public function withHeader(string $key, mixed $value, ?int $ttl = null): static return $this; } + public function setExchange(string $exchange): static + { + $this->exchange = $exchange; + return $this; + } + + public function setRoutingKey(array|string $routingKey): static + { + $this->routingKey = $routingKey; + return $this; + } + public function setConfirm(bool $confirm): static { $this->confirm = $confirm; diff --git a/src/support/src/Bus/PendingKafkaProducerMessageDispatch.php b/src/support/src/Bus/PendingKafkaProducerMessageDispatch.php index 13671f111..7a4bb29b6 100644 --- a/src/support/src/Bus/PendingKafkaProducerMessageDispatch.php +++ b/src/support/src/Bus/PendingKafkaProducerMessageDispatch.php @@ -26,7 +26,7 @@ class PendingKafkaProducerMessageDispatch { use Conditionable; - public ?string $pool = null; + protected ?string $pool = null; public function __construct(protected ProduceMessage $message) { diff --git a/src/async-queue-closure-job/src/CallQueuedClosure.php b/src/support/src/CallQueuedClosure.php similarity index 96% rename from src/async-queue-closure-job/src/CallQueuedClosure.php rename to src/support/src/CallQueuedClosure.php index fadc2adbd..fdbdbdf56 100644 --- a/src/async-queue-closure-job/src/CallQueuedClosure.php +++ b/src/support/src/CallQueuedClosure.php @@ -9,7 +9,7 @@ * @contact huangdijia@gmail.com */ -namespace FriendsOfHyperf\AsyncQueueClosureJob; +namespace FriendsOfHyperf\Support; use Closure; use Hyperf\AsyncQueue\Job; diff --git a/src/support/src/Functions.php b/src/support/src/Functions.php index 0ab295a0b..1648cf257 100644 --- a/src/support/src/Functions.php +++ b/src/support/src/Functions.php @@ -13,7 +13,6 @@ use Closure; use Exception; -use FriendsOfHyperf\AsyncQueueClosureJob\CallQueuedClosure; use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch; use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch; use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch; diff --git a/src/async-queue-closure-job/src/Traits/ClosureParameterInjection.php b/src/support/src/Traits/ClosureParameterInjection.php similarity index 97% rename from src/async-queue-closure-job/src/Traits/ClosureParameterInjection.php rename to src/support/src/Traits/ClosureParameterInjection.php index 0f1f2f37c..3ddf18d81 100644 --- a/src/async-queue-closure-job/src/Traits/ClosureParameterInjection.php +++ b/src/support/src/Traits/ClosureParameterInjection.php @@ -9,7 +9,7 @@ * @contact huangdijia@gmail.com */ -namespace FriendsOfHyperf\AsyncQueueClosureJob\Traits; +namespace FriendsOfHyperf\Support\Traits; use Closure; use Hyperf\Context\ApplicationContext; diff --git a/tests/AsyncQueueClosureJob/BasicFunctionalityTest.php b/tests/AsyncQueueClosureJob/BasicFunctionalityTest.php deleted file mode 100644 index 43d61ac47..000000000 --- a/tests/AsyncQueueClosureJob/BasicFunctionalityTest.php +++ /dev/null @@ -1,240 +0,0 @@ -setMaxAttempts(5); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(5, $job->getMaxAttempts()); - } - - public function testCallQueuedClosureCreateWithDefaultMaxAttempts() - { - $closure = function () { - return 'test'; - }; - - $job = CallQueuedClosure::create($closure); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(0, $job->getMaxAttempts()); - } - - public function testCallQueuedClosureCreateWithZeroMaxAttempts() - { - $closure = function () { - return 'test'; - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(0); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(0, $job->getMaxAttempts()); - } - - public function testCallQueuedClosureCreateWithNegativeMaxAttempts() - { - $closure = function () { - return 'test'; - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(-1); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(-1, $job->getMaxAttempts()); - } - - public function testCallQueuedClosureJobExecution() - { - $executed = false; - - $job = CallQueuedClosure::create(function () use (&$executed) { - $executed = true; - return 'executed'; - }); - $job->setMaxAttempts(3); - - $this->assertEquals(3, $job->getMaxAttempts()); - - // Mock container for execution - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - $result = $job->handle(); - - $this->assertTrue($executed); - $this->assertEquals('executed', $result); - } - - public function testCallQueuedClosureWithReturnValue() - { - $job = CallQueuedClosure::create(function () { - return 'return value'; - }); - $job->setMaxAttempts(2); - - // Mock container for execution - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - $result = $job->handle(); - - $this->assertEquals('return value', $result); - $this->assertEquals(2, $job->getMaxAttempts()); - } - - public function testCallQueuedClosureWithParameters() - { - $receivedParam = null; - - $job = CallQueuedClosure::create(function ($param = 'default') use (&$receivedParam) { - $receivedParam = $param; - }); - $job->setMaxAttempts(4); - - // Mock container for execution - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - $job->handle(); - - $this->assertEquals('default', $receivedParam); - $this->assertEquals(4, $job->getMaxAttempts()); - } - - public function testCallQueuedClosureSerialization() - { - $capturedValue = 'captured for serialization'; - - $job = CallQueuedClosure::create(function () use ($capturedValue) { - return $capturedValue; - }); - $job->setMaxAttempts(6); - - $this->assertEquals(6, $job->getMaxAttempts()); - - // Test serialization - $serialized = serialize($job); - $this->assertIsString($serialized); - - $unserialized = unserialize($serialized); - $this->assertInstanceOf(CallQueuedClosure::class, $unserialized); - $this->assertEquals(6, $unserialized->getMaxAttempts()); - - // Mock container for execution - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - // Execute the unserialized job - $result = $unserialized->handle(); - - $this->assertEquals('captured for serialization', $result); - } - - public function testCallQueuedClosureMethodProperty() - { - $job = CallQueuedClosure::create(function () { - return 'method property test'; - }); - $job->setMaxAttempts(8); - - $this->assertEquals(8, $job->getMaxAttempts()); - $this->assertStringContainsString('BasicFunctionalityTest.php', $job->method); - $this->assertStringContainsString(':', $job->method); - } - - public function testMultipleCreateCallsWithDifferentMaxAttempts() - { - $closure = function () { return 'test'; }; - - $job1 = CallQueuedClosure::create($closure); - $job1->setMaxAttempts(1); - $job2 = CallQueuedClosure::create($closure); - $job2->setMaxAttempts(5); - $job3 = CallQueuedClosure::create($closure); - $job3->setMaxAttempts(10); - $job4 = CallQueuedClosure::create($closure); - $job4->setMaxAttempts(0); - $job5 = CallQueuedClosure::create($closure); - $job5->setMaxAttempts(-1); - - $this->assertEquals(1, $job1->getMaxAttempts()); - $this->assertEquals(5, $job2->getMaxAttempts()); - $this->assertEquals(10, $job3->getMaxAttempts()); - $this->assertEquals(0, $job4->getMaxAttempts()); - $this->assertEquals(-1, $job5->getMaxAttempts()); - - $this->assertNotSame($job1, $job2); - $this->assertNotSame($job2, $job3); - $this->assertNotSame($job1, $job3); - } - - // Note: Dispatch function tests are omitted due to container configuration requirements - // The functionality is tested through PendingAsyncQueueDispatch direct instantiation - - /** - * Helper method to get protected/private property value. - */ - protected function getProperty(object $object, string $property): mixed - { - $reflection = new ReflectionClass($object); - $property = $reflection->getProperty($property); - $property->setAccessible(true); - - return $property->getValue($object); - } -} diff --git a/tests/AsyncQueueClosureJob/CallQueuedClosureCreateTest.php b/tests/AsyncQueueClosureJob/CallQueuedClosureCreateTest.php deleted file mode 100644 index a5111648b..000000000 --- a/tests/AsyncQueueClosureJob/CallQueuedClosureCreateTest.php +++ /dev/null @@ -1,279 +0,0 @@ -assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(0, $job->getMaxAttempts()); - } - - public function testCreateMethodWithCustomMaxAttempts() - { - $closure = function () { - return 'custom max attempts'; - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(7); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(7, $job->getMaxAttempts()); - } - - public function testCreateMethodWithZeroMaxAttempts() - { - $closure = function () { - return 'zero max attempts'; - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(0); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(0, $job->getMaxAttempts()); - } - - public function testCreateMethodWithNegativeMaxAttempts() - { - $closure = function () { - return 'negative max attempts'; - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(-1); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(-1, $job->getMaxAttempts()); - } - - public function testCreateMethodWithLargeMaxAttempts() - { - $closure = function () { - return 'large max attempts'; - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(999); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(999, $job->getMaxAttempts()); - } - - public function testCreateMethodPreservesClosureFunctionality() - { - $testValue = null; - $closure = function () use (&$testValue) { - $testValue = 'closure executed'; - return 'result'; - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(5); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(5, $job->getMaxAttempts()); - - // Mock the container - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - // Execute the job - $result = $job->handle(); - - $this->assertEquals('closure executed', $testValue); - $this->assertEquals('result', $result); - } - - public function testCreateMethodWithParameters() - { - $receivedParam = null; - $closure = function ($param = 'default') use (&$receivedParam) { - $receivedParam = $param; - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(2); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(2, $job->getMaxAttempts()); - - // Mock the container - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - // Execute the job - $job->handle(); - - $this->assertEquals('default', $receivedParam); - } - - public function testCreateMethodWithDependencyInjection() - { - $executed = false; - $test = $this; - - $closure = function (ContainerInterface $container) use (&$executed, $test) { - $executed = true; - $test->assertInstanceOf(ContainerInterface::class, $container); - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(4); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(4, $job->getMaxAttempts()); - - // Mock the container with DI support - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(true); - - $definitionCollector = m::mock(ClosureDefinitionCollectorInterface::class); - $definition = m::mock(); - $definition->shouldReceive('getMeta') - ->with('name') - ->andReturn('container'); - $definition->shouldReceive('getName') - ->andReturn(ContainerInterface::class); - $definition->shouldReceive('getMeta') - ->with('defaultValueAvailable') - ->andReturn(false); - $definition->shouldReceive('allowsNull') - ->andReturn(false); - - $definitionCollector->shouldReceive('getParameters') - ->andReturn([0 => $definition]); - - $container->shouldReceive('get') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn($definitionCollector); - $container->shouldReceive('has') - ->with(ContainerInterface::class) - ->andReturn(true); - $container->shouldReceive('get') - ->with(ContainerInterface::class) - ->andReturn($container); - - ApplicationContext::setContainer($container); - - $job->handle(); - - $this->assertTrue($executed); - } - - public function testCreateMethodSerializesCorrectly() - { - $capturedValue = 'captured for serialization'; - $closure = function () use ($capturedValue) { - return $capturedValue; - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(6); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(6, $job->getMaxAttempts()); - - // Test serialization - $serialized = serialize($job); - $this->assertIsString($serialized); - - $unserialized = unserialize($serialized); - $this->assertInstanceOf(CallQueuedClosure::class, $unserialized); - $this->assertEquals(6, $unserialized->getMaxAttempts()); - - // Mock the container for execution - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - // Execute the unserialized job - $result = $unserialized->handle(); - - $this->assertEquals('captured for serialization', $result); - } - - public function testCreateMethodMethodProperty() - { - $closure = function () { - return 'method property test'; - }; - - $job = CallQueuedClosure::create($closure); - $job->setMaxAttempts(8); - - $this->assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals(8, $job->getMaxAttempts()); - $this->assertStringContainsString('CallQueuedClosureCreateTest.php', $job->method); - $this->assertStringContainsString(':', $job->method); - } - - public function testMultipleCreateCallsWithDifferentMaxAttempts() - { - $closure1 = function () { return 'first'; }; - $closure2 = function () { return 'second'; }; - $closure3 = function () { return 'third'; }; - - $job1 = CallQueuedClosure::create($closure1); - $job1->setMaxAttempts(1); - $job2 = CallQueuedClosure::create($closure2); - $job2->setMaxAttempts(5); - $job3 = CallQueuedClosure::create($closure3); - $job3->setMaxAttempts(10); - - $this->assertEquals(1, $job1->getMaxAttempts()); - $this->assertEquals(5, $job2->getMaxAttempts()); - $this->assertEquals(10, $job3->getMaxAttempts()); - - $this->assertNotSame($job1, $job2); - $this->assertNotSame($job2, $job3); - $this->assertNotSame($job1, $job3); - } -} diff --git a/tests/AsyncQueueClosureJob/ClosureJobTest.php b/tests/AsyncQueueClosureJob/ClosureJobTest.php deleted file mode 100644 index 9fafacd44..000000000 --- a/tests/AsyncQueueClosureJob/ClosureJobTest.php +++ /dev/null @@ -1,261 +0,0 @@ -assertInstanceOf(CallQueuedClosure::class, $job); - $this->assertEquals('Closure', $job->class); - $this->assertIsString($job->method); - } - - public function testClosureJobCanBeCreatedWithMaxAttempts() - { - $job = CallQueuedClosure::create(function () { - return 'test'; - }); - $job->setMaxAttempts(3); - - $this->assertEquals(3, $job->getMaxAttempts()); - } - - public function testClosureJobCanHandleSimpleClosure() - { - $executed = false; - - $job = CallQueuedClosure::create(function () use (&$executed) { - $executed = true; - }); - - // Mock the container - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - $job->handle(); - - $this->assertTrue($executed); - } - - public function testClosureJobCanHandleClosureWithReturnValue() - { - $result = null; - - $job = CallQueuedClosure::create(function () use (&$result) { - $result = 'success'; - return $result; - }); - - // Mock the container - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - $job->handle(); - - $this->assertEquals('success', $result); - } - - public function testClosureJobCanHandleClosureWithParameters() - { - $value = null; - - $job = CallQueuedClosure::create(function ($param = 'default') use (&$value) { - $value = $param; - }); - - // Mock the container - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - $job->handle(); - - $this->assertEquals('default', $value); - } - - public function testClosureJobMethodContainsFileAndLine() - { - $job = CallQueuedClosure::create(function () { - return 'test'; - }); - - $this->assertStringContainsString(':', $job->method); - $this->assertStringContainsString('ClosureJobTest.php', $job->method); - } - - public function testClosureJobWithDependencyInjection() - { - $executed = false; - $test = $this; - - $job = CallQueuedClosure::create(function (ContainerInterface $container) use (&$executed, $test) { - $executed = true; - $test->assertInstanceOf(ContainerInterface::class, $container); - }); - - // Mock the container with DI support - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(true); - - $definitionCollector = m::mock(ClosureDefinitionCollectorInterface::class); - $definition = m::mock(); - $definition->shouldReceive('getMeta') - ->with('name') - ->andReturn('container'); - $definition->shouldReceive('getName') - ->andReturn(ContainerInterface::class); - $definition->shouldReceive('getMeta') - ->with('defaultValueAvailable') - ->andReturn(false); - $definition->shouldReceive('allowsNull') - ->andReturn(false); - - $definitionCollector->shouldReceive('getParameters') - ->andReturn([0 => $definition]); - - $container->shouldReceive('get') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn($definitionCollector); - $container->shouldReceive('has') - ->with(ContainerInterface::class) - ->andReturn(true); - $container->shouldReceive('get') - ->with(ContainerInterface::class) - ->andReturn($container); - - ApplicationContext::setContainer($container); - - $job->handle(); - - $this->assertTrue($executed); - } - - public function testClosureJobSerializesCorrectly() - { - $job = CallQueuedClosure::create(function () { - return 'test'; - }); - - $serialized = serialize($job); - $this->assertIsString($serialized); - - $unserialized = unserialize($serialized); - $this->assertInstanceOf(CallQueuedClosure::class, $unserialized); - $this->assertEquals($job->class, $unserialized->class); - } - - public function testClosureJobCanCaptureVariables() - { - $captured = 'captured value'; - $result = null; - - $job = CallQueuedClosure::create(function () use ($captured, &$result) { - $result = $captured; - }); - - // Mock the container - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(false); - - ApplicationContext::setContainer($container); - - $job->handle(); - - $this->assertEquals('captured value', $result); - } - - public function testClosureJobMaxAttemptsDefaultsToZero() - { - $job = CallQueuedClosure::create(function () { - return 'test'; - }); - - $this->assertEquals(0, $job->getMaxAttempts()); - } - - public function testClosureJobWithNullableParameter() - { - $value = 'not null'; - - $job = CallQueuedClosure::create(function (?string $param = null) use (&$value) { - $value = $param ?? 'was null'; - }); - - // Mock the container with nullable parameter support - $container = m::mock(ContainerInterface::class); - $container->shouldReceive('has') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn(true); - - $definitionCollector = m::mock(ClosureDefinitionCollectorInterface::class); - $definition = m::mock(); - $definition->shouldReceive('getMeta') - ->with('name') - ->andReturn('param'); - $definition->shouldReceive('getMeta') - ->with('defaultValueAvailable') - ->andReturn(true); - $definition->shouldReceive('getMeta') - ->with('defaultValue') - ->andReturn(null); - - $definitionCollector->shouldReceive('getParameters') - ->andReturn([0 => $definition]); - - $container->shouldReceive('get') - ->with(ClosureDefinitionCollectorInterface::class) - ->andReturn($definitionCollector); - - ApplicationContext::setContainer($container); - - $job->handle(); - - $this->assertEquals('was null', $value); - } -} diff --git a/types/AsyncQueueClosureJob/Functions.php b/types/AsyncQueueClosureJob/Functions.php deleted file mode 100644 index 11eca253a..000000000 --- a/types/AsyncQueueClosureJob/Functions.php +++ /dev/null @@ -1,64 +0,0 @@ - 'result')); -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(function () { - return 'test'; -})); - -// Method chaining tests - setMaxAttempts() -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn () => true)->setMaxAttempts(3)); - -// Method chaining tests - onPool() -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn () => null)->onPool('default')); -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn () => [])->onPool('custom-pool')); - -// Method chaining tests - delay() -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn () => 123)->delay(10)); -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn () => new stdClass())->delay(0)); - -// Complex method chaining -assertType( - 'FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', - dispatch(fn () => 'multi-chain') - ->setMaxAttempts(5) - ->onPool('worker') - ->delay(30) -); - -assertType( - 'FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', - dispatch(function () { - return ['key' => 'value']; - }) - ->delay(60) - ->setMaxAttempts(3) -); - -// Different closure return types -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn (): string => 'typed return')); -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn (): int => 42)); -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn (): array => [])); -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn (): bool => true)); -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn (): ?string => null)); -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(function (): void { - // void return type -})); - -// Closures with parameters -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn ($param) => $param)); -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(fn (string $name, int $age) => "{$name} is {$age}")); -assertType('FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch', dispatch(function ($a, $b) { - return $a + $b; -})); From f46cadee1849565daee588b276a61ad930d5c953 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Sat, 15 Nov 2025 08:53:17 +0800 Subject: [PATCH 2/3] fix: update hyperf/support version to ~3.1.73 in composer.json --- src/async-queue-closure-job/composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/async-queue-closure-job/composer.json b/src/async-queue-closure-job/composer.json index 09950d2af..15319ea2b 100644 --- a/src/async-queue-closure-job/composer.json +++ b/src/async-queue-closure-job/composer.json @@ -20,7 +20,7 @@ "pull-request": "https://github.com/friendsofhyperf/components/pulls" }, "require": { - "hyperf/support": "~3.1.72", + "hyperf/support": "~3.1.73" }, "minimum-stability": "dev", "autoload": { From 47c709e66c3696ab435891c3cc005d81d60193f3 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Sat, 15 Nov 2025 08:59:23 +0800 Subject: [PATCH 3/3] refactor: update README and deprecate dispatch function in async-queue-closure-job package --- src/async-queue-closure-job/README.md | 35 ++- src/async-queue-closure-job/src/Functions.php | 2 + src/support/README.md | 206 +++++++++++++++++- tests/Support/DispatchTest.php | 2 +- 4 files changed, 237 insertions(+), 8 deletions(-) diff --git a/src/async-queue-closure-job/README.md b/src/async-queue-closure-job/README.md index 74a06a4fb..de3035711 100644 --- a/src/async-queue-closure-job/README.md +++ b/src/async-queue-closure-job/README.md @@ -6,6 +6,8 @@ The async queue closure job component for Hyperf. Execute closures as background jobs with full support for dependency injection, fluent configuration. +> **Note**: Starting from v3.1.73, this package serves as a convenience wrapper around the `friendsofhyperf/support` package. The core implementation (`CallQueuedClosure` and related classes) has been moved to the support package to eliminate circular dependencies and improve package architecture. + ## Installation ```shell @@ -109,14 +111,39 @@ The main dispatch function that creates a closure job. - Closures with captured variables (`use`) - Closures with nullable parameters -## Testing +## Package Architecture -Run tests: +As of v3.1.73, this package has been refactored to improve the overall architecture: -```shell -composer test:unit -- tests/AsyncQueueClosureJob +- **Core Implementation**: Moved to `friendsofhyperf/support` package + - `FriendsOfHyperf\Support\CallQueuedClosure` + - `FriendsOfHyperf\Support\Traits\ClosureParameterInjection` +- **Convenience Layer**: This package now provides a namespace alias for easy migration + - `FriendsOfHyperf\AsyncQueueClosureJob\dispatch()` → delegates to support package + +### Why This Change? + +1. **Eliminates Circular Dependencies**: The support package previously depended on this package, creating a circular dependency +2. **Single Source of Truth**: Core functionality now lives in one place +3. **Simplified Dependency Tree**: Reduces maintenance overhead +4. **Non-Breaking**: Existing code continues to work without changes + +### Migration Guide + +**No action required!** The namespace `FriendsOfHyperf\AsyncQueueClosureJob\dispatch()` continues to work as before. However, if you want to use the new location directly: + +```php +// Old (still supported) +use function FriendsOfHyperf\AsyncQueueClosureJob\dispatch; + +// New (recommended for new code) +use function FriendsOfHyperf\Support\dispatch; ``` +## Testing + +Tests for the core functionality are now maintained in the support package. + ## Contributing Contributions are welcome! Please feel free to submit a Pull Request. diff --git a/src/async-queue-closure-job/src/Functions.php b/src/async-queue-closure-job/src/Functions.php index 5f5b82359..e269eaa08 100644 --- a/src/async-queue-closure-job/src/Functions.php +++ b/src/async-queue-closure-job/src/Functions.php @@ -20,6 +20,8 @@ /** * Dispatch a closure as an async queue job. * + * @deprecated since version 3.1.73, will be removed in version 3.2, use `FriendsOfHyperf\Support\dispatch` instead. + * * @param Closure $closure The closure to execute * @param-closure-this CallQueuedClosure $closure */ diff --git a/src/support/README.md b/src/support/README.md index f52840d52..82fdbd21c 100644 --- a/src/support/README.md +++ b/src/support/README.md @@ -4,16 +4,216 @@ [![Total Downloads](https://img.shields.io/packagist/dt/friendsofhyperf/support.svg?style=flat-square)](https://packagist.org/packages/friendsofhyperf/support) [![GitHub license](https://img.shields.io/github/license/friendsofhyperf/support)](https://github.com/friendsofhyperf/support) -Another support component for Hyperf. +A comprehensive support component for Hyperf providing essential utilities, helpers, and base classes. -## Installation +## Features + +- 🎯 **Fluent Dispatch API** - Elegant job dispatch with support for async queue, AMQP, and Kafka +- 🔄 **Closure Jobs** - Execute closures as background jobs with dependency injection +- 🛠️ **Helper Functions** - Collection of useful helper functions +- 📦 **Bus System** - Pending dispatch classes for various message systems +- 🧩 **Traits & Utilities** - Reusable traits and utility classes -- Installation +## Installation ```shell composer require friendsofhyperf/support ``` +## Usage + +### Dispatch Helper Function + +The `dispatch()` helper function provides a fluent API for dispatching jobs to different systems: + +#### Async Queue (Closure Jobs) + +```php +use function FriendsOfHyperf\Support\dispatch; + +// Simple closure dispatch to async queue +dispatch(function () { + // Your job logic here + logger()->info('Job executed!'); +}); + +// With configuration +dispatch(function () { + // Your job logic here +}) + ->onConnection('high-priority') + ->delay(60) // Execute after 60 seconds + ->setMaxAttempts(5); + +// With dependency injection +dispatch(function (UserService $userService, LoggerInterface $logger) { + $users = $userService->getActiveUsers(); + $logger->info('Processing ' . count($users) . ' users'); +}); +``` + +#### AMQP Producer Messages + +```php +use Hyperf\Amqp\Message\ProducerMessageInterface; +use function FriendsOfHyperf\Support\dispatch; + +// Dispatch AMQP message +dispatch($amqpMessage) + ->setPool('default') + ->setExchange('my.exchange') + ->setRoutingKey('my.routing.key') + ->setTimeout(10) + ->setConfirm(true); +``` + +#### Kafka Producer Messages + +```php +use Hyperf\Kafka\Producer\ProduceMessage; +use function FriendsOfHyperf\Support\dispatch; + +// Dispatch Kafka message +dispatch($kafkaMessage) + ->setPool('default'); +``` + +### CallQueuedClosure + +The `CallQueuedClosure` class allows you to execute closures as async queue jobs: + +```php +use FriendsOfHyperf\Support\CallQueuedClosure; + +// Create a closure job +$job = CallQueuedClosure::create(function () { + // Your job logic + return 'Job completed!'; +}); + +// Configure max attempts +$job->setMaxAttempts(3); + +// The job can be pushed to queue manually or via dispatch() +``` + +### Pending Dispatch Classes + +#### PendingAsyncQueueDispatch + +Fluent API for async queue job dispatch: + +```php +use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch; + +$pending = new PendingAsyncQueueDispatch($job); +$pending + ->onConnection('default') + ->delay(30) + ->when($condition, function ($dispatch) { + $dispatch->onConnection('special'); + }) + ->unless($otherCondition, function ($dispatch) { + $dispatch->delay(60); + }); +// Job is dispatched when object is destroyed +``` + +#### PendingAmqpProducerMessageDispatch + +Fluent API for AMQP message dispatch: + +```php +use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch; + +$pending = new PendingAmqpProducerMessageDispatch($message); +$pending + ->setPool('default') + ->setExchange('my.exchange') + ->setRoutingKey('my.routing.key') + ->setTimeout(5) + ->setConfirm(true); +// Message is sent when object is destroyed +``` + +#### PendingKafkaProducerMessageDispatch + +Fluent API for Kafka message dispatch: + +```php +use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch; + +$pending = new PendingKafkaProducerMessageDispatch($message); +$pending->setPool('default'); +// Message is sent when object is destroyed +``` + +### Conditional Execution + +All pending dispatch classes support conditional execution: + +```php +use function FriendsOfHyperf\Support\dispatch; + +dispatch($job) + ->when($shouldUseHighPriority, function ($dispatch) { + $dispatch->onConnection('high-priority'); + }) + ->unless($isTestMode, function ($dispatch) { + $dispatch->delay(10); + }); +``` + +## API Reference + +### dispatch($job) + +Creates a pending dispatch instance based on the job type: + +- `Closure` → `PendingAsyncQueueDispatch` with `CallQueuedClosure` +- `ProducerMessageInterface` → `PendingAmqpProducerMessageDispatch` +- `ProduceMessage` → `PendingKafkaProducerMessageDispatch` +- Other objects → `PendingAsyncQueueDispatch` + +### PendingAsyncQueueDispatch Methods + +- `onConnection(string $connection): static` - Set queue connection +- `delay(int $delay): static` - Delay job execution (seconds) +- `setMaxAttempts(int $attempts): static` - Set max retry attempts +- `when(mixed $condition, callable $callback): static` - Conditional execution +- `unless(mixed $condition, callable $callback): static` - Inverse conditional execution + +### PendingAmqpProducerMessageDispatch Methods + +- `setPool(string $pool): static` - Set AMQP pool name +- `setExchange(string $exchange): static` - Set exchange name +- `setRoutingKey(array|string $routingKey): static` - Set routing key(s) +- `setTimeout(int $timeout): static` - Set timeout (seconds) +- `setConfirm(bool $confirm): static` - Enable/disable confirm mode +- `when(mixed $condition, callable $callback): static` - Conditional execution +- `unless(mixed $condition, callable $callback): static` - Inverse conditional execution + +### PendingKafkaProducerMessageDispatch Methods + +- `setPool(string $pool): static` - Set Kafka pool name +- `when(mixed $condition, callable $callback): static` - Conditional execution +- `unless(mixed $condition, callable $callback): static` - Inverse conditional execution + +### CallQueuedClosure + +- `create(Closure $closure): static` - Create a new closure job +- `setMaxAttempts(int $attempts): void` - Set max retry attempts +- `handle(): mixed` - Execute the closure (called by queue worker) + +## Architecture Notes + +As of v3.1.73, this package includes the core async queue closure functionality: + +- Previously in `friendsofhyperf/async-queue-closure-job` +- Moved here to eliminate circular dependencies +- The `async-queue-closure-job` package now depends on this package +- All functionality remains backward compatible + ## Contact - [Twitter](https://twitter.com/huangdijia) diff --git a/tests/Support/DispatchTest.php b/tests/Support/DispatchTest.php index 1360d01f0..d054a83f5 100644 --- a/tests/Support/DispatchTest.php +++ b/tests/Support/DispatchTest.php @@ -12,10 +12,10 @@ namespace FriendsOfHyperf\Tests\Support; use Exception; -use FriendsOfHyperf\AsyncQueueClosureJob\CallQueuedClosure; use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch; use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch; use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch; +use FriendsOfHyperf\Support\CallQueuedClosure; use FriendsOfHyperf\Tests\TestCase; use Hyperf\Amqp\Message\ProducerMessage; use Hyperf\Amqp\Producer;