Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"psr-4": {"Utopia\\Queue\\": "src/Queue"}
},
"autoload-dev": {
"psr-4": {"Utopia\\Tests\\": "tests/Database"}
"psr-4": {"Tests\\E2E\\": "tests/Queue/E2E"}
},
"scripts":{
"test": "phpunit",
Expand Down
172 changes: 89 additions & 83 deletions composer.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion phpunit.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
>
<testsuites>
<testsuite name="E2E">
<directory>./tests/Queue/e2e/</directory>
<directory>./tests/Queue/E2E/Adapter</directory>
</testsuite>
</testsuites>
</phpunit>
53 changes: 47 additions & 6 deletions src/Queue/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,55 @@ public function enqueue(array $payload): bool
return $this->connection->leftPushArray("{$this->namespace}.queue.{$this->queue}", $payload);
}

/**
* Take all jobs from the failed queue and re-enqueue them.
* @param int|null $limit The amount of jobs to retry
*/
public function retry(int $limit = null): void
{
$start = \time();
$processed = 0;

while (true) {
$pid = $this->connection->rightPop("{$this->namespace}.failed.{$this->queue}", 5);

// No more jobs to retry
if ($pid === false) {
break;
}

$job = $this->getJob($pid);

// Job doesn't exist
if ($job === false) {
break;
}

// Job was already retried
if ($job->getTimestamp() >= $start) {
break;
}

// We're reached the max amount of jobs to retry
if ($limit !== null && $processed >= $limit) {
break;
}

$this->enqueue($job->getPayload());
$processed++;
}
}

public function getJob(string $pid): Message|false
{
$job = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}");
$value = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}");

if ($job === false) {
if ($value === false) {
return false;
}

$job = json_decode($value, true);

return new Message($job);
}

Expand All @@ -47,22 +88,22 @@ public function getQueueSize(): int
return $this->connection->listSize("{$this->namespace}.queue.{$this->queue}");
}

public function sumTotalJobs(): int
public function countTotalJobs(): int
{
return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.total") ?? 0);
}

public function sumSuccessfulJobs(): int
public function countSuccessfulJobs(): int
{
return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.success") ?? 0);
}

public function sumFailedJobs(): int
public function countFailedJobs(): int
{
return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.failed") ?? 0);
}

public function sumProcessingJobs(): int
public function countProcessingJobs(): int
{
return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.processing") ?? 0);
}
Expand Down
13 changes: 6 additions & 7 deletions src/Queue/Connection/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
namespace Utopia\Queue\Connection;

use Utopia\Queue\Connection;
use Utopia\Queue\Message;

class Redis implements Connection
{
Expand Down Expand Up @@ -69,7 +68,7 @@ public function rightPopArray(string $queue, int $timeout): array|false
return false;
}

return json_decode($response, true);
return json_decode($response, true) ?? false;
}

public function rightPop(string $queue, int $timeout): string|false
Expand All @@ -91,7 +90,7 @@ public function leftPopArray(string $queue, int $timeout): array|false
return false;
}

return json_decode($response[1], true);
return json_decode($response[1], true) ?? false;
}

public function leftPop(string $queue, int $timeout): string|false
Expand Down Expand Up @@ -152,11 +151,11 @@ public function decrement(string $key): int

public function listRange(string $key, int $total, int $offset): array
{
$start = $offset - 1;
$end = ($total + $offset) -1;
$results = $this->getRedis()->lrange($key, $start, $end);
$start = $offset;
$end = $start + $total - 1;
$results = $this->getRedis()->lRange($key, $start, $end);

return array_map(fn (array $job) => new Message($job), $results);
return $results;
}

public function ping(): bool
Expand Down
155 changes: 155 additions & 0 deletions tests/Queue/E2E/Adapter/Base.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
<?php

namespace Tests\E2E\Adapter;

use PHPUnit\Framework\TestCase;
use Utopia\Queue\Client;

use function Co\run;

