Skip to content

Commit 263d617

Browse files
committed
feat(batching): add support for message batching to AMQP adapter
1 parent b713b99 commit 263d617

File tree

5 files changed

+27
-4
lines changed

5 files changed

+27
-4
lines changed

src/Queue/Broker/AMQP.php

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
use Utopia\Queue\Message;
1515
use Utopia\Queue\Publisher;
1616
use Utopia\Queue\Queue;
17+
use Utopia\Queue\Result\Commit;
18+
use Utopia\Queue\Result\NoCommit;
1719

1820
class AMQP implements Publisher, Consumer
1921
{
@@ -81,8 +83,12 @@ public function consume(Queue $queue, callable $messageCallback, callable $succe
8183
$nextMessage['timestamp'] = (int)$nextMessage['timestamp'];
8284
$message = new Message($nextMessage);
8385

84-
$messageCallback($message);
85-
$amqpMessage->ack();
86+
$result = $messageCallback($message);
87+
match(true) {
88+
$result instanceof Commit => $amqpMessage->ack(true),
89+
$result instanceof NoCommit => null,
90+
default => $amqpMessage->ack()
91+
};
8692
$successCallback($message);
8793
} catch (Retryable $e) {
8894
$amqpMessage->nack(requeue: true);

src/Queue/Consumer.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
namespace Utopia\Queue;
44

5+
use Utopia\Queue\Result\Commit;
6+
use Utopia\Queue\Result\NoCommit;
7+
58
interface Consumer
69
{
710
/**
811
* @param Queue $queue
9-
* @param callable(Message $message): void $messageCallback
12+
* @param callable(Message $message): Commit|NoCommit|mixed $messageCallback
1013
* @param callable(Message $message): void $successCallback
1114
* @param callable(Message $message, \Throwable $th): void $errorCallback
1215
* @return void

src/Queue/Result/Commit.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
namespace Utopia\Queue\Result;
4+
5+
class Commit
6+
{
7+
}

src/Queue/Result/NoCommit.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
namespace Utopia\Queue\Result;
4+
5+
class NoCommit
6+
{
7+
}

src/Queue/Server.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ function (Message $message) {
249249
}
250250
}
251251

252-
\call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
252+
return \call_user_func_array($this->job->getAction(), $this->getArguments($this->job, $message->getPayload()));
253253
} finally {
254254
$processDuration = microtime(true) - $receivedAtTimestamp;
255255
$this->processDuration->record($processDuration);

0 commit comments

Comments
 (0)