Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .github/workflows/phpstan.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
name: "CodeQL"

on: [ pull_request ]
jobs:
lint:
name: CodeQL
runs-on: ubuntu-latest

steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 2

- run: git checkout HEAD^2

- name: Run CodeQL
run: |
docker run --rm -v $PWD:/app composer sh -c \
"composer install --profile --ignore-platform-reqs && composer check"
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
fetch-depth: 2

Expand Down
35 changes: 35 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: "Tests"

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

on: [pull_request]

jobs:
adapter_test:
name: Adapter Tests
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
adapter:
[
AMQP,
SwooleRedisCluster,
Swoole,
Workerman,
]

steps:
- name: checkout
uses: actions/checkout@v4

- name: Load and Start Services
run: |
docker compose build
docker compose up -d
sleep 10

- name: Run Tests
run: docker compose exec -T tests vendor/bin/phpunit /usr/local/src/tests/Queue/E2E/Adapter/${{matrix.adapter}}Test.php --debug
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@ COPY . .

COPY --from=composer /usr/local/src/vendor /usr/local/src/vendor

CMD ["sleep","3600"]
CMD ["tail", "-f", "/dev/null"]
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
},
"scripts":{
"test": "phpunit",
"analyse": "vendor/bin/phpstan analyse",
"check": "vendor/bin/phpstan analyse",
"format": "vendor/bin/pint",
"lint": "vendor/bin/pint --test"
},
Expand Down
22 changes: 11 additions & 11 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
version: '3.1'

services:
tests:
container_name: tests
build: .
command:
- vendor/bin/phpunit
- tests
volumes:
- ./:/usr/local/src
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
depends_on:
- swoole
- swoole-amqp
Expand All @@ -20,7 +16,8 @@ services:
build: ./tests/Queue/servers/Swoole/.
command: php /usr/src/code/tests/Queue/servers/Swoole/worker.php
volumes:
- ./:/usr/src/code
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
depends_on:
- redis

Expand All @@ -29,7 +26,8 @@ services:
build: ./tests/Queue/servers/SwooleRedisCluster/.
command: php /usr/src/code/tests/Queue/servers/SwooleRedisCluster/worker.php
volumes:
- ./:/usr/src/code
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
depends_on:
redis-cluster-0:
condition: service_healthy
Expand All @@ -39,7 +37,8 @@ services:
build: ./tests/Queue/servers/AMQP/.
command: php /usr/src/code/tests/Queue/servers/AMQP/worker.php
volumes:
- ./:/usr/src/code
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
depends_on:
amqp:
condition: service_healthy
Expand All @@ -49,7 +48,8 @@ services:
build: ./tests/Queue/servers/Workerman/.
command: php /usr/src/code/tests/Queue/servers/Workerman/worker.php start
volumes:
- ./:/usr/src/code
- ./src:/usr/local/src/src
- ./tests:/usr/local/src/tests
depends_on:
- redis

Expand Down Expand Up @@ -115,5 +115,5 @@ services:
RABBITMQ_DEFAULT_PASS: amqp
RABBITMQ_DEFAULT_VHOST: "/"
healthcheck:
test: [ "CMD", "rabbitmqctl", "node_health_check"]
test: [ "CMD", "rabbitmqctl", "node_health_check" ]
start_interval: 1s
8 changes: 4 additions & 4 deletions src/Queue/Broker/AMQP.php
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
try {
$nextMessage = json_decode($amqpMessage->getBody(), associative: true) ?? false;
if (!$nextMessage) {
$amqpMessage->nack(requeue: false);
$amqpMessage->nack();
return;
}

Expand All @@ -92,10 +92,10 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
$successCallback($message);
} catch (Retryable $e) {
$amqpMessage->nack(requeue: true);
$errorCallback($message, $e);
$errorCallback($message ?? null, $e);
} catch (\Throwable $th) {
$amqpMessage->nack(requeue: false);
$errorCallback($message, $th);
$errorCallback($message ?? null, $th);
}
};

Expand Down Expand Up @@ -149,7 +149,7 @@ public function enqueue(Queue $queue, array $payload): bool
return true;
}

