Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/facade/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
"pull-request": "https://github.com/friendsofhyperf/components/pulls"
},
"require": {
"friendsofhyperf/support": "~3.1.73",
"hyperf/context": "~3.1.0",
"hyperf/di": "~3.1.0"
},
"suggest": {
"friendsofhyperf/encryption": "Required to use (~3.1.0)",
"friendsofhyperf/support": "Required to use (~3.1.0)",
"hyperf/amqp": "Required to use (~3.1.0)",
"hyperf/async-queue": "Required to use (~3.1.0)",
"hyperf/cache": "Required to use (~3.1.0)",
Expand Down
8 changes: 8 additions & 0 deletions src/facade/src/AMQP.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,23 @@

namespace FriendsOfHyperf\Facade;

use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Amqp\Producer;
use Override;

use function FriendsOfHyperf\Support\dispatch;

/**
* @method static bool produce(ProducerMessageInterface $producerMessage, bool $confirm = false, int $timeout = 5)
*/
class AMQP extends Facade
{
public function dispatch(ProducerMessageInterface $producerMessage): PendingAmqpProducerMessageDispatch
{
return dispatch($producerMessage);
}

#[Override]
protected static function getFacadeAccessor()
{
Expand Down
16 changes: 13 additions & 3 deletions src/facade/src/AsyncQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,35 @@

namespace FriendsOfHyperf\Facade;

use Closure;
use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\JobInterface;
use Override;

use function FriendsOfHyperf\Support\dispatch;

/**
* @mixin DriverFactory
* @property null|string $queue
* @property null|string $pool
*/
class AsyncQueue extends Facade
{
public function dispatch(Closure|JobInterface $job): PendingAsyncQueueDispatch
{
return dispatch($job);
}

/**
* Push a job to the queue.
* @return bool
*/
public static function push(JobInterface $job, int $delay = 0, ?string $queue = null)
public static function push(JobInterface $job, int $delay = 0, ?string $pool = null)
{
$queue = (fn ($queue) => $this->queue ?? $queue)->call($job, $queue);
$pool ??= (fn () => $this->queue ?? $this->pool ?? 'default')->call($job);

return self::get($queue)->push($job, $delay);
return self::get($pool)->push($job, $delay);
}

#[Override]
Expand Down
25 changes: 16 additions & 9 deletions src/facade/src/Kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,45 @@

namespace FriendsOfHyperf\Facade;

use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch;
use Hyperf\Kafka\ProducerManager;
use longlang\phpkafka\Producer\ProduceMessage;
use Override;

use function FriendsOfHyperf\Support\dispatch;

/**
* @mixin ProducerManager
* @property null|string $queue
*/
class Kafka extends Facade
{
public static function send(ProduceMessage $produceMessage, ?string $queue = null): void
public function dispatch(ProduceMessage $produceMessage): PendingKafkaProducerMessageDispatch
{
return dispatch($produceMessage);
}

public static function send(ProduceMessage $produceMessage, ?string $pool = null): void
{
$queue = (fn ($queue) => $this->queue ?? $queue)->call($produceMessage, $queue);
self::getProducer($queue)->sendBatch([$produceMessage]);
$pool ??= (fn () => $this->pool ?? $this->queue ?? 'default')->call($produceMessage);
self::getProducer($pool)->sendBatch([$produceMessage]);
}

/**
* @param ProduceMessage[] $produceMessages
*/
public static function sendBatch($produceMessages, ?string $queue = null): void
public static function sendBatch($produceMessages, ?string $pool = null): void
{
/** @var array<string,ProduceMessage[]> */
$groupMessages = [];
$queue ??= 'default';

foreach ($produceMessages as $message) {
$queue = (fn ($queue) => $this->queue ?? $queue)->call($message, $queue);
$groupMessages[$queue][] = $message;
$subPool = (fn () => $pool ?? $this->pool ?? $this->queue ?? 'default')->call($message);
$groupMessages[$subPool][] = $message;
}

foreach ($groupMessages as $queue => $messages) {
self::getProducer($queue)->sendBatch($messages);
foreach ($groupMessages as $subPool => $messages) {
self::getProducer($subPool)->sendBatch($messages);
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/facade/src/Log.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,13 @@ public static function __callStatic($name, $arguments)
}

/**
* @param string $name
* @param string $group
* @return \Psr\Log\LoggerInterface
*/
public static function channel($name = 'hyperf', $group = 'default')
public static function channel(string $name = 'hyperf', string $channel = 'default')
{
return ApplicationContext::getContainer()
->get(static::getFacadeAccessor())
->get($name, $group);
->get($name, $channel);
}

#[Override]
Expand Down