Skip to content

Commit 0eccc55

Browse files
authored
Merge pull request #43 from utopia-php/feat-pool-adapter
Feat pool adapter
2 parents 032da0f + 178d620 commit 0eccc55

File tree

8 files changed

+169
-34
lines changed

8 files changed

+169
-34
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ jobs:
1616
adapter:
1717
[
1818
AMQP,
19+
Pool,
1920
SwooleRedisCluster,
2021
Swoole,
2122
Workerman,

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM composer as composer
1+
FROM composer AS composer
22

33
WORKDIR /usr/local/src/
44

composer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"utopia-php/cli": "0.15.*",
3030
"utopia-php/framework": "0.33.*",
3131
"utopia-php/telemetry": "0.1.*",
32+
"utopia-php/pools": "0.8.*",
3233
"utopia-php/fetch": "0.4.*"
3334
},
3435
"require-dev": {

composer.lock

Lines changed: 81 additions & 29 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Queue/Broker/Pool.php

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
<?php
2+
3+
namespace Utopia\Queue\Broker;
4+
5+
use Utopia\Queue\Consumer;
6+
use Utopia\Queue\Publisher;
7+
use Utopia\Queue\Queue;
8+
use Utopia\Pools\Pool as UtopiaPool;
9+
10+
readonly class Pool implements Publisher, Consumer
11+
{
12+
public function __construct(
13+
private ?UtopiaPool $publisher = null,
14+
private ?UtopiaPool $consumer = null,
15+
) {
16+
}
17+
18+
public function enqueue(Queue $queue, array $payload): bool
19+
{
20+
return $this->delegatePublish(__FUNCTION__, \func_get_args());
21+
}
22+
23+
public function retry(Queue $queue, ?int $limit = null): void
24+
{
25+
$this->delegatePublish(__FUNCTION__, \func_get_args());
26+
}
27+
28+
public function getQueueSize(Queue $queue, bool $failedJobs = false): int
29+
{
30+
return $this->delegatePublish(__FUNCTION__, \func_get_args());
31+
}
32+
33+
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
34+
{
35+
$this->delegateConsumer(__FUNCTION__, \func_get_args());
36+
}
37+
38+
public function close(): void
39+
{
40+
$this->delegateConsumer(__FUNCTION__, \func_get_args());
41+
}
42+
43+
protected function delegatePublish(string $method, array $args): mixed
44+
{
45+
return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) {
46+
return $adapter->$method(...$args);
47+
});
48+
}
49+
50+
protected function delegateConsumer(string $method, array $args): mixed
51+
{
52+
return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) {
53+
return $adapter->$method(...$args);
54+
});
55+
}
56+
}

tests/Queue/E2E/Adapter/AMQPTest.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
<?php
22

3-
namespace Queue\E2E\Adapter;
3+
namespace Tests\E2E\Adapter;
44

5-
use Tests\E2E\Adapter\Base;
65
use Utopia\Queue\Broker\AMQP;
76
use Utopia\Queue\Publisher;
87
use Utopia\Queue\Queue;
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
namespace Tests\E2E\Adapter;
4+
5+
use Utopia\Pools\Pool as UtopiaPool;
6+
use Utopia\Queue\Broker\Pool;
7+
use Utopia\Queue\Broker\Redis as RedisBroker;
8+
use Utopia\Queue\Connection\Redis;
9+
use Utopia\Queue\Publisher;
10+
use Utopia\Queue\Queue;
11+
12+
class PoolTest extends Base
13+
{
14+
protected function getPublisher(): Publisher
15+
{
16+
$pool = new UtopiaPool('redis', 1, function () {
17+
return new RedisBroker(new Redis('redis', 6379));
18+
});
19+
20+
return new Pool($pool, $pool);
21+
}
22+
23+
protected function getQueue(): Queue
24+
{
25+
return new Queue('pool');
26+
}
27+
}

tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
<?php
22

3-
namespace Queue\E2E\Adapter;
3+
namespace Tests\E2E\Adapter;
44

5-
use Tests\E2E\Adapter\Base;
65
use Utopia\Queue\Broker\Redis;
76
use Utopia\Queue\Connection\RedisCluster;
87
use Utopia\Queue\Publisher;

0 commit comments

Comments
 (0)