33namespace G4 \Tasker \Tasker2 ;
44
55use G4 \Tasker \TaskAbstract ;
6+ use G4 \Tasker \Tasker2 \Exception \TasksRelocatedToPersistenceException ;
67use G4 \Tasker \Tasker2 \Queue \BatchPublisher ;
78use PhpAmqpLib \Connection \AMQPStreamConnection ;
89use G4 \ValueObject \Uuid ;
@@ -34,6 +35,11 @@ class TaskQueue
3435 */
3536 private $ requestUuid ;
3637
38+ /**
39+ * @var \G4\Log\ErrorLogger
40+ */
41+ private $ errorLogger ;
42+
3743 public function __construct (
3844 \G4 \Tasker \Queue $ queue ,
3945 AMQPStreamConnection $ AMQPConnection = null ,
@@ -48,6 +54,12 @@ public function __construct(
4854 $ this ->requestUuid = $ requestUuid ;
4955 }
5056
57+ public function setErrorLogger (\G4 \Log \ErrorLogger $ logger )
58+ {
59+ $ this ->errorLogger = $ logger ;
60+ return $ this ;
61+ }
62+
5163 public function add (\G4 \Tasker \TaskAbstract $ task )
5264 {
5365 $ this ->tasks [] = clone $ task ;
@@ -99,8 +111,7 @@ private function saveCurrentTasks($tasks)
99111
100112 if ($ this ->AMQPConnection === null ) {
101113 // in case that rabbitmq is not available save tasks to database
102- $ this ->saveDelayedTasks ($ tasks );
103- trigger_error ('RabbitMQ connection is not available for Tasker TaskQueue ' , E_USER_NOTICE );
114+ $ this ->delayCurrentTasks ($ tasks );
104115 return $ this ;
105116 }
106117
@@ -118,6 +129,18 @@ private function saveCurrentTasks($tasks)
118129 return $ this ;
119130 }
120131
132+ private function delayCurrentTasks ($ tasks )
133+ {
134+ $ tasksModified = array_map (function (TaskAbstract $ task ) {
135+ return $ task ->relocateToPersistence ();
136+ }, $ tasks );
137+
138+ $ this ->saveDelayedTasks ($ tasksModified );
139+ $ this ->errorLogger !== null
140+ && $ this ->errorLogger ->log (new TasksRelocatedToPersistenceException (count ($ tasksModified ))
141+ );
142+ }
143+
121144 private function getRequestUuid ()
122145 {
123146 return $ this ->requestUuid !== null
0 commit comments