Skip to content

Commit 9175652

Browse files
Merge pull request #17 from utopia-php/feat-retry
Implement Retry into Queue Lib
2 parents 0120bd2 + f42207a commit 9175652

File tree

9 files changed

+333
-204
lines changed

9 files changed

+333
-204
lines changed

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"psr-4": {"Utopia\\Queue\\": "src/Queue"}
1616
},
1717
"autoload-dev": {
18-
"psr-4": {"Utopia\\Tests\\": "tests/Database"}
18+
"psr-4": {"Tests\\E2E\\": "tests/Queue/E2E"}
1919
},
2020
"scripts":{
2121
"test": "phpunit",

composer.lock

Lines changed: 89 additions & 83 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

phpunit.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
>
1111
<testsuites>
1212
<testsuite name="E2E">
13-
<directory>./tests/Queue/e2e/</directory>
13+
<directory>./tests/Queue/E2E/Adapter</directory>
1414
</testsuite>
1515
</testsuites>
1616
</phpunit>

src/Queue/Client.php

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,55 @@ public function enqueue(array $payload): bool
2626
return $this->connection->leftPushArray("{$this->namespace}.queue.{$this->queue}", $payload);
2727
}
2828

29+
/**
30+
* Take all jobs from the failed queue and re-enqueue them.
31+
* @param int|null $limit The amount of jobs to retry
32+
*/
33+
public function retry(int $limit = null): void
34+
{
35+
$start = \time();
36+
$processed = 0;
37+
38+
while (true) {
39+
$pid = $this->connection->rightPop("{$this->namespace}.failed.{$this->queue}", 5);
40+
41+
// No more jobs to retry
42+
if ($pid === false) {
43+
break;
44+
}
45+
46+
$job = $this->getJob($pid);
47+
48+
// Job doesn't exist
49+
if ($job === false) {
50+
break;
51+
}
52+
53+
// Job was already retried
54+
if ($job->getTimestamp() >= $start) {
55+
break;
56+
}
57+
58+
// We're reached the max amount of jobs to retry
59+
if ($limit !== null && $processed >= $limit) {
60+
break;
61+
}
62+
63+
$this->enqueue($job->getPayload());
64+
$processed++;
65+
}
66+
}
67+
2968
public function getJob(string $pid): Message|false
3069
{
31-
$job = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}");
70+
$value = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}");
3271

33-
if ($job === false) {
72+
if ($value === false) {
3473
return false;
3574
}
3675

76+
$job = json_decode($value, true);
77+
3778
return new Message($job);
3879
}
3980

@@ -47,22 +88,22 @@ public function getQueueSize(): int
4788
return $this->connection->listSize("{$this->namespace}.queue.{$this->queue}");
4889
}
4990

50-
public function sumTotalJobs(): int
91+
public function countTotalJobs(): int
5192
{
5293
return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.total") ?? 0);
5394
}
5495

55-
public function sumSuccessfulJobs(): int
96+
public function countSuccessfulJobs(): int
5697
{
5798
return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.success") ?? 0);
5899
}
59100

60-
public function sumFailedJobs(): int
101+
public function countFailedJobs(): int
61102
{
62103
return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.failed") ?? 0);
63104
}
64105

65-
public function sumProcessingJobs(): int
106+
public function countProcessingJobs(): int
66107
{
67108
return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.processing") ?? 0);
68109
}

src/Queue/Connection/Redis.php

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
namespace Utopia\Queue\Connection;
44

55
use Utopia\Queue\Connection;
6-
use Utopia\Queue\Message;
76

87
class Redis implements Connection
98
{
@@ -69,7 +68,7 @@ public function rightPopArray(string $queue, int $timeout): array|false
6968
return false;
7069
}
7170

72-
return json_decode($response, true);
71+
return json_decode($response, true) ?? false;
7372
}
7473

7574
public function rightPop(string $queue, int $timeout): string|false
@@ -91,7 +90,7 @@ public function leftPopArray(string $queue, int $timeout): array|false
9190
return false;
9291
}
9392

94-
return json_decode($response[1], true);
93+
return json_decode($response[1], true) ?? false;
9594
}
9695

9796
public function leftPop(string $queue, int $timeout): string|false
@@ -152,11 +151,11 @@ public function decrement(string $key): int
152151

153152
public function listRange(string $key, int $total, int $offset): array
154153
{
155-
$start = $offset - 1;
156-
$end = ($total + $offset) -1;
157-
$results = $this->getRedis()->lrange($key, $start, $end);
154+
$start = $offset;
155+
$end = $start + $total - 1;
156+
$results = $this->getRedis()->lRange($key, $start, $end);
158157

159-
return array_map(fn (array $job) => new Message($job), $results);
158+
return $results;
160159
}
161160

162161
public function ping(): bool

