Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/Adapter/AdapterInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use Graze\Queue\Adapter\Exception\FailedAcknowledgementException;
use Graze\Queue\Adapter\Exception\FailedEnqueueException;
use Graze\Queue\Adapter\Exception\UnsupportedOperationException;
use Graze\Queue\Message\MessageFactoryInterface;
use Graze\Queue\Message\MessageInterface;
use Iterator;
Expand Down Expand Up @@ -43,4 +44,6 @@ public function dequeue(MessageFactoryInterface $factory, $limit);
* @throws FailedEnqueueException
*/
public function enqueue(array $messages);

public function purge();
}
22 changes: 18 additions & 4 deletions src/Adapter/ArrayAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
final class ArrayAdapter implements AdapterInterface
{
/**
* @param MessageInterface[]
* @param array
*/
protected $queue;
protected $queue = [];

/**
* @param MessageInterface[] $messages
Expand All @@ -49,9 +49,15 @@ public function acknowledge(array $messages)
*/
public function dequeue(MessageFactoryInterface $factory, $limit)
{
$total = null === $limit ? count($this->queue) : $limit;
/**
* If {@see $limit} is null then {@see LimitIterator} should be passed -1 as the count
* to avoid throwing OutOfBoundsException.
*
* @link https://github.com/php/php-src/blob/php-5.6.12/ext/spl/internal/limititerator.inc#L60-L62
*/
$count = (null === $limit) ? -1 : $limit;

return new LimitIterator(new ArrayIterator($this->queue), 0, $total);
return new LimitIterator(new ArrayIterator($this->queue), 0, $count);
}

/**
Expand All @@ -64,6 +70,14 @@ public function enqueue(array $messages)
}
}

/**
* {@inheritdoc}
*/
public function purge()
{
$this->queue = [];
}

/**
* @param MessageInterface $message
*/
Expand Down
10 changes: 9 additions & 1 deletion src/Adapter/SqsAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@ public function enqueue(array $messages)
}
}

/**
* {@inheritdoc}
*/
public function purge()
{
$this->client->purgeQueue(['QueueUrl' => $this->getQueueUrl()]);
}

