From e518e99879526277a6f5a72f4b4b75af90dc6db0 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Sat, 15 Nov 2025 22:07:16 +0800 Subject: [PATCH 1/4] fix: standardize pool parameter naming in AsyncQueue and Kafka facades - Rename parameter from $queue to $pool in AsyncQueue::push() and Kafka facade methods to accurately reflect that these parameters specify connection pools, not queue names - Update AsyncQueue to check both $pool and $queue properties for backward compatibility, with $pool taking precedence - Add @property annotation for $pool in AsyncQueue docblock - Ensure consistent fallback logic across both facades with 'default' as final fallback - Improve code clarity by using $pool for pool selection and $subPool for batch message grouping in Kafka::sendBatch() This change improves API consistency and makes the codebase more maintainable by using semantically correct parameter names. --- src/facade/src/AsyncQueue.php | 7 ++++--- src/facade/src/Kafka.php | 17 ++++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/facade/src/AsyncQueue.php b/src/facade/src/AsyncQueue.php index 5e47a9e96..a0f48f0f1 100644 --- a/src/facade/src/AsyncQueue.php +++ b/src/facade/src/AsyncQueue.php @@ -18,6 +18,7 @@ /** * @mixin DriverFactory * @property null|string $queue + * @property null|string $pool */ class AsyncQueue extends Facade { @@ -25,11 +26,11 @@ class AsyncQueue extends Facade * Push a job to the queue. * @return bool */ - public static function push(JobInterface $job, int $delay = 0, ?string $queue = null) + public static function push(JobInterface $job, int $delay = 0, ?string $pool = null) { - $queue = (fn ($queue) => $this->queue ?? $queue)->call($job, $queue); + $pool ??= (fn () => $this->queue ?? $this->pool ?? 'default')->call($job); - return self::get($queue)->push($job, $delay); + return self::get($pool)->push($job, $delay); } #[Override] diff --git a/src/facade/src/Kafka.php b/src/facade/src/Kafka.php index 8528f1314..2b0449b11 100644 --- a/src/facade/src/Kafka.php +++ b/src/facade/src/Kafka.php @@ -21,28 +21,27 @@ */ class Kafka extends Facade { - public static function send(ProduceMessage $produceMessage, ?string $queue = null): void + public static function send(ProduceMessage $produceMessage, ?string $pool = null): void { - $queue = (fn ($queue) => $this->queue ?? $queue)->call($produceMessage, $queue); - self::getProducer($queue)->sendBatch([$produceMessage]); + $pool ??= (fn () => $this->pool ?? $this->queue ?? 'default')->call($produceMessage); + self::getProducer($pool)->sendBatch([$produceMessage]); } /** * @param ProduceMessage[] $produceMessages */ - public static function sendBatch($produceMessages, ?string $queue = null): void + public static function sendBatch($produceMessages, ?string $pool = null): void { /** @var array */ $groupMessages = []; - $queue ??= 'default'; foreach ($produceMessages as $message) { - $queue = (fn ($queue) => $this->queue ?? $queue)->call($message, $queue); - $groupMessages[$queue][] = $message; + $subPool = (fn () => $pool ?? $this->pool ?? $this->queue ?? 'default')->call($message); + $groupMessages[$subPool][] = $message; } - foreach ($groupMessages as $queue => $messages) { - self::getProducer($queue)->sendBatch($messages); + foreach ($groupMessages as $subPool => $messages) { + self::getProducer($subPool)->sendBatch($messages); } } From 5fa9597ea2e0ef523f4b9a2e08c706371b1bef57 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Sat, 15 Nov 2025 22:11:13 +0800 Subject: [PATCH 2/4] fix: add dispatch methods to AMQP, AsyncQueue, and Kafka facades --- src/facade/composer.json | 1 + src/facade/src/AMQP.php | 8 ++++++++ src/facade/src/AsyncQueue.php | 9 +++++++++ src/facade/src/Kafka.php | 8 ++++++++ 4 files changed, 26 insertions(+) diff --git a/src/facade/composer.json b/src/facade/composer.json index dd96bf836..4a340acc1 100644 --- a/src/facade/composer.json +++ b/src/facade/composer.json @@ -20,6 +20,7 @@ "pull-request": "https://github.com/friendsofhyperf/components/pulls" }, "require": { + "friendsofhyperf/support": "~3.1.73", "hyperf/context": "~3.1.0", "hyperf/di": "~3.1.0" }, diff --git a/src/facade/src/AMQP.php b/src/facade/src/AMQP.php index 9f7993489..89db79491 100644 --- a/src/facade/src/AMQP.php +++ b/src/facade/src/AMQP.php @@ -11,15 +11,23 @@ namespace FriendsOfHyperf\Facade; +use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch; use Hyperf\Amqp\Message\ProducerMessageInterface; use Hyperf\Amqp\Producer; use Override; +use function FriendsOfHyperf\Support\dispatch; + /** * @method static bool produce(ProducerMessageInterface $producerMessage, bool $confirm = false, int $timeout = 5) */ class AMQP extends Facade { + public function dispatch(ProducerMessageInterface $producerMessage): PendingAmqpProducerMessageDispatch + { + return dispatch($producerMessage); + } + #[Override] protected static function getFacadeAccessor() { diff --git a/src/facade/src/AsyncQueue.php b/src/facade/src/AsyncQueue.php index a0f48f0f1..2376c248b 100644 --- a/src/facade/src/AsyncQueue.php +++ b/src/facade/src/AsyncQueue.php @@ -11,10 +11,14 @@ namespace FriendsOfHyperf\Facade; +use Closure; +use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch; use Hyperf\AsyncQueue\Driver\DriverFactory; use Hyperf\AsyncQueue\JobInterface; use Override; +use function FriendsOfHyperf\Support\dispatch; + /** * @mixin DriverFactory * @property null|string $queue @@ -22,6 +26,11 @@ */ class AsyncQueue extends Facade { + public function dispatch(Closure|JobInterface $job): PendingAsyncQueueDispatch + { + return dispatch($job); + } + /** * Push a job to the queue. * @return bool diff --git a/src/facade/src/Kafka.php b/src/facade/src/Kafka.php index 2b0449b11..10d30a509 100644 --- a/src/facade/src/Kafka.php +++ b/src/facade/src/Kafka.php @@ -11,16 +11,24 @@ namespace FriendsOfHyperf\Facade; +use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch; use Hyperf\Kafka\ProducerManager; use longlang\phpkafka\Producer\ProduceMessage; use Override; +use function FriendsOfHyperf\Support\dispatch; + /** * @mixin ProducerManager * @property null|string $queue */ class Kafka extends Facade { + public function dispatch(ProduceMessage $produceMessage): PendingKafkaProducerMessageDispatch + { + return dispatch($produceMessage); + } + public static function send(ProduceMessage $produceMessage, ?string $pool = null): void { $pool ??= (fn () => $this->pool ?? $this->queue ?? 'default')->call($produceMessage); From b874f8f29f9f1b5b4ac9ea023a6ad3a5d1e08455 Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Sat, 15 Nov 2025 22:12:46 +0800 Subject: [PATCH 3/4] fix: remove unnecessary suggestion for friendsofhyperf/support in composer.json --- src/facade/composer.json | 1 - 1 file changed, 1 deletion(-) diff --git a/src/facade/composer.json b/src/facade/composer.json index 4a340acc1..bf08eee13 100644 --- a/src/facade/composer.json +++ b/src/facade/composer.json @@ -26,7 +26,6 @@ }, "suggest": { "friendsofhyperf/encryption": "Required to use (~3.1.0)", - "friendsofhyperf/support": "Required to use (~3.1.0)", "hyperf/amqp": "Required to use (~3.1.0)", "hyperf/async-queue": "Required to use (~3.1.0)", "hyperf/cache": "Required to use (~3.1.0)", From b6b4f4907121fbca10f8205e8d79504c880df34c Mon Sep 17 00:00:00 2001 From: Deeka Wong <8337659+huangdijia@users.noreply.github.com> Date: Sat, 15 Nov 2025 22:14:58 +0800 Subject: [PATCH 4/4] fix: standardize parameter naming in Log facade channel method --- src/facade/src/Log.php | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/facade/src/Log.php b/src/facade/src/Log.php index 17ae3ef0e..287cf762b 100644 --- a/src/facade/src/Log.php +++ b/src/facade/src/Log.php @@ -37,15 +37,13 @@ public static function __callStatic($name, $arguments) } /** - * @param string $name - * @param string $group * @return \Psr\Log\LoggerInterface */ - public static function channel($name = 'hyperf', $group = 'default') + public static function channel(string $name = 'hyperf', string $channel = 'default') { return ApplicationContext::getContainer() ->get(static::getFacadeAccessor()) - ->get($name, $group); + ->get($name, $channel); } #[Override]