tests/Queue/E2E/Adapter/Base.php

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
<?php
2+
3+
namespace Tests\E2E\Adapter;
4+
5+
use PHPUnit\Framework\TestCase;
6+
use Utopia\Queue\Client;
7+
8+
use function Co\run;
9+
10+
abstract class Base extends TestCase
11+
{
12+
protected array $payloads;
13+
14+
public function setUp(): void
15+
{
16+
$this->payloads = [];
17+
$this->payloads[] = [
18+
'type' => 'test_string',
19+
'value' => 'lorem ipsum'
20+
];
21+
$this->payloads[] = [
22+
'type' => 'test_number',
23+
'value' => 123
24+
];
25+
$this->payloads[] = [
26+
'type' => 'test_number',
27+
'value' => 123.456
28+
];
29+
$this->payloads[] = [
30+
'type' => 'test_bool',
31+
'value' => true
32+
];
33+
$this->payloads[] = [
34+
'type' => 'test_null',
35+
'value' => null
36+
];
37+
$this->payloads[] = [
38+
'type' => 'test_array',
39+
'value' => [
40+
1,
41+
2,
42+
3
43+
]
44+
];
45+
$this->payloads[] = [
46+
'type' => 'test_assoc',
47+
'value' => [
48+
'string' => 'ipsum',
49+
'number' => 123,
50+
'bool' => true,
51+
'null' => null
52+
]
53+
];
54+
}
55+
56+
/**
57+
* @return Client
58+
*/
59+
abstract protected function getClient(): Client;
60+
61+
public function testEvents(): void
62+
{
63+
$client = $this->getClient();
64+
$client->resetStats();
65+
66+
foreach ($this->payloads as $payload) {
67+
$this->assertTrue($client->enqueue($payload));
68+
}
69+
70+
sleep(1);
71+
72+
$this->assertEquals(7, $client->countTotalJobs());
73+
$this->assertEquals(0, $client->getQueueSize());
74+
$this->assertEquals(0, $client->countProcessingJobs());
75+
$this->assertEquals(0, $client->countFailedJobs());
76+
$this->assertEquals(7, $client->countSuccessfulJobs());
77+
}
78+
79+
protected function testConcurrency(): void
80+
{
81+
run(function () {
82+
$client = $this->getClient();
83+
go(function () use ($client) {
84+
$client->resetStats();
85+
86+
foreach ($this->payloads as $payload) {
87+
$this->assertTrue($client->enqueue($payload));
88+
}
89+
90+
sleep(1);
91+
92+
$this->assertEquals(7, $client->countTotalJobs());
93+
$this->assertEquals(0, $client->countProcessingJobs());
94+
$this->assertEquals(0, $client->countFailedJobs());
95+
$this->assertEquals(7, $client->countSuccessfulJobs());
96+
});
97+
});
98+
}
99+
100+
/**
101+
* @depends testEvents
102+
*/
103+
public function testRetry(): void
104+
{
105+
$client = $this->getClient();
106+
$client->resetStats();
107+
108+
$client->enqueue([
109+
'type' => 'test_exception',
110+
'id' => 1
111+
]);
112+
$client->enqueue([
113+
'type' => 'test_exception',
114+
'id' => 2
115+
]);
116+
$client->enqueue([
117+
'type' => 'test_exception',
118+
'id' => 3
119+
]);
120+
$client->enqueue([
121+
'type' => 'test_exception',
122+
'id' => 4
123+
]);
124+
125+
sleep(1);
126+
127+
$this->assertEquals(4, $client->countTotalJobs());
128+
$this->assertEquals(0, $client->countProcessingJobs());
129+
$this->assertEquals(4, $client->countFailedJobs());
130+
$this->assertEquals(0, $client->countSuccessfulJobs());
131+
132+
$client->resetStats();
133+
134+
$client->retry();
135+
136+
sleep(1);
137+
138+
// Retry will retry ALL failed jobs regardless of if they are still tracked in stats
139+
$this->assertEquals(4, $client->countTotalJobs());
140+
$this->assertEquals(0, $client->countProcessingJobs());
141+
$this->assertEquals(4, $client->countFailedJobs());
142+
$this->assertEquals(0, $client->countSuccessfulJobs());
143+
144+
$client->resetStats();
145+
146+
$client->retry(2);
147+
148+
sleep(1);
149+
150+
$this->assertEquals(2, $client->countTotalJobs());
151+
$this->assertEquals(0, $client->countProcessingJobs());
152+
$this->assertEquals(2, $client->countFailedJobs());
153+
$this->assertEquals(0, $client->countSuccessfulJobs());
154+
}
155+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
namespace Tests\E2E\Adapter;
4+
5+
use Utopia\Queue\Client;
6+
use Utopia\Queue\Connection\Redis;
7+
8+
class SwooleTest extends Base
9+
{
10+
protected function getClient(): Client
11+
{
12+
$connection = new Redis('redis', 6379);
13+
$client = new Client('swoole', $connection);
14+
15+
return $client;
16+
}
17+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?php
2+
3+
namespace Tests\E2E\Adapter;
4+
5+
use Utopia\Queue\Client;
6+
use Utopia\Queue\Connection\Redis;
7+
8+
class WorkermanTest extends Base
9+
{
10+
protected function getClient(): Client
11+
{
12+
$connection = new Redis('redis', 6379);
13+
$client = new Client('workerman', $connection);
14+
15+
return $client;
16+
}
17+
}

0 commit comments

Comments
 (0)