abstract class Base extends TestCase
{
protected array $payloads;

public function setUp(): void
{
$this->payloads = [];
$this->payloads[] = [
'type' => 'test_string',
'value' => 'lorem ipsum'
];
$this->payloads[] = [
'type' => 'test_number',
'value' => 123
];
$this->payloads[] = [
'type' => 'test_number',
'value' => 123.456
];
$this->payloads[] = [
'type' => 'test_bool',
'value' => true
];
$this->payloads[] = [
'type' => 'test_null',
'value' => null
];
$this->payloads[] = [
'type' => 'test_array',
'value' => [
1,
2,
3
]
];
$this->payloads[] = [
'type' => 'test_assoc',
'value' => [
'string' => 'ipsum',
'number' => 123,
'bool' => true,
'null' => null
]
];
}

/**
* @return Client
*/
abstract protected function getClient(): Client;

public function testEvents(): void
{
$client = $this->getClient();
$client->resetStats();

foreach ($this->payloads as $payload) {
$this->assertTrue($client->enqueue($payload));
}

sleep(1);

$this->assertEquals(7, $client->countTotalJobs());
$this->assertEquals(0, $client->getQueueSize());
$this->assertEquals(0, $client->countProcessingJobs());
$this->assertEquals(0, $client->countFailedJobs());
$this->assertEquals(7, $client->countSuccessfulJobs());
}

protected function testConcurrency(): void
{
run(function () {
$client = $this->getClient();
go(function () use ($client) {
$client->resetStats();

foreach ($this->payloads as $payload) {
$this->assertTrue($client->enqueue($payload));
}

sleep(1);

$this->assertEquals(7, $client->countTotalJobs());
$this->assertEquals(0, $client->countProcessingJobs());
$this->assertEquals(0, $client->countFailedJobs());
$this->assertEquals(7, $client->countSuccessfulJobs());
});
});
}

/**
* @depends testEvents
*/
public function testRetry(): void
{
$client = $this->getClient();
$client->resetStats();

$client->enqueue([
'type' => 'test_exception',
'id' => 1
]);
$client->enqueue([
'type' => 'test_exception',
'id' => 2
]);
$client->enqueue([
'type' => 'test_exception',
'id' => 3
]);
$client->enqueue([
'type' => 'test_exception',
'id' => 4
]);

sleep(1);

$this->assertEquals(4, $client->countTotalJobs());
$this->assertEquals(0, $client->countProcessingJobs());
$this->assertEquals(4, $client->countFailedJobs());
$this->assertEquals(0, $client->countSuccessfulJobs());

$client->resetStats();

$client->retry();

sleep(1);

// Retry will retry ALL failed jobs regardless of if they are still tracked in stats
$this->assertEquals(4, $client->countTotalJobs());
$this->assertEquals(0, $client->countProcessingJobs());
$this->assertEquals(4, $client->countFailedJobs());
$this->assertEquals(0, $client->countSuccessfulJobs());

$client->resetStats();

$client->retry(2);

sleep(1);

$this->assertEquals(2, $client->countTotalJobs());
$this->assertEquals(0, $client->countProcessingJobs());
$this->assertEquals(2, $client->countFailedJobs());
$this->assertEquals(0, $client->countSuccessfulJobs());
}
}
17 changes: 17 additions & 0 deletions tests/Queue/E2E/Adapter/SwooleTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Tests\E2E\Adapter;

use Utopia\Queue\Client;
use Utopia\Queue\Connection\Redis;

class SwooleTest extends Base
{
protected function getClient(): Client
{
$connection = new Redis('redis', 6379);
$client = new Client('swoole', $connection);

return $client;
}
}
17 changes: 17 additions & 0 deletions tests/Queue/E2E/Adapter/WorkermanTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Tests\E2E\Adapter;

use Utopia\Queue\Client;
use Utopia\Queue\Connection\Redis;

class WorkermanTest extends Base
{
protected function getClient(): Client
{
$connection = new Redis('redis', 6379);
$client = new Client('workerman', $connection);

return $client;
}
}
Loading