diff --git a/composer.json b/composer.json index 43ca961..3d2dcb3 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,7 @@ "psr-4": {"Utopia\\Queue\\": "src/Queue"} }, "autoload-dev": { - "psr-4": {"Utopia\\Tests\\": "tests/Database"} + "psr-4": {"Tests\\E2E\\": "tests/Queue/E2E"} }, "scripts":{ "test": "phpunit", diff --git a/composer.lock b/composer.lock index df6c65f..50cd1b6 100644 --- a/composer.lock +++ b/composer.lock @@ -57,25 +57,26 @@ }, { "name": "utopia-php/framework", - "version": "0.28.1", + "version": "0.32.0", "source": { "type": "git", "url": "https://github.com/utopia-php/framework.git", - "reference": "7f22c556fc5991e54e5811a68fb39809b21bda55" + "reference": "ad6f7e6d6b38cf5bed4e3af9a1394c59d4bb9225" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/framework/zipball/7f22c556fc5991e54e5811a68fb39809b21bda55", - "reference": "7f22c556fc5991e54e5811a68fb39809b21bda55", + "url": "https://api.github.com/repos/utopia-php/framework/zipball/ad6f7e6d6b38cf5bed4e3af9a1394c59d4bb9225", + "reference": "ad6f7e6d6b38cf5bed4e3af9a1394c59d4bb9225", "shasum": "" }, "require": { - "php": ">=8.0.0" + "php": ">=8.0" }, "require-dev": { "laravel/pint": "^1.2", - "phpunit/phpunit": "^9.5.25", - "vimeo/psalm": "4.27.0" + "phpbench/phpbench": "^1.2", + "phpstan/phpstan": "^1.10", + "phpunit/phpunit": "^9.5.25" }, "type": "library", "autoload": { @@ -95,9 +96,9 @@ ], "support": { "issues": "https://github.com/utopia-php/framework/issues", - "source": "https://github.com/utopia-php/framework/tree/0.28.1" + "source": "https://github.com/utopia-php/framework/tree/0.32.0" }, - "time": "2023-03-02T08:16:01+00:00" + "time": "2023-12-26T14:18:36+00:00" } ], "packages-dev": [ @@ -239,16 +240,16 @@ }, { "name": "myclabs/deep-copy", - "version": "1.11.0", + "version": "1.11.1", "source": { "type": "git", "url": "https://github.com/myclabs/DeepCopy.git", - "reference": "14daed4296fae74d9e3201d2c4925d1acb7aa614" + "reference": "7284c22080590fb39f2ffa3e9057f10a4ddd0e0c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/14daed4296fae74d9e3201d2c4925d1acb7aa614", - "reference": "14daed4296fae74d9e3201d2c4925d1acb7aa614", + "url": "https://api.github.com/repos/myclabs/DeepCopy/zipball/7284c22080590fb39f2ffa3e9057f10a4ddd0e0c", + "reference": "7284c22080590fb39f2ffa3e9057f10a4ddd0e0c", "shasum": "" }, "require": { @@ -286,7 +287,7 @@ ], "support": { "issues": "https://github.com/myclabs/DeepCopy/issues", - "source": "https://github.com/myclabs/DeepCopy/tree/1.11.0" + "source": "https://github.com/myclabs/DeepCopy/tree/1.11.1" }, "funding": [ { @@ -294,20 +295,20 @@ "type": "tidelift" } ], - "time": "2022-03-03T13:19:32+00:00" + "time": "2023-03-08T13:26:56+00:00" }, { "name": "nikic/php-parser", - "version": "v4.15.3", + "version": "v4.18.0", "source": { "type": "git", "url": "https://github.com/nikic/PHP-Parser.git", - "reference": "570e980a201d8ed0236b0a62ddf2c9cbb2034039" + "reference": "1bcbb2179f97633e98bbbc87044ee2611c7d7999" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/570e980a201d8ed0236b0a62ddf2c9cbb2034039", - "reference": "570e980a201d8ed0236b0a62ddf2c9cbb2034039", + "url": "https://api.github.com/repos/nikic/PHP-Parser/zipball/1bcbb2179f97633e98bbbc87044ee2611c7d7999", + "reference": "1bcbb2179f97633e98bbbc87044ee2611c7d7999", "shasum": "" }, "require": { @@ -348,9 +349,9 @@ ], "support": { "issues": "https://github.com/nikic/PHP-Parser/issues", - "source": "https://github.com/nikic/PHP-Parser/tree/v4.15.3" + "source": "https://github.com/nikic/PHP-Parser/tree/v4.18.0" }, - "time": "2023-01-16T22:05:37+00:00" + "time": "2023-12-10T21:03:43+00:00" }, { "name": "phar-io/manifest", @@ -465,16 +466,16 @@ }, { "name": "phpstan/phpstan", - "version": "1.10.3", + "version": "1.10.50", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "5419375b5891add97dc74be71e6c1c34baaddf64" + "reference": "06a98513ac72c03e8366b5a0cb00750b487032e4" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/5419375b5891add97dc74be71e6c1c34baaddf64", - "reference": "5419375b5891add97dc74be71e6c1c34baaddf64", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/06a98513ac72c03e8366b5a0cb00750b487032e4", + "reference": "06a98513ac72c03e8366b5a0cb00750b487032e4", "shasum": "" }, "require": { @@ -503,8 +504,11 @@ "static analysis" ], "support": { + "docs": "https://phpstan.org/user-guide/getting-started", + "forum": "https://github.com/phpstan/phpstan/discussions", "issues": "https://github.com/phpstan/phpstan/issues", - "source": "https://github.com/phpstan/phpstan/tree/1.10.3" + "security": "https://github.com/phpstan/phpstan/security/policy", + "source": "https://github.com/phpstan/phpstan-src" }, "funding": [ { @@ -520,27 +524,27 @@ "type": "tidelift" } ], - "time": "2023-02-25T14:47:13+00:00" + "time": "2023-12-13T10:59:42+00:00" }, { "name": "phpunit/php-code-coverage", - "version": "9.2.25", + "version": "9.2.30", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/php-code-coverage.git", - "reference": "0e2b40518197a8c0d4b08bc34dfff1c99c508954" + "reference": "ca2bd87d2f9215904682a9cb9bb37dda98e76089" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/0e2b40518197a8c0d4b08bc34dfff1c99c508954", - "reference": "0e2b40518197a8c0d4b08bc34dfff1c99c508954", + "url": "https://api.github.com/repos/sebastianbergmann/php-code-coverage/zipball/ca2bd87d2f9215904682a9cb9bb37dda98e76089", + "reference": "ca2bd87d2f9215904682a9cb9bb37dda98e76089", "shasum": "" }, "require": { "ext-dom": "*", "ext-libxml": "*", "ext-xmlwriter": "*", - "nikic/php-parser": "^4.15", + "nikic/php-parser": "^4.18 || ^5.0", "php": ">=7.3", "phpunit/php-file-iterator": "^3.0.3", "phpunit/php-text-template": "^2.0.2", @@ -555,8 +559,8 @@ "phpunit/phpunit": "^9.3" }, "suggest": { - "ext-pcov": "*", - "ext-xdebug": "*" + "ext-pcov": "PHP extension that provides line coverage", + "ext-xdebug": "PHP extension that provides line coverage as well as branch and path coverage" }, "type": "library", "extra": { @@ -589,7 +593,8 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/php-code-coverage/issues", - "source": "https://github.com/sebastianbergmann/php-code-coverage/tree/9.2.25" + "security": "https://github.com/sebastianbergmann/php-code-coverage/security/policy", + "source": "https://github.com/sebastianbergmann/php-code-coverage/tree/9.2.30" }, "funding": [ { @@ -597,7 +602,7 @@ "type": "github" } ], - "time": "2023-02-25T05:32:00+00:00" + "time": "2023-12-22T06:47:57+00:00" }, { "name": "phpunit/php-file-iterator", @@ -842,16 +847,16 @@ }, { "name": "phpunit/phpunit", - "version": "9.6.4", + "version": "9.6.15", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/phpunit.git", - "reference": "9125ee085b6d95e78277dc07aa1f46f9e0607b8d" + "reference": "05017b80304e0eb3f31d90194a563fd53a6021f1" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/9125ee085b6d95e78277dc07aa1f46f9e0607b8d", - "reference": "9125ee085b6d95e78277dc07aa1f46f9e0607b8d", + "url": "https://api.github.com/repos/sebastianbergmann/phpunit/zipball/05017b80304e0eb3f31d90194a563fd53a6021f1", + "reference": "05017b80304e0eb3f31d90194a563fd53a6021f1", "shasum": "" }, "require": { @@ -866,7 +871,7 @@ "phar-io/manifest": "^2.0.3", "phar-io/version": "^3.0.2", "php": ">=7.3", - "phpunit/php-code-coverage": "^9.2.13", + "phpunit/php-code-coverage": "^9.2.28", "phpunit/php-file-iterator": "^3.0.5", "phpunit/php-invoker": "^3.1.1", "phpunit/php-text-template": "^2.0.3", @@ -884,8 +889,8 @@ "sebastian/version": "^3.0.2" }, "suggest": { - "ext-soap": "*", - "ext-xdebug": "*" + "ext-soap": "To be able to generate mocks based on WSDL files", + "ext-xdebug": "PHP extension that provides line coverage as well as branch and path coverage" }, "bin": [ "phpunit" @@ -924,7 +929,8 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/phpunit/issues", - "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.4" + "security": "https://github.com/sebastianbergmann/phpunit/security/policy", + "source": "https://github.com/sebastianbergmann/phpunit/tree/9.6.15" }, "funding": [ { @@ -940,7 +946,7 @@ "type": "tidelift" } ], - "time": "2023-02-27T13:06:37+00:00" + "time": "2023-12-01T16:55:19+00:00" }, { "name": "sebastian/cli-parser", @@ -1185,20 +1191,20 @@ }, { "name": "sebastian/complexity", - "version": "2.0.2", + "version": "2.0.3", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/complexity.git", - "reference": "739b35e53379900cc9ac327b2147867b8b6efd88" + "reference": "25f207c40d62b8b7aa32f5ab026c53561964053a" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/complexity/zipball/739b35e53379900cc9ac327b2147867b8b6efd88", - "reference": "739b35e53379900cc9ac327b2147867b8b6efd88", + "url": "https://api.github.com/repos/sebastianbergmann/complexity/zipball/25f207c40d62b8b7aa32f5ab026c53561964053a", + "reference": "25f207c40d62b8b7aa32f5ab026c53561964053a", "shasum": "" }, "require": { - "nikic/php-parser": "^4.7", + "nikic/php-parser": "^4.18 || ^5.0", "php": ">=7.3" }, "require-dev": { @@ -1230,7 +1236,7 @@ "homepage": "https://github.com/sebastianbergmann/complexity", "support": { "issues": "https://github.com/sebastianbergmann/complexity/issues", - "source": "https://github.com/sebastianbergmann/complexity/tree/2.0.2" + "source": "https://github.com/sebastianbergmann/complexity/tree/2.0.3" }, "funding": [ { @@ -1238,20 +1244,20 @@ "type": "github" } ], - "time": "2020-10-26T15:52:27+00:00" + "time": "2023-12-22T06:19:30+00:00" }, { "name": "sebastian/diff", - "version": "4.0.4", + "version": "4.0.5", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/diff.git", - "reference": "3461e3fccc7cfdfc2720be910d3bd73c69be590d" + "reference": "74be17022044ebaaecfdf0c5cd504fc9cd5a7131" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/diff/zipball/3461e3fccc7cfdfc2720be910d3bd73c69be590d", - "reference": "3461e3fccc7cfdfc2720be910d3bd73c69be590d", + "url": "https://api.github.com/repos/sebastianbergmann/diff/zipball/74be17022044ebaaecfdf0c5cd504fc9cd5a7131", + "reference": "74be17022044ebaaecfdf0c5cd504fc9cd5a7131", "shasum": "" }, "require": { @@ -1296,7 +1302,7 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/diff/issues", - "source": "https://github.com/sebastianbergmann/diff/tree/4.0.4" + "source": "https://github.com/sebastianbergmann/diff/tree/4.0.5" }, "funding": [ { @@ -1304,7 +1310,7 @@ "type": "github" } ], - "time": "2020-10-26T13:10:38+00:00" + "time": "2023-05-07T05:35:17+00:00" }, { "name": "sebastian/environment", @@ -1448,16 +1454,16 @@ }, { "name": "sebastian/global-state", - "version": "5.0.5", + "version": "5.0.6", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/global-state.git", - "reference": "0ca8db5a5fc9c8646244e629625ac486fa286bf2" + "reference": "bde739e7565280bda77be70044ac1047bc007e34" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/global-state/zipball/0ca8db5a5fc9c8646244e629625ac486fa286bf2", - "reference": "0ca8db5a5fc9c8646244e629625ac486fa286bf2", + "url": "https://api.github.com/repos/sebastianbergmann/global-state/zipball/bde739e7565280bda77be70044ac1047bc007e34", + "reference": "bde739e7565280bda77be70044ac1047bc007e34", "shasum": "" }, "require": { @@ -1500,7 +1506,7 @@ ], "support": { "issues": "https://github.com/sebastianbergmann/global-state/issues", - "source": "https://github.com/sebastianbergmann/global-state/tree/5.0.5" + "source": "https://github.com/sebastianbergmann/global-state/tree/5.0.6" }, "funding": [ { @@ -1508,24 +1514,24 @@ "type": "github" } ], - "time": "2022-02-14T08:28:10+00:00" + "time": "2023-08-02T09:26:13+00:00" }, { "name": "sebastian/lines-of-code", - "version": "1.0.3", + "version": "1.0.4", "source": { "type": "git", "url": "https://github.com/sebastianbergmann/lines-of-code.git", - "reference": "c1c2e997aa3146983ed888ad08b15470a2e22ecc" + "reference": "e1e4a170560925c26d424b6a03aed157e7dcc5c5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/sebastianbergmann/lines-of-code/zipball/c1c2e997aa3146983ed888ad08b15470a2e22ecc", - "reference": "c1c2e997aa3146983ed888ad08b15470a2e22ecc", + "url": "https://api.github.com/repos/sebastianbergmann/lines-of-code/zipball/e1e4a170560925c26d424b6a03aed157e7dcc5c5", + "reference": "e1e4a170560925c26d424b6a03aed157e7dcc5c5", "shasum": "" }, "require": { - "nikic/php-parser": "^4.6", + "nikic/php-parser": "^4.18 || ^5.0", "php": ">=7.3" }, "require-dev": { @@ -1557,7 +1563,7 @@ "homepage": "https://github.com/sebastianbergmann/lines-of-code", "support": { "issues": "https://github.com/sebastianbergmann/lines-of-code/issues", - "source": "https://github.com/sebastianbergmann/lines-of-code/tree/1.0.3" + "source": "https://github.com/sebastianbergmann/lines-of-code/tree/1.0.4" }, "funding": [ { @@ -1565,7 +1571,7 @@ "type": "github" } ], - "time": "2020-11-28T06:42:11+00:00" + "time": "2023-12-22T06:20:34+00:00" }, { "name": "sebastian/object-enumerator", @@ -1950,16 +1956,16 @@ }, { "name": "theseer/tokenizer", - "version": "1.2.1", + "version": "1.2.2", "source": { "type": "git", "url": "https://github.com/theseer/tokenizer.git", - "reference": "34a41e998c2183e22995f158c581e7b5e755ab9e" + "reference": "b2ad5003ca10d4ee50a12da31de12a5774ba6b96" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/theseer/tokenizer/zipball/34a41e998c2183e22995f158c581e7b5e755ab9e", - "reference": "34a41e998c2183e22995f158c581e7b5e755ab9e", + "url": "https://api.github.com/repos/theseer/tokenizer/zipball/b2ad5003ca10d4ee50a12da31de12a5774ba6b96", + "reference": "b2ad5003ca10d4ee50a12da31de12a5774ba6b96", "shasum": "" }, "require": { @@ -1988,7 +1994,7 @@ "description": "A small library for converting tokenized PHP source code into XML and potentially other formats", "support": { "issues": "https://github.com/theseer/tokenizer/issues", - "source": "https://github.com/theseer/tokenizer/tree/1.2.1" + "source": "https://github.com/theseer/tokenizer/tree/1.2.2" }, "funding": [ { @@ -1996,20 +2002,20 @@ "type": "github" } ], - "time": "2021-07-28T10:34:58+00:00" + "time": "2023-11-20T00:12:19+00:00" }, { "name": "workerman/workerman", - "version": "v4.1.8", + "version": "v4.1.14", "source": { "type": "git", "url": "https://github.com/walkor/workerman.git", - "reference": "0df2093b09296f07f81e4cdfbe0582f3dfa183f5" + "reference": "f7c9667c7b5387c01fa9e50ee79ed931e93ee76e" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/walkor/workerman/zipball/0df2093b09296f07f81e4cdfbe0582f3dfa183f5", - "reference": "0df2093b09296f07f81e4cdfbe0582f3dfa183f5", + "url": "https://api.github.com/repos/walkor/workerman/zipball/f7c9667c7b5387c01fa9e50ee79ed931e93ee76e", + "reference": "f7c9667c7b5387c01fa9e50ee79ed931e93ee76e", "shasum": "" }, "require": { @@ -2059,7 +2065,7 @@ "type": "patreon" } ], - "time": "2023-02-19T03:27:11+00:00" + "time": "2023-08-09T03:37:45+00:00" } ], "aliases": [], @@ -2071,5 +2077,5 @@ "php": ">=8.0" }, "platform-dev": [], - "plugin-api-version": "2.2.0" + "plugin-api-version": "2.6.0" } diff --git a/phpunit.xml b/phpunit.xml index b0e7e34..1b8f40d 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -10,7 +10,7 @@ > - ./tests/Queue/e2e/ + ./tests/Queue/E2E/Adapter \ No newline at end of file diff --git a/src/Queue/Client.php b/src/Queue/Client.php index 967b6be..ac55bf8 100644 --- a/src/Queue/Client.php +++ b/src/Queue/Client.php @@ -26,14 +26,55 @@ public function enqueue(array $payload): bool return $this->connection->leftPushArray("{$this->namespace}.queue.{$this->queue}", $payload); } + /** + * Take all jobs from the failed queue and re-enqueue them. + * @param int|null $limit The amount of jobs to retry + */ + public function retry(int $limit = null): void + { + $start = \time(); + $processed = 0; + + while (true) { + $pid = $this->connection->rightPop("{$this->namespace}.failed.{$this->queue}", 5); + + // No more jobs to retry + if ($pid === false) { + break; + } + + $job = $this->getJob($pid); + + // Job doesn't exist + if ($job === false) { + break; + } + + // Job was already retried + if ($job->getTimestamp() >= $start) { + break; + } + + // We're reached the max amount of jobs to retry + if ($limit !== null && $processed >= $limit) { + break; + } + + $this->enqueue($job->getPayload()); + $processed++; + } + } + public function getJob(string $pid): Message|false { - $job = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}"); + $value = $this->connection->get("{$this->namespace}.jobs.{$this->queue}.{$pid}"); - if ($job === false) { + if ($value === false) { return false; } + $job = json_decode($value, true); + return new Message($job); } @@ -47,22 +88,22 @@ public function getQueueSize(): int return $this->connection->listSize("{$this->namespace}.queue.{$this->queue}"); } - public function sumTotalJobs(): int + public function countTotalJobs(): int { return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.total") ?? 0); } - public function sumSuccessfulJobs(): int + public function countSuccessfulJobs(): int { return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.success") ?? 0); } - public function sumFailedJobs(): int + public function countFailedJobs(): int { return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.failed") ?? 0); } - public function sumProcessingJobs(): int + public function countProcessingJobs(): int { return (int)($this->connection->get("{$this->namespace}.stats.{$this->queue}.processing") ?? 0); } diff --git a/src/Queue/Connection/Redis.php b/src/Queue/Connection/Redis.php index 68bc0ae..4536bb3 100644 --- a/src/Queue/Connection/Redis.php +++ b/src/Queue/Connection/Redis.php @@ -3,7 +3,6 @@ namespace Utopia\Queue\Connection; use Utopia\Queue\Connection; -use Utopia\Queue\Message; class Redis implements Connection { @@ -69,7 +68,7 @@ public function rightPopArray(string $queue, int $timeout): array|false return false; } - return json_decode($response, true); + return json_decode($response, true) ?? false; } public function rightPop(string $queue, int $timeout): string|false @@ -91,7 +90,7 @@ public function leftPopArray(string $queue, int $timeout): array|false return false; } - return json_decode($response[1], true); + return json_decode($response[1], true) ?? false; } public function leftPop(string $queue, int $timeout): string|false @@ -152,11 +151,11 @@ public function decrement(string $key): int public function listRange(string $key, int $total, int $offset): array { - $start = $offset - 1; - $end = ($total + $offset) -1; - $results = $this->getRedis()->lrange($key, $start, $end); + $start = $offset; + $end = $start + $total - 1; + $results = $this->getRedis()->lRange($key, $start, $end); - return array_map(fn (array $job) => new Message($job), $results); + return $results; } public function ping(): bool diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php new file mode 100644 index 0000000..ab5b6f1 --- /dev/null +++ b/tests/Queue/E2E/Adapter/Base.php @@ -0,0 +1,155 @@ +payloads = []; + $this->payloads[] = [ + 'type' => 'test_string', + 'value' => 'lorem ipsum' + ]; + $this->payloads[] = [ + 'type' => 'test_number', + 'value' => 123 + ]; + $this->payloads[] = [ + 'type' => 'test_number', + 'value' => 123.456 + ]; + $this->payloads[] = [ + 'type' => 'test_bool', + 'value' => true + ]; + $this->payloads[] = [ + 'type' => 'test_null', + 'value' => null + ]; + $this->payloads[] = [ + 'type' => 'test_array', + 'value' => [ + 1, + 2, + 3 + ] + ]; + $this->payloads[] = [ + 'type' => 'test_assoc', + 'value' => [ + 'string' => 'ipsum', + 'number' => 123, + 'bool' => true, + 'null' => null + ] + ]; + } + + /** + * @return Client + */ + abstract protected function getClient(): Client; + + public function testEvents(): void + { + $client = $this->getClient(); + $client->resetStats(); + + foreach ($this->payloads as $payload) { + $this->assertTrue($client->enqueue($payload)); + } + + sleep(1); + + $this->assertEquals(7, $client->countTotalJobs()); + $this->assertEquals(0, $client->getQueueSize()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(0, $client->countFailedJobs()); + $this->assertEquals(7, $client->countSuccessfulJobs()); + } + + protected function testConcurrency(): void + { + run(function () { + $client = $this->getClient(); + go(function () use ($client) { + $client->resetStats(); + + foreach ($this->payloads as $payload) { + $this->assertTrue($client->enqueue($payload)); + } + + sleep(1); + + $this->assertEquals(7, $client->countTotalJobs()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(0, $client->countFailedJobs()); + $this->assertEquals(7, $client->countSuccessfulJobs()); + }); + }); + } + + /** + * @depends testEvents + */ + public function testRetry(): void + { + $client = $this->getClient(); + $client->resetStats(); + + $client->enqueue([ + 'type' => 'test_exception', + 'id' => 1 + ]); + $client->enqueue([ + 'type' => 'test_exception', + 'id' => 2 + ]); + $client->enqueue([ + 'type' => 'test_exception', + 'id' => 3 + ]); + $client->enqueue([ + 'type' => 'test_exception', + 'id' => 4 + ]); + + sleep(1); + + $this->assertEquals(4, $client->countTotalJobs()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(4, $client->countFailedJobs()); + $this->assertEquals(0, $client->countSuccessfulJobs()); + + $client->resetStats(); + + $client->retry(); + + sleep(1); + + // Retry will retry ALL failed jobs regardless of if they are still tracked in stats + $this->assertEquals(4, $client->countTotalJobs()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(4, $client->countFailedJobs()); + $this->assertEquals(0, $client->countSuccessfulJobs()); + + $client->resetStats(); + + $client->retry(2); + + sleep(1); + + $this->assertEquals(2, $client->countTotalJobs()); + $this->assertEquals(0, $client->countProcessingJobs()); + $this->assertEquals(2, $client->countFailedJobs()); + $this->assertEquals(0, $client->countSuccessfulJobs()); + } +} diff --git a/tests/Queue/E2E/Adapter/SwooleTest.php b/tests/Queue/E2E/Adapter/SwooleTest.php new file mode 100644 index 0000000..e940c98 --- /dev/null +++ b/tests/Queue/E2E/Adapter/SwooleTest.php @@ -0,0 +1,17 @@ +payloads = []; - $this->payloads[] = [ - 'type' => 'test_string', - 'value' => 'lorem ipsum' - ]; - $this->payloads[] = [ - 'type' => 'test_number', - 'value' => 123 - ]; - $this->payloads[] = [ - 'type' => 'test_number', - 'value' => 123.456 - ]; - $this->payloads[] = [ - 'type' => 'test_bool', - 'value' => true - ]; - $this->payloads[] = [ - 'type' => 'test_null', - 'value' => null - ]; - $this->payloads[] = [ - 'type' => 'test_array', - 'value' => [ - 1, - 2, - 3 - ] - ]; - $this->payloads[] = [ - 'type' => 'test_assoc', - 'value' => [ - 'string' => 'ipsum', - 'number' => 123, - 'bool' => true, - 'null' => null - ] - ]; - $this->payloads[] = [ - 'type' => 'test_exception' - ]; - } - - public function testEvents(): void - { - $connection = new Redis('redis', 6379); - - $this->assertTrue($connection->ping()); - - $client = new Client('workerman', $connection); - $client->resetStats(); - - - foreach ($this->payloads as $payload) { - $this->assertTrue($client->enqueue($payload)); - } - - sleep(1); - - $this->assertEquals(8, $client->sumTotalJobs()); - $this->assertEquals(0, $client->getQueueSize()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(1, $client->sumFailedJobs()); - $this->assertEquals(7, $client->sumSuccessfulJobs()); - } - - public function testSwoole(): void - { - $connection = new Redis('redis', 6379); - - run(function () use ($connection) { - $client = new Client('swoole', $connection); - go(function () use ($client) { - $client->resetStats(); - - foreach ($this->payloads as $payload) { - $this->assertTrue($client->enqueue($payload)); - } - - sleep(1); - - $this->assertEquals(8, $client->sumTotalJobs()); - $this->assertEquals(0, $client->sumProcessingJobs()); - $this->assertEquals(1, $client->sumFailedJobs()); - $this->assertEquals(7, $client->sumSuccessfulJobs()); - }); - }); - } -}