/**
* @param MessageInterface[] $messages
* @return array
Expand Down Expand Up @@ -254,7 +262,7 @@ protected function getOption($name, $default = null)
*/
protected function getQueueUrl()
{
if (!$this->url) {
if (! $this->url) {
$result = $this->client->createQueue([
'QueueName' => $this->name,
'Attributes' => $this->options,
Expand Down
8 changes: 8 additions & 0 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ public function send(array $messages)
return $this->adapter->enqueue($messages);
}

/**
* {@inheritdoc}
*/
public function purge()
{
return $this->adapter->purge();
}

/**
* @return callable
*/
Expand Down
4 changes: 2 additions & 2 deletions src/ConsumerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
interface ConsumerInterface
{
/**
* @param callable $worker
* @param integer|null $limit Integer limit or Null no limit
* @param callable $worker
* @param integer $limit
*/
public function receive(callable $worker, $limit = 1);
}
8 changes: 6 additions & 2 deletions src/ProducerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,21 @@

namespace Graze\Queue;

use Graze\Queue\Adapter\Exception\UnsupportedOperationException;
use Graze\Queue\Message\MessageInterface;

interface ProducerInterface
{
/**
* @return MessageInterface
* @param string $body
* @param array $options
*/
public function create($body, array $options = []);

/**
* @param MessageInterface[] $message
* @param array $message
*/
public function send(array $messages);

public function purge();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the talk today, any thoughts on this @wpillar?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are my thoughts:

  • Client implements ProducerInterface and ClientInterface, so adding purge() to ProducerInterface does not unreasonably affect that class.
  • You've already got segregated interfaces in ProducerInterface and ClientInterface and since Client implements both, the only consideration really is how people might extend this code. Are we burdening people who might write separate ProducerInterface implementations with a method they can't implement? My thinking here is if we could reasonably assume that all queue implementations have a way of purging then it's fine to put this on ProducerInterface, as a purge operation is similar to a send operation. If we think this assumption is unreasonable then you can segregate further with a PurgeableInterface.

What's your thinking on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking here is if we could reasonably assume that all queue implementations have a way of purging then it's fine to put this on ProducerInterface, as a purge operation is similar to a send operation.

So far it's 👌, but I doubt every queue implementation supports purging.

We're not 1.0.0, so I'm going to go with this and see what Mat thinks if he makes use of it.

}
36 changes: 36 additions & 0 deletions tests/integration/ArrayIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,44 @@ public function testReceiveWithPolling()
assertThat($msgs, is(identicalTo($this->messages)));
}

public function testReceiveWithNoMessages()
{
$client = new Client(new ArrayAdapter());

$msgs = [];
$client->receive(function ($msg) use (&$msgs) {
$msgs[] = $msg;
}, null);

assertThat($msgs, is(emptyArray()));
}

public function testReceiveWithLimitAndNoMessages()
{
$client = new Client(new ArrayAdapter());

$msgs = [];
$client->receive(function ($msg) use (&$msgs) {
$msgs[] = $msg;
}, 10);

assertThat($msgs, is(emptyArray()));
}

public function testSend()
{
$this->client->send([$this->client->create('foo')]);
}

public function testPurge()
{
$this->client->purge();

$msgs = [];
$this->client->receive(function ($msg) use (&$msgs) {
$msgs[] = $msg;
}, null);

assertThat($msgs, is(emptyArray()));
}
}
29 changes: 29 additions & 0 deletions tests/integration/SqsIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,33 @@ public function testSend()

$this->client->send([$this->client->create('foo')]);
}

public function testPurge()
{
$url = $this->stubCreateQueue();
$timeout = $this->stubQueueVisibilityTimeout($url);

$receiveModel = m::mock('Aws\ResultInterface');
$receiveModel->shouldReceive('get')->once()->with('Messages')->andReturn([]);
$this->sqsClient->shouldReceive('receiveMessage')->once()->with([
'QueueUrl' => $url,
'AttributeNames' => ['All'],
'MaxNumberOfMessages' => 1,
'VisibilityTimeout' => $timeout
])->andReturn($receiveModel);

$purgeModel = m::mock('Aws\ResultInterface');
$this->sqsClient->shouldReceive('purgeQueue')->once()->with([
'QueueUrl' => $url,
])->andReturn($purgeModel);

$this->client->purge();

$msgs = [];
$this->client->receive(function ($msg) use (&$msgs) {
$msgs[] = $msg;
});

assertThat($msgs, is(emptyArray()));
}
}
31 changes: 31 additions & 0 deletions tests/unit/Adapter/ArrayAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ public function testDequeueWithPollingLimit()
assertThat(iterator_to_array($iterator), is(identicalTo($this->messages)));
}

public function testDequeueWithNoMessages()
{
$adapter = new ArrayAdapter();

$iterator = $adapter->dequeue($this->factory, null);

assertThat(iterator_to_array($iterator), is(emptyArray()));
}

public function testDequeueWithLimitAndNoMessages()
{
$adapter = new ArrayAdapter();

$iterator = $adapter->dequeue($this->factory, 10);

assertThat(iterator_to_array($iterator), is(emptyArray()));
}

public function testEnqueue()
{
$messageA = m::mock('Graze\Queue\Message\MessageInterface');
Expand All @@ -89,4 +107,17 @@ public function testEnqueue()

assertThat(iterator_to_array($iterator), is(identicalTo($merged)));
}

public function testPurge()
{
$iterator = $this->adapter->dequeue($this->factory, 10);

assertThat(iterator_to_array($iterator), is(nonEmptyArray()));

$this->adapter->purge();

$iterator = $this->adapter->dequeue($this->factory, 10);

assertThat(iterator_to_array($iterator), is(emptyArray()));
}
}
12 changes: 12 additions & 0 deletions tests/unit/Adapter/SqsAdapterTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,16 @@ public function testReceiveMessageWaitTimeSecondsOption()
assertThat($iterator, is(anInstanceOf('Generator')));
assertThat(iterator_to_array($iterator), is(equalTo($this->messages)));
}

public function testPurge()
{
$adapter = new SqsAdapter($this->client, 'foo');
$url = $this->stubCreateQueue('foo');

$this->client->shouldReceive('purgeQueue')->once()->with([
'QueueUrl' => $url,
])->andReturn($this->model);

assertThat($adapter->purge(), is(nullValue()));
}
}