diff --git a/src/facade/composer.json b/src/facade/composer.json index dd96bf836..bf08eee13 100644 --- a/src/facade/composer.json +++ b/src/facade/composer.json @@ -20,12 +20,12 @@ "pull-request": "https://github.com/friendsofhyperf/components/pulls" }, "require": { + "friendsofhyperf/support": "~3.1.73", "hyperf/context": "~3.1.0", "hyperf/di": "~3.1.0" }, "suggest": { "friendsofhyperf/encryption": "Required to use (~3.1.0)", - "friendsofhyperf/support": "Required to use (~3.1.0)", "hyperf/amqp": "Required to use (~3.1.0)", "hyperf/async-queue": "Required to use (~3.1.0)", "hyperf/cache": "Required to use (~3.1.0)", diff --git a/src/facade/src/AMQP.php b/src/facade/src/AMQP.php index 9f7993489..89db79491 100644 --- a/src/facade/src/AMQP.php +++ b/src/facade/src/AMQP.php @@ -11,15 +11,23 @@ namespace FriendsOfHyperf\Facade; +use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch; use Hyperf\Amqp\Message\ProducerMessageInterface; use Hyperf\Amqp\Producer; use Override; +use function FriendsOfHyperf\Support\dispatch; + /** * @method static bool produce(ProducerMessageInterface $producerMessage, bool $confirm = false, int $timeout = 5) */ class AMQP extends Facade { + public function dispatch(ProducerMessageInterface $producerMessage): PendingAmqpProducerMessageDispatch + { + return dispatch($producerMessage); + } + #[Override] protected static function getFacadeAccessor() { diff --git a/src/facade/src/AsyncQueue.php b/src/facade/src/AsyncQueue.php index 5e47a9e96..2376c248b 100644 --- a/src/facade/src/AsyncQueue.php +++ b/src/facade/src/AsyncQueue.php @@ -11,25 +11,35 @@ namespace FriendsOfHyperf\Facade; +use Closure; +use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch; use Hyperf\AsyncQueue\Driver\DriverFactory; use Hyperf\AsyncQueue\JobInterface; use Override; +use function FriendsOfHyperf\Support\dispatch; + /** * @mixin DriverFactory * @property null|string $queue + * @property null|string $pool */ class AsyncQueue extends Facade { + public function dispatch(Closure|JobInterface $job): PendingAsyncQueueDispatch + { + return dispatch($job); + } + /** * Push a job to the queue. * @return bool */ - public static function push(JobInterface $job, int $delay = 0, ?string $queue = null) + public static function push(JobInterface $job, int $delay = 0, ?string $pool = null) { - $queue = (fn ($queue) => $this->queue ?? $queue)->call($job, $queue); + $pool ??= (fn () => $this->queue ?? $this->pool ?? 'default')->call($job); - return self::get($queue)->push($job, $delay); + return self::get($pool)->push($job, $delay); } #[Override] diff --git a/src/facade/src/Kafka.php b/src/facade/src/Kafka.php index 8528f1314..10d30a509 100644 --- a/src/facade/src/Kafka.php +++ b/src/facade/src/Kafka.php @@ -11,38 +11,45 @@ namespace FriendsOfHyperf\Facade; +use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch; use Hyperf\Kafka\ProducerManager; use longlang\phpkafka\Producer\ProduceMessage; use Override; +use function FriendsOfHyperf\Support\dispatch; + /** * @mixin ProducerManager * @property null|string $queue */ class Kafka extends Facade { - public static function send(ProduceMessage $produceMessage, ?string $queue = null): void + public function dispatch(ProduceMessage $produceMessage): PendingKafkaProducerMessageDispatch + { + return dispatch($produceMessage); + } + + public static function send(ProduceMessage $produceMessage, ?string $pool = null): void { - $queue = (fn ($queue) => $this->queue ?? $queue)->call($produceMessage, $queue); - self::getProducer($queue)->sendBatch([$produceMessage]); + $pool ??= (fn () => $this->pool ?? $this->queue ?? 'default')->call($produceMessage); + self::getProducer($pool)->sendBatch([$produceMessage]); } /** * @param ProduceMessage[] $produceMessages */ - public static function sendBatch($produceMessages, ?string $queue = null): void + public static function sendBatch($produceMessages, ?string $pool = null): void { /** @var array */ $groupMessages = []; - $queue ??= 'default'; foreach ($produceMessages as $message) { - $queue = (fn ($queue) => $this->queue ?? $queue)->call($message, $queue); - $groupMessages[$queue][] = $message; + $subPool = (fn () => $pool ?? $this->pool ?? $this->queue ?? 'default')->call($message); + $groupMessages[$subPool][] = $message; } - foreach ($groupMessages as $queue => $messages) { - self::getProducer($queue)->sendBatch($messages); + foreach ($groupMessages as $subPool => $messages) { + self::getProducer($subPool)->sendBatch($messages); } } diff --git a/src/facade/src/Log.php b/src/facade/src/Log.php index 17ae3ef0e..287cf762b 100644 --- a/src/facade/src/Log.php +++ b/src/facade/src/Log.php @@ -37,15 +37,13 @@ public static function __callStatic($name, $arguments) } /** - * @param string $name - * @param string $group * @return \Psr\Log\LoggerInterface */ - public static function channel($name = 'hyperf', $group = 'default') + public static function channel(string $name = 'hyperf', string $channel = 'default') { return ApplicationContext::getContainer() ->get(static::getFacadeAccessor()) - ->get($name, $group); + ->get($name, $channel); } #[Override]