Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/helpers/src/Functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ function di(?string $abstract = null, array $parameters = [])
}

/**
* @deprecated since v3.1, will be removed in v3.2, use `FriendsOfHyperf\Support\dispatch()` instead
* @param AsyncTaskInterface|Closure|JobInterface|ProduceMessage|ProducerMessageInterface|object $job
* @return bool
*/
Expand Down
76 changes: 76 additions & 0 deletions src/support/src/Bus/PendingAmqpProducerMessageDispatch.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
<?php

declare(strict_types=1);
/**
* This file is part of friendsofhyperf/components.
*
* @link https://github.com/friendsofhyperf/components
* @document https://github.com/friendsofhyperf/components/blob/main/README.md
* @contact huangdijia@gmail.com
*/

namespace FriendsOfHyperf\Support\Bus;

use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Amqp\Producer;
use Hyperf\Conditionable\Conditionable;
use Hyperf\Context\ApplicationContext;

/**
* @property array{application_headers?:AMQPTable} $properties
*/
class PendingAmqpProducerMessageDispatch
{
use Conditionable;

public ?string $pool = null;

public int $timeout = 5;

public bool $confirm = false;

public function __construct(protected ProducerMessageInterface $message)
{
}

public function __destruct()
{
$this->pool && $this->message->setPoolName($this->pool);
ApplicationContext::getContainer()
->get(Producer::class)
->produce($this->message, $this->confirm, $this->timeout);
}

public function onPool(string $pool): static
{
$this->pool = $pool;
return $this;
}

public function setPayload(mixed $data): static
{
$this->message->setPayload($data);
return $this;
}

public function withHeader(string $key, mixed $value, ?int $ttl = null): static
{
(function () use ($key, $value, $ttl) {
$this->properties['application_headers'] ??= new \PhpAmqpLib\Wire\AMQPTable(); // @phpstan-ignore-line
$this->properties['application_headers']->set($key, $value, $ttl);
})->call($this->message);
return $this;
}

public function setConfirm(bool $confirm): static
{
$this->confirm = $confirm;
return $this;
}

public function setTimeout(int $timeout): static
{
$this->timeout = $timeout;
return $this;
}
}
56 changes: 56 additions & 0 deletions src/support/src/Bus/PendingAsyncQueueDispatch.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

declare(strict_types=1);
/**
* This file is part of friendsofhyperf/components.
*
* @link https://github.com/friendsofhyperf/components
* @document https://github.com/friendsofhyperf/components/blob/main/README.md
* @contact huangdijia@gmail.com
*/

namespace FriendsOfHyperf\Support\Bus;

use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\JobInterface;
use Hyperf\Conditionable\Conditionable;
use Hyperf\Context\ApplicationContext;

class PendingAsyncQueueDispatch
{
use Conditionable;

public string $pool = 'default';

public int $delay = 0;

public function __construct(protected JobInterface $job)
{
}

public function __destruct()
{
ApplicationContext::getContainer()
->get(DriverFactory::class)
->get($this->pool)
->push($this->job, $this->delay);
}

public function setMaxAttempts(int $maxAttempts): static
{
$this->job->setMaxAttempts($maxAttempts);
return $this;
}

public function onPool(string $pool): static
{
$this->pool = $pool;
return $this;
}

public function delay(int $delay): static
{
$this->delay = $delay;
return $this;
}
}
67 changes: 67 additions & 0 deletions src/support/src/Bus/PendingKafkaProducerMessageDispatch.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?php

declare(strict_types=1);
/**
* This file is part of friendsofhyperf/components.
*
* @link https://github.com/friendsofhyperf/components
* @document https://github.com/friendsofhyperf/components/blob/main/README.md
* @contact huangdijia@gmail.com
*/

namespace FriendsOfHyperf\Support\Bus;

use Hyperf\Conditionable\Conditionable;
use Hyperf\Context\ApplicationContext;
use Hyperf\Kafka\ProducerManager;
use longlang\phpkafka\Producer\ProduceMessage;
use longlang\phpkafka\Protocol\RecordBatch\RecordHeader;

/**
* @property array $headers
* @property null|string $key
* @property null|string $value
*/
class PendingKafkaProducerMessageDispatch
{
use Conditionable;

public string $pool = 'default';

public function __construct(protected ProduceMessage $message)
{
}

public function __destruct()
{
ApplicationContext::getContainer()
->get(ProducerManager::class)
->getProducer($this->pool)
->sendBatch([$this->message]);
}

public function onPool(string $pool): static
{
$this->pool = $pool;
return $this;
}

public function setKey(string $key): static
{
(fn () => $this->key = $key)->call($this->message);
return $this;
}

public function setValue(string $value): static
{
(fn () => $this->value = $value)->call($this->message);
return $this;
}

public function withHeader(string $key, string $value): static
{
$header = (new RecordHeader())->setHeaderKey($key)->setValue($value);
(fn () => $this->headers[] = $header)->call($this->message);
return $this;
}
}
29 changes: 29 additions & 0 deletions src/support/src/Functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,38 @@

use Closure;
use Exception;
use FriendsOfHyperf\AsyncQueueClosureJob\CallQueuedClosure;
use FriendsOfHyperf\Support\Bus\PendingAmqpProducerMessageDispatch;
use FriendsOfHyperf\Support\Bus\PendingAsyncQueueDispatch;
use FriendsOfHyperf\Support\Bus\PendingKafkaProducerMessageDispatch;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\AsyncQueue\JobInterface;
use InvalidArgumentException;
use longlang\phpkafka\Producer\ProduceMessage;

use function Hyperf\Support\value;

/**
* Do not assign a value to the return value of this function unless you are very clear about the consequences of doing so.
* @param Closure|JobInterface|ProduceMessage|ProducerMessageInterface|mixed $job
* @param-closure-this ($job is Closure ? CallQueuedClosure : mixed) $job
* @return ($job is Closure ? PendingAsyncQueueDispatch : ($job is JobInterface ? PendingAsyncQueueDispatch : ($job is ProducerMessageInterface ? PendingAmqpProducerMessageDispatch : PendingKafkaProducerMessageDispatch)))
* @throws InvalidArgumentException
*/
function dispatch($job)
{
if ($job instanceof Closure) {
$job = CallQueuedClosure::create($job);
}

return match (true) {
interface_exists(ProducerMessageInterface::class) && $job instanceof ProducerMessageInterface => new PendingAmqpProducerMessageDispatch($job),
class_exists(ProduceMessage::class) && $job instanceof ProduceMessage => new PendingKafkaProducerMessageDispatch($job),
interface_exists(JobInterface::class) && $job instanceof JobInterface => new PendingAsyncQueueDispatch($job),
default => throw new InvalidArgumentException('Unsupported job type.')
};
}

/**
* Retry an operation a given number of times.
* @template TReturn
Expand Down
Loading