Skip to content

Commit c032f48

Browse files
committed
Abstract swoole with channel
1 parent afa3cfa commit c032f48

File tree

4 files changed

+12
-52
lines changed

4 files changed

+12
-52
lines changed

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,15 @@
3333
"utopia-php/fetch": "0.4.*"
3434
},
3535
"require-dev": {
36-
"ext-redis": "*",
3736
"swoole/ide-helper": "5.1.7",
3837
"phpunit/phpunit": "9.*",
3938
"laravel/pint": "1.*",
4039
"workerman/workerman": "4.*",
4140
"phpstan/phpstan": "1.*"
4241
},
4342
"suggest": {
43+
"ext-redis": "Needed to support Redis.",
44+
"ext-amqp": "Needed to support AMQP.",
4445
"ext-swoole": "Needed to support Swoole.",
4546
"workerman/workerman": "Needed to support Workerman."
4647
},

composer.lock

Lines changed: 2 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/Queue/Broker/AMQP.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,11 @@ public function __construct(
5858
) {
5959
}
6060

61+
public function getConnectionType(): string
62+
{
63+
return AMQPStreamConnection::class;
64+
}
65+
6166
/**
6267
* Enable or disable waiting for publisher confirms.
6368
*/
@@ -281,7 +286,7 @@ protected function withChannel(callable $callback): void
281286
: 0;
282287

283288
$createChannel = function (): AMQPChannel {
284-
$connection = new AMQPStreamConnection(
289+
$connection = new ($this->getConnectionType())(
285290
$this->host,
286291
$this->port,
287292
$this->user,

src/Queue/Broker/AMQPSwoole.php

Lines changed: 2 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -2,56 +2,12 @@
22

33
namespace Utopia\Queue\Broker;
44

5-
use PhpAmqpLib\Channel\AMQPChannel;
65
use PhpAmqpLib\Connection\AMQPSwooleConnection;
76

87
class AMQPSwoole extends AMQP
98
{
10-
/**
11-
* Override the withChannel method to use AMQPSwooleConnection instead of AMQPStreamConnection
12-
*
13-
* @param callable(AMQPChannel $channel): void $callback
14-
* @throws \Exception
15-
*/
16-
protected function withChannel(callable $callback): void
9+
public function getConnectionType(): string
1710
{
18-
$createChannel = function (): AMQPChannel {
19-
$connection = new AMQPSwooleConnection(
20-
$this->host,
21-
$this->port,
22-
$this->user,
23-
$this->password,
24-
$this->vhost,
25-
connection_timeout: $this->connectTimeout,
26-
read_write_timeout: $this->readWriteTimeout,
27-
heartbeat: $this->heartbeat,
28-
);
29-
30-
if (\is_callable($this->connectionConfigHook)) {
31-
($this->connectionConfigHook)($connection);
32-
}
33-
34-
$channel = $connection->channel();
35-
36-
if (\is_callable($this->channelConfigHook)) {
37-
($this->channelConfigHook)($channel);
38-
}
39-
40-
return $channel;
41-
};
42-
43-
if (!$this->channel) {
44-
$this->channel = $createChannel();
45-
}
46-
47-
try {
48-
$callback($this->channel);
49-
} catch (\Throwable) {
50-
// createChannel() might throw, in that case set the channel to `null` first.
51-
$this->channel = null;
52-
// try creating a new connection once, if this still fails, throw the error
53-
$this->channel = $createChannel();
54-
$callback($this->channel);
55-
}
11+
return AMQPSwooleConnection::class;
5612
}
5713
}

0 commit comments

Comments
 (0)