public function retry(Queue $queue, int $limit = null): void
public function retry(Queue $queue, ?int $limit = null): void
{
// This is a no-op for AMQP
}
Expand Down
2 changes: 1 addition & 1 deletion src/Queue/Broker/Redis.php
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public function enqueue(Queue $queue, array $payload): bool
* Take all jobs from the failed queue and re-enqueue them.
* @param int|null $limit The amount of jobs to retry
*/
public function retry(Queue $queue, int $limit = null): void
public function retry(Queue $queue, ?int $limit = null): void
{
$start = \time();
$processed = 0;
Expand Down
5 changes: 3 additions & 2 deletions src/Queue/Connection/RedisCluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public function remove(string $key): bool

public function move(string $queue, string $destination): bool
{
return $this->getRedis()->move($queue, $destination);
// Move is not supported for Redis Cluster
return false;
}

public function setArray(string $key, array $value): bool
Expand Down Expand Up @@ -160,7 +161,7 @@ public function ping(): bool
}

return true;
} catch (Exception $e) {
} catch (\Throwable) {
return false;
}
}
Expand Down
11 changes: 8 additions & 3 deletions src/Queue/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,20 @@ interface Consumer
{
/**
* @param Queue $queue
* @param callable(Message $message): Commit|NoCommit|mixed $messageCallback
* @param callable(Message $message): (Commit|NoCommit|mixed) $messageCallback
* @param callable(Message $message): void $successCallback
* @param callable(Message $message, \Throwable $th): void $errorCallback
* @return void
*/
public function consume(Queue $queue, callable $messageCallback, callable $successCallback, callable $errorCallback): void;
public function consume(
Queue $queue,
callable $messageCallback,
callable $successCallback,
callable $errorCallback
): void;

/**
* Closes the consumer and free's any underlying resources.
* Closes the consumer and frees any underlying resources.
*/
public function close(): void;
}
2 changes: 1 addition & 1 deletion src/Queue/Publisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public function enqueue(Queue $queue, array $payload): bool;
* @param int|null $limit
* @return void
*/
public function retry(Queue $queue, int $limit = null): void;
public function retry(Queue $queue, ?int $limit = null): void;

/**
* Returns the amount of pending messages in the queue.
Expand Down
17 changes: 8 additions & 9 deletions src/Queue/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ class Server

/**
* Hook that is called when worker starts
*
* @var Hook
*/
protected Hook $workerStartHook;

Expand Down Expand Up @@ -273,16 +271,16 @@ function (Message $message) {
}
}
}

Console::success("[Job] ({$message->getPid()}) successfully run.");
},
function (Message $message, Throwable $th) {
Console::error("[Job] ({$message->getPid()}) failed to run.");
Console::error("[Job] ({$message->getPid()}) {$th->getMessage()}");
function (?Message $message, Throwable $th) {
Console::error("[Job] ({$message?->getPid()}) failed to run.");
Console::error("[Job] ({$message?->getPid()}) {$th->getMessage()}");

self::setResource('error', fn () => $th);

foreach ($this->errorHooks as $hook) {
call_user_func_array($hook->getAction(), $this->getArguments($hook));
($hook->getAction())(...$this->getArguments($hook));
}
},
);
Expand Down Expand Up @@ -322,10 +320,11 @@ public function getWorkerStart(): Hook

/**
* Is called when a Worker stops.
* @param callable $callback
* @param callable|null $callback
* @return self
* @throws Exception
*/
public function workerStop(callable $callback = null): self
public function workerStop(?callable $callback = null): self
{
try {
$this->adapter->workerStop(function (string $workerId) use ($callback) {
Expand Down
21 changes: 16 additions & 5 deletions tests/Queue/E2E/Adapter/Base.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public function testEvents(): void
sleep(1);
}

protected function testConcurrency(): void
public function testConcurrency(): void
{
run(function () {
$publisher = $this->getPublisher();
Expand All @@ -93,23 +93,34 @@ public function testRetry(): void
{
$publisher = $this->getPublisher();

$publisher->enqueue($this->getQueue(), [
$published = $publisher->enqueue($this->getQueue(), [
'type' => 'test_exception',
'id' => 1
]);
$publisher->enqueue($this->getQueue(), [

$this->assertTrue($published);

$published = $publisher->enqueue($this->getQueue(), [
'type' => 'test_exception',
'id' => 2
]);
$publisher->enqueue($this->getQueue(), [

$this->assertTrue($published);

$published = $publisher->enqueue($this->getQueue(), [
'type' => 'test_exception',
'id' => 3
]);
$publisher->enqueue($this->getQueue(), [

$this->assertTrue($published);

$published = $publisher->enqueue($this->getQueue(), [
'type' => 'test_exception',
'id' => 4
]);

$this->assertTrue($published);

sleep(1);
$publisher->retry($this->getQueue());
sleep(1);
Expand Down