Skip to content

Commit 12808fd

Browse files
committed
chore: update dependencies and enhance docker-compose configuration
- Replaced appwrite/php-amqplib with appwrite-labs/php-amqplib in composer.json for better compatibility. - Added amqp-swoole service to docker-compose.yml for improved testing capabilities. - Updated phpunit.xml to enable process isolation and stop on failure for more robust test execution.
1 parent d46c4f2 commit 12808fd

File tree

7 files changed

+273
-3
lines changed

7 files changed

+273
-3
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
},
2626
"require": {
2727
"php": ">=8.3",
28-
"appwrite/php-amqplib": "0.1.0",
28+
"appwrite-labs/php-amqplib": "0.1.0",
2929
"utopia-php/cli": "0.15.*",
3030
"utopia-php/framework": "0.33.*",
3131
"utopia-php/telemetry": "0.1.*",

docker-compose.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ services:
55
volumes:
66
- ./src:/usr/local/src/src
77
- ./tests:/usr/local/src/tests
8+
- ./phpunit.xml:/usr/local/src/phpunit.xml
89
depends_on:
910
- swoole
1011
- swoole-amqp
12+
- amqp-swoole
1113
- swoole-redis-cluster
1214
- workerman
1315

@@ -43,6 +45,17 @@ services:
4345
amqp:
4446
condition: service_healthy
4547

48+
amqp-swoole:
49+
container_name: amqp-swoole
50+
build: ./tests/Queue/servers/AMQPSwoole/.
51+
command: php /usr/src/code/tests/Queue/servers/AMQPSwoole/worker.php
52+
volumes:
53+
- ./src:/usr/local/src/src
54+
- ./tests:/usr/local/src/tests
55+
depends_on:
56+
amqp:
57+
condition: service_healthy
58+
4659
workerman:
4760
container_name: workerman
4861
build: ./tests/Queue/servers/Workerman/.

docs/AMQPSwoole.md

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# AMQPSwoole Broker
2+
3+
## Overview
4+
5+
The `AMQPSwoole` class is a specialized AMQP broker implementation designed for use in Swoole environments. It extends the base `AMQP` class and resolves compatibility issues that occur when using the standard AMQP broker in Swoole coroutines.
6+
7+
## Why AMQPSwoole is Needed
8+
9+
When using the standard `AMQP` broker in Swoole environments, you may encounter errors like:
10+
11+
```
12+
Fatal error: Uncaught Swoole\Error: API must be called in the coroutine
13+
```
14+
15+
This happens because:
16+
- The standard AMQP broker uses `AMQPStreamConnection` with `StreamIO`
17+
- `StreamIO` calls `stream_select()` which is not allowed outside Swoole coroutines
18+
- This causes fatal errors during connection cleanup and heartbeat operations
19+
20+
## Solution
21+
22+
The `AMQPSwoole` class solves this by:
23+
- Using `AMQPSwooleConnection` instead of `AMQPStreamConnection`
24+
- Leveraging `SwooleIO` which is designed for Swoole environments
25+
- Properly handling coroutine-based I/O operations
26+
27+
## Usage
28+
29+
### Basic Usage
30+
31+
```php
32+
use Utopia\Queue\Broker\AMQPSwoole;
33+
use Utopia\Queue\Queue;
34+
35+
// Create broker instance
36+
$broker = new AMQPSwoole(
37+
host: 'localhost',
38+
port: 5672,
39+
user: 'guest',
40+
password: 'guest'
41+
);
42+
43+
// Create queue
44+
$queue = new Queue('my-queue');
45+
46+
// Enqueue a message
47+
$broker->enqueue($queue, ['message' => 'Hello World']);
48+
```
49+
50+
### Consumer Usage
51+
52+
```php
53+
use Utopia\Queue\Consumer;
54+
use Utopia\Queue\Message;
55+
56+
$broker->consume(
57+
$queue,
58+
function (Message $message) {
59+
// Process the message
60+
$payload = $message->getPayload();
61+
echo "Processing: " . json_encode($payload) . PHP_EOL;
62+
63+
// Return result (optional)
64+
return new \Utopia\Queue\Result\Commit();
65+
},
66+
function (Message $message) {
67+
echo "Message processed successfully" . PHP_EOL;
68+
},
69+
function (?Message $message, \Throwable $error) {
70+
echo "Error processing message: " . $error->getMessage() . PHP_EOL;
71+
}
72+
);
73+
```
74+
75+
## Configuration
76+
77+
The `AMQPSwoole` class accepts the same configuration parameters as the base `AMQP` class:
78+
79+
- `host`: RabbitMQ server hostname
80+
- `port`: RabbitMQ server port (default: 5672)
81+
- `httpPort`: RabbitMQ management HTTP port (default: 15672)
82+
- `user`: Username for authentication
83+
- `password`: Password for authentication
84+
- `vhost`: Virtual host (default: '/')
85+
- `heartbeat`: Heartbeat interval in seconds (default: 0)
86+
- `connectTimeout`: Connection timeout in seconds (default: 3.0)
87+
- `readWriteTimeout`: Read/write timeout in seconds (default: 3.0)
88+
89+
## Testing
90+
91+
The AMQPSwoole broker includes comprehensive tests that verify:
92+
- Basic message enqueueing and processing
93+
- Concurrency handling in Swoole environments
94+
- Error handling and retry mechanisms
95+
- Queue size reporting
96+
97+
Run tests with:
98+
```bash
99+
composer test
100+
```
101+
102+
## Migration from AMQP to AMQPSwoole
103+
104+
If you're currently using the `AMQP` broker in a Swoole environment and experiencing errors, migration is simple:
105+
106+
```php
107+
// Before
108+
$broker = new \Utopia\Queue\Broker\AMQP($host, $port, $httpPort, $user, $password);
109+
110+
// After
111+
$broker = new \Utopia\Queue\Broker\AMQPSwoole($host, $port, $httpPort, $user, $password);
112+
```
113+
114+
All other APIs remain exactly the same.

phpunit.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
convertErrorsToExceptions="true"
66
convertNoticesToExceptions="true"
77
convertWarningsToExceptions="true"
8-
processIsolation="false"
9-
stopOnFailure="false"
8+
processIsolation="true"
9+
stopOnFailure="true"
1010
>
1111
<testsuites>
1212
<testsuite name="E2E">
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
<?php
2+
3+
namespace Tests\E2E\Adapter;
4+
5+
use Utopia\Queue\Broker\AMQPSwoole;
6+
use Utopia\Queue\Publisher;
7+
use Utopia\Queue\Queue;
8+
9+
use function Co\run;
10+
11+
class AMQPSwooleTest extends Base
12+
{
13+
protected function getPublisher(): Publisher
14+
{
15+
return new AMQPSwoole(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp');
16+
}
17+
18+
protected function getQueue(): Queue
19+
{
20+
return new Queue('amqp-swoole');
21+
}
22+
23+
/**
24+
* Override testEvents to run within Swoole coroutines
25+
*/
26+
public function testEvents(): void
27+
{
28+
run(function () {
29+
go(function () {
30+
$publisher = $this->getPublisher();
31+
32+
foreach ($this->payloads as $payload) {
33+
$this->assertTrue($publisher->enqueue($this->getQueue(), $payload));
34+
}
35+
36+
// Allow some time for async processing (if any)
37+
sleep(1);
38+
39+
/** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */
40+
$publisher->close();
41+
});
42+
});
43+
}
44+
45+
public function testConcurrency(): void
46+
{
47+
run(function () {
48+
go(function () {
49+
$publisher = $this->getPublisher();
50+
51+
foreach ($this->payloads as $payload) {
52+
$this->assertTrue($publisher->enqueue($this->getQueue(), $payload));
53+
}
54+
55+
sleep(1);
56+
57+
/** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */
58+
$publisher->close();
59+
});
60+
});
61+
}
62+
63+
/**
64+
* Override testRetry to run within Swoole coroutines
65+
* @depends testEvents
66+
*/
67+
public function testRetry(): void
68+
{
69+
run(function () {
70+
go(function () {
71+
$publisher = $this->getPublisher();
72+
73+
$published = $publisher->enqueue($this->getQueue(), [
74+
'type' => 'test_exception',
75+
'id' => 1
76+
]);
77+
78+
$this->assertTrue($published);
79+
80+
$published = $publisher->enqueue($this->getQueue(), [
81+
'type' => 'test_exception',
82+
'id' => 2
83+
]);
84+
85+
$this->assertTrue($published);
86+
87+
$published = $publisher->enqueue($this->getQueue(), [
88+
'type' => 'test_exception',
89+
'id' => 3
90+
]);
91+
92+
$this->assertTrue($published);
93+
94+
$published = $publisher->enqueue($this->getQueue(), [
95+
'type' => 'test_exception',
96+
'id' => 4
97+
]);
98+
99+
$this->assertTrue($published);
100+
101+
sleep(1);
102+
103+
/** @var \Utopia\Queue\Broker\AMQPSwoole $publisher */
104+
$publisher->close();
105+
});
106+
});
107+
}
108+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
FROM phpswoole/swoole:php8.3-alpine
2+
3+
RUN apk add autoconf build-base
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
require_once __DIR__ . '/../../../../vendor/autoload.php';
4+
require_once __DIR__ . '/../tests.php';
5+
6+
use Utopia\Queue;
7+
use Utopia\Queue\Message;
8+
9+
$consumer = new Queue\Broker\AMQPSwoole(host: 'amqp', port: 5672, user: 'amqp', password: 'amqp');
10+
$adapter = new Queue\Adapter\Swoole($consumer, 12, 'amqp-swoole');
11+
$server = new Queue\Server($adapter);
12+
13+
$server->job()
14+
->inject('message')
15+
->action(function (Message $message) {
16+
handleRequest($message);
17+
});
18+
19+
$server
20+
->error()
21+
->inject('error')
22+
->action(function ($th) {
23+
echo $th->getMessage() . PHP_EOL;
24+
});
25+
26+
$server
27+
->workerStart()
28+
->action(function () {
29+
echo "Worker Started" . PHP_EOL;
30+
});
31+
32+
$server->start();

0 commit comments

Comments
 (0)