Skip to content

Commit 92916aa

Browse files
committed
chore: add workerstop
1 parent eed8b7c commit 92916aa

File tree

11 files changed

+343
-240
lines changed

11 files changed

+343
-240
lines changed

src/Queue/Adapter/Swoole.php

Lines changed: 41 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,24 @@
33
namespace Utopia\Queue\Adapter;
44

55
use Swoole\Constant;
6+
use Swoole\Process;
67
use Swoole\Process\Pool;
7-
use Utopia\CLI\Console;
8+
9+
use function Swoole\Coroutine\go;
10+
use Utopia\Console;
811
use Utopia\Queue\Adapter;
912
use Utopia\Queue\Consumer;
1013

1114
class Swoole extends Adapter
1215
{
1316
protected Pool $pool;
1417

15-
/** @var callable */
16-
private $onStop;
17-
18-
public function __construct(Consumer $consumer, int $workerNum, string $queue, string $namespace = 'utopia-queue')
19-
{
18+
public function __construct(
19+
Consumer $consumer,
20+
int $workerNum,
21+
string $queue,
22+
string $namespace = "utopia-queue",
23+
) {
2024
parent::__construct($workerNum, $queue, $namespace);
2125

2226
$this->consumer = $consumer;
@@ -25,36 +29,15 @@ public function __construct(Consumer $consumer, int $workerNum, string $queue, s
2529

2630
public function start(): self
2731
{
28-
$this->pool->set(['enable_coroutine' => true]);
29-
30-
// Register signal handlers in the main process before starting pool
31-
if (extension_loaded('pcntl')) {
32-
pcntl_signal(SIGTERM, function () {
33-
Console::info("[Swoole] Received SIGTERM, initiating graceful shutdown...");
34-
$this->stop();
35-
});
36-
37-
pcntl_signal(SIGINT, function () {
38-
Console::info("[Swoole] Received SIGINT, initiating graceful shutdown...");
39-
$this->stop();
40-
});
41-
42-
// Enable async signals
43-
pcntl_async_signals(true);
44-
} else {
45-
Console::warning("[Swoole] pcntl extension is not loaded, worker will not shutdown gracefully.");
46-
}
32+
// Enable coroutine hooks for Redis and other extensions
33+
$this->pool->set(["enable_coroutine" => true]);
4734

4835
$this->pool->start();
4936
return $this;
5037
}
5138

5239
public function stop(): self
5340
{
54-
if ($this->onStop) {
55-
call_user_func($this->onStop);
56-
}
57-
5841
Console::info("[Swoole] Shutting down process pool...");
5942
$this->pool->shutdown();
6043
Console::success("[Swoole] Process pool stopped.");
@@ -63,33 +46,42 @@ public function stop(): self
6346

6447
public function workerStart(callable $callback): self
6548
{
66-
$this->pool->on(Constant::EVENT_WORKER_START, function (Pool $pool, string $workerId) use ($callback) {
67-
// Register signal handlers in each worker process for graceful shutdown
68-
if (extension_loaded('pcntl')) {
69-
pcntl_signal(SIGTERM, function () use ($workerId) {
70-
Console::info("[Worker] Worker {$workerId} received SIGTERM, closing consumer...");
71-
$this->consumer->close();
72-
});
73-
74-
pcntl_signal(SIGINT, function () use ($workerId) {
75-
Console::info("[Worker] Worker {$workerId} received SIGINT, closing consumer...");
76-
$this->consumer->close();
77-
});
78-
79-
pcntl_async_signals(true);
80-
}
81-
82-
call_user_func($callback, $workerId);
49+
$this->pool->on(Constant::EVENT_WORKER_START, function (
50+
Pool $pool,
51+
string $workerId,
52+
) use ($callback) {
53+
// Register signal handlers for graceful shutdown
54+
Process::signal(SIGTERM, function () use ($workerId) {
55+
Console::info(
56+
"[Swoole] Worker {$workerId} received SIGTERM, stopping consumer...",
57+
);
58+
$this->consumer->close();
59+
});
60+
61+
Process::signal(SIGINT, function () use ($workerId) {
62+
Console::info(
63+
"[Swoole] Worker {$workerId} received SIGINT, stopping consumer...",
64+
);
65+
$this->consumer->close();
66+
});
67+
68+
// Run consume loop in a coroutine to allow event loop to process signals
69+
// The coroutine container waits for all child coroutines before worker exits
70+
go(function () use ($callback, $workerId) {
71+
\call_user_func($callback, $workerId);
72+
});
8373
});
8474

8575
return $this;
8676
}
8777

8878
public function workerStop(callable $callback): self
8979
{
90-
$this->onStop = $callback;
91-
$this->pool->on(Constant::EVENT_WORKER_STOP, function (Pool $pool, string $workerId) use ($callback) {
92-
call_user_func($callback, $workerId);
80+
$this->pool->on(Constant::EVENT_WORKER_STOP, function (
81+
Pool $pool,
82+
string $workerId,
83+
) use ($callback) {
84+
\call_user_func($callback, $workerId);
9385
});
9486

9587
return $this;

src/Queue/Broker/Pool.php

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,26 +30,36 @@ public function getQueueSize(Queue $queue, bool $failedJobs = false): int
3030
return $this->delegatePublish(__FUNCTION__, \func_get_args());
3131
}
3232

33-
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void
34-
{
33+
public function consume(
34+
Queue $queue,
35+
callable $messageCallback,
36+
callable $successCallback,
37+
callable $errorCallback,
38+
): void {
3539
$this->delegateConsumer(__FUNCTION__, \func_get_args());
3640
}
3741

3842
public function close(): void
3943
{
40-
$this->delegateConsumer(__FUNCTION__, \func_get_args());
44+
// TODO: Implement closing all connections in the pool
4145
}
4246

4347
protected function delegatePublish(string $method, array $args): mixed
4448
{
45-
return $this->publisher?->use(function (Publisher $adapter) use ($method, $args) {
49+
return $this->publisher?->use(function (Publisher $adapter) use (
50+
$method,
51+
$args,
52+
) {
4653
return $adapter->$method(...$args);
4754
});
4855
}
4956

5057
protected function delegateConsumer(string $method, array $args): mixed
5158
{
52-
return $this->consumer?->use(function (Consumer $adapter) use ($method, $args) {
59+
return $this->consumer?->use(function (Consumer $adapter) use (
60+
$method,
61+
$args,
62+
) {
5363
return $adapter->$method(...$args);
5464
});
5565
}

src/Queue/Broker/Redis.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
9292
public function close(): void
9393
{
9494
$this->closed = true;
95+
$this->connection->close();
9596
}
9697

9798
public function enqueue(Queue $queue, array $payload): bool

src/Queue/Connection.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ public function setArray(string $key, array $value): bool;
2525
public function increment(string $key): int;
2626
public function decrement(string $key): int;
2727
public function ping(): bool;
28+
public function close(): void;
2829
}

src/Queue/Connection/Redis.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,12 @@ public function ping(): bool
169169
}
170170
}
171171

172+
public function close(): void
173+
{
174+
$this->redis?->close();
175+
$this->redis = null;
176+
}
177+
172178
protected function getRedis(): \Redis
173179
{
174180
if ($this->redis) {

src/Queue/Connection/RedisCluster.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ public function ping(): bool
166166
}
167167
}
168168

169+
public function close(): void
170+
{
171+
$this->redis?->close();
172+
$this->redis = null;
173+
}
174+
169175
protected function getRedis(): \RedisCluster
170176
{
171177
if ($this->redis) {

0 commit comments

Comments
 (0)