diff --git a/composer.json b/composer.json index 8b6f64b1..96428e2f 100644 --- a/composer.json +++ b/composer.json @@ -9,7 +9,8 @@ "php": ">=7.4", "beberlei/assert": "^2.1 || ^3.0", "bernard/normalt": "^1.0", - "symfony/event-dispatcher": "^3.0 || ^4.0" + "symfony/event-dispatcher": "^3.0 || ^4.0", + "ext-pcntl": "*" }, "require-dev" : { "doctrine/dbal": "^2.5", diff --git a/src/Consumer.php b/src/Consumer.php index 2fefc087..522613ce 100644 --- a/src/Consumer.php +++ b/src/Consumer.php @@ -36,15 +36,14 @@ public function __construct(Router $router, EventDispatcherInterface $dispatcher * * @param Queue $queue * @param array $options + * @throws \Throwable */ public function consume(Queue $queue, array $options = []) { - declare(ticks=1); - $this->bind(); while ($this->tick($queue, $options)) { - // NO op + pcntl_signal_dispatch(); } } @@ -56,6 +55,7 @@ public function consume(Queue $queue, array $options = []) * @param array $options * * @return bool + * @throws \Throwable */ public function tick(Queue $queue, array $options = []) { @@ -134,15 +134,19 @@ public function invoke(Envelope $envelope, Queue $queue) $queue->acknowledge($envelope); $this->dispatcher->dispatch(BernardEvents::ACKNOWLEDGE, new EnvelopeEvent($envelope, $queue)); + } catch (\Throwable $error) { + // php 7 $this->rejectDispatch($error, $envelope, $queue); } catch (\Exception $exception) { + // php 5 $this->rejectDispatch($exception, $envelope, $queue); } } /** * @param array $options + * @return array */ protected function configure(array $options) { @@ -153,6 +157,8 @@ protected function configure(array $options) $this->options = array_filter($options) + $this->options; $this->options['max-runtime'] += microtime(true); $this->configured = true; + + return $this->options; } /** diff --git a/src/Driver/FlatFile/Driver.php b/src/Driver/FlatFile/Driver.php index afd91e0c..d4b7a83f 100644 --- a/src/Driver/FlatFile/Driver.php +++ b/src/Driver/FlatFile/Driver.php @@ -109,7 +109,14 @@ public function popMessage($queueName, $duration = 5) $files = $this->getJobFiles($queueName); } - usleep(1000); + $nano = time_nanosleep(0, 1000000); + if ($nano === false) { + // fallback when time_nanosleep fails + usleep(1000); + } elseif (is_array($nano)) { + // Interrupted by a pcntl_signal + return array(null, null); + } } return [null, null]; diff --git a/tests/Driver/FlatFile/DriverTest.php b/tests/Driver/FlatFile/DriverTest.php index da15d41e..16810ebe 100644 --- a/tests/Driver/FlatFile/DriverTest.php +++ b/tests/Driver/FlatFile/DriverTest.php @@ -14,6 +14,8 @@ class DriverTest extends \PHPUnit\Framework\TestCase */ private $driver; + private $baseDir; + protected function setUp(): void { $this->baseDir = sys_get_temp_dir().\DIRECTORY_SEPARATOR.'bernard-flat'; @@ -49,7 +51,7 @@ public function testRemove() $this->driver->removeQueue('send-newsletter'); - $this->assertDirectoryNotExists($this->baseDir.\DIRECTORY_SEPARATOR.'send-newsletter'); + $this->assertDirectoryDoesNotExist($this->baseDir.\DIRECTORY_SEPARATOR.'send-newsletter'); } public function testRemoveQueueWithPoppedMessage() @@ -60,7 +62,7 @@ public function testRemoveQueueWithPoppedMessage() $this->driver->removeQueue('send-newsletter'); - $this->assertDirectoryNotExists($this->baseDir.\DIRECTORY_SEPARATOR.'send-newsletter'); + $this->assertDirectoryDoesNotExist($this->baseDir.\DIRECTORY_SEPARATOR.'send-newsletter'); } public function testPushMessage()