diff --git a/src/Adapter/AdapterInterface.php b/src/Adapter/AdapterInterface.php index 0090537..afb6d24 100644 --- a/src/Adapter/AdapterInterface.php +++ b/src/Adapter/AdapterInterface.php @@ -16,6 +16,7 @@ use Graze\Queue\Adapter\Exception\FailedAcknowledgementException; use Graze\Queue\Adapter\Exception\FailedEnqueueException; +use Graze\Queue\Adapter\Exception\UnsupportedOperationException; use Graze\Queue\Message\MessageFactoryInterface; use Graze\Queue\Message\MessageInterface; use Iterator; @@ -43,4 +44,6 @@ public function dequeue(MessageFactoryInterface $factory, $limit); * @throws FailedEnqueueException */ public function enqueue(array $messages); + + public function purge(); } diff --git a/src/Adapter/ArrayAdapter.php b/src/Adapter/ArrayAdapter.php index 9da0b40..317b393 100644 --- a/src/Adapter/ArrayAdapter.php +++ b/src/Adapter/ArrayAdapter.php @@ -22,9 +22,9 @@ final class ArrayAdapter implements AdapterInterface { /** - * @param MessageInterface[] + * @param array */ - protected $queue; + protected $queue = []; /** * @param MessageInterface[] $messages @@ -49,9 +49,15 @@ public function acknowledge(array $messages) */ public function dequeue(MessageFactoryInterface $factory, $limit) { - $total = null === $limit ? count($this->queue) : $limit; + /** + * If {@see $limit} is null then {@see LimitIterator} should be passed -1 as the count + * to avoid throwing OutOfBoundsException. + * + * @link https://github.com/php/php-src/blob/php-5.6.12/ext/spl/internal/limititerator.inc#L60-L62 + */ + $count = (null === $limit) ? -1 : $limit; - return new LimitIterator(new ArrayIterator($this->queue), 0, $total); + return new LimitIterator(new ArrayIterator($this->queue), 0, $count); } /** @@ -64,6 +70,14 @@ public function enqueue(array $messages) } } + /** + * {@inheritdoc} + */ + public function purge() + { + $this->queue = []; + } + /** * @param MessageInterface $message */ diff --git a/src/Adapter/SqsAdapter.php b/src/Adapter/SqsAdapter.php index 161b62d..49a43b8 100644 --- a/src/Adapter/SqsAdapter.php +++ b/src/Adapter/SqsAdapter.php @@ -190,6 +190,14 @@ public function enqueue(array $messages) } } + /** + * {@inheritdoc} + */ + public function purge() + { + $this->client->purgeQueue(['QueueUrl' => $this->getQueueUrl()]); + } + /** * @param MessageInterface[] $messages * @return array @@ -254,7 +262,7 @@ protected function getOption($name, $default = null) */ protected function getQueueUrl() { - if (!$this->url) { + if (! $this->url) { $result = $this->client->createQueue([ 'QueueName' => $this->name, 'Attributes' => $this->options, diff --git a/src/Client.php b/src/Client.php index 9691c97..ab9c3cb 100644 --- a/src/Client.php +++ b/src/Client.php @@ -84,6 +84,14 @@ public function send(array $messages) return $this->adapter->enqueue($messages); } + /** + * {@inheritdoc} + */ + public function purge() + { + return $this->adapter->purge(); + } + /** * @return callable */ diff --git a/src/ConsumerInterface.php b/src/ConsumerInterface.php index 5f16ce0..024cd2b 100644 --- a/src/ConsumerInterface.php +++ b/src/ConsumerInterface.php @@ -19,8 +19,8 @@ interface ConsumerInterface { /** - * @param callable $worker - * @param integer|null $limit Integer limit or Null no limit + * @param callable $worker + * @param integer $limit */ public function receive(callable $worker, $limit = 1); } diff --git a/src/ProducerInterface.php b/src/ProducerInterface.php index 2980c81..6b256c3 100644 --- a/src/ProducerInterface.php +++ b/src/ProducerInterface.php @@ -14,17 +14,21 @@ namespace Graze\Queue; +use Graze\Queue\Adapter\Exception\UnsupportedOperationException; use Graze\Queue\Message\MessageInterface; interface ProducerInterface { /** - * @return MessageInterface + * @param string $body + * @param array $options */ public function create($body, array $options = []); /** - * @param MessageInterface[] $message + * @param array $message */ public function send(array $messages); + + public function purge(); } diff --git a/tests/integration/ArrayIntegrationTest.php b/tests/integration/ArrayIntegrationTest.php index da9aa7f..fd6d77f 100644 --- a/tests/integration/ArrayIntegrationTest.php +++ b/tests/integration/ArrayIntegrationTest.php @@ -54,8 +54,44 @@ public function testReceiveWithPolling() assertThat($msgs, is(identicalTo($this->messages))); } + public function testReceiveWithNoMessages() + { + $client = new Client(new ArrayAdapter()); + + $msgs = []; + $client->receive(function ($msg) use (&$msgs) { + $msgs[] = $msg; + }, null); + + assertThat($msgs, is(emptyArray())); + } + + public function testReceiveWithLimitAndNoMessages() + { + $client = new Client(new ArrayAdapter()); + + $msgs = []; + $client->receive(function ($msg) use (&$msgs) { + $msgs[] = $msg; + }, 10); + + assertThat($msgs, is(emptyArray())); + } + public function testSend() { $this->client->send([$this->client->create('foo')]); } + + public function testPurge() + { + $this->client->purge(); + + $msgs = []; + $this->client->receive(function ($msg) use (&$msgs) { + $msgs[] = $msg; + }, null); + + assertThat($msgs, is(emptyArray())); + } } diff --git a/tests/integration/SqsIntegrationTest.php b/tests/integration/SqsIntegrationTest.php index d422862..0c201a4 100644 --- a/tests/integration/SqsIntegrationTest.php +++ b/tests/integration/SqsIntegrationTest.php @@ -202,4 +202,33 @@ public function testSend() $this->client->send([$this->client->create('foo')]); } + + public function testPurge() + { + $url = $this->stubCreateQueue(); + $timeout = $this->stubQueueVisibilityTimeout($url); + + $receiveModel = m::mock('Aws\ResultInterface'); + $receiveModel->shouldReceive('get')->once()->with('Messages')->andReturn([]); + $this->sqsClient->shouldReceive('receiveMessage')->once()->with([ + 'QueueUrl' => $url, + 'AttributeNames' => ['All'], + 'MaxNumberOfMessages' => 1, + 'VisibilityTimeout' => $timeout + ])->andReturn($receiveModel); + + $purgeModel = m::mock('Aws\ResultInterface'); + $this->sqsClient->shouldReceive('purgeQueue')->once()->with([ + 'QueueUrl' => $url, + ])->andReturn($purgeModel); + + $this->client->purge(); + + $msgs = []; + $this->client->receive(function ($msg) use (&$msgs) { + $msgs[] = $msg; + }); + + assertThat($msgs, is(emptyArray())); + } } diff --git a/tests/unit/Adapter/ArrayAdapterTest.php b/tests/unit/Adapter/ArrayAdapterTest.php index fff2363..71f1c7f 100644 --- a/tests/unit/Adapter/ArrayAdapterTest.php +++ b/tests/unit/Adapter/ArrayAdapterTest.php @@ -75,6 +75,24 @@ public function testDequeueWithPollingLimit() assertThat(iterator_to_array($iterator), is(identicalTo($this->messages))); } + public function testDequeueWithNoMessages() + { + $adapter = new ArrayAdapter(); + + $iterator = $adapter->dequeue($this->factory, null); + + assertThat(iterator_to_array($iterator), is(emptyArray())); + } + + public function testDequeueWithLimitAndNoMessages() + { + $adapter = new ArrayAdapter(); + + $iterator = $adapter->dequeue($this->factory, 10); + + assertThat(iterator_to_array($iterator), is(emptyArray())); + } + public function testEnqueue() { $messageA = m::mock('Graze\Queue\Message\MessageInterface'); @@ -89,4 +107,17 @@ public function testEnqueue() assertThat(iterator_to_array($iterator), is(identicalTo($merged))); } + + public function testPurge() + { + $iterator = $this->adapter->dequeue($this->factory, 10); + + assertThat(iterator_to_array($iterator), is(nonEmptyArray())); + + $this->adapter->purge(); + + $iterator = $this->adapter->dequeue($this->factory, 10); + + assertThat(iterator_to_array($iterator), is(emptyArray())); + } } diff --git a/tests/unit/Adapter/SqsAdapterTest.php b/tests/unit/Adapter/SqsAdapterTest.php index a15eb62..4e9fd03 100644 --- a/tests/unit/Adapter/SqsAdapterTest.php +++ b/tests/unit/Adapter/SqsAdapterTest.php @@ -220,4 +220,16 @@ public function testReceiveMessageWaitTimeSecondsOption() assertThat($iterator, is(anInstanceOf('Generator'))); assertThat(iterator_to_array($iterator), is(equalTo($this->messages))); } + + public function testPurge() + { + $adapter = new SqsAdapter($this->client, 'foo'); + $url = $this->stubCreateQueue('foo'); + + $this->client->shouldReceive('purgeQueue')->once()->with([ + 'QueueUrl' => $url, + ])->andReturn($this->model); + + assertThat($adapter->purge(), is(nullValue())); + } }