33import json
44import logging
55import time
6- from typing import Any , Awaitable , Callable , Dict , List , Optional
6+ from typing import Awaitable , Callable , Dict , List , Optional
77import uuid
8- from nats_queue .nats_limiter import FixedWindowLimiter , IntervalLimiter , Limiter
8+ from nats_queue .nats_limiter import FixedWindowLimiter , IntervalLimiter
99from nats .js .client import JetStreamContext
1010from nats .aio .client import Client
1111from nats .aio .msg import Msg
@@ -107,7 +107,7 @@ async def _process_task(self, job: Msg):
107107 delay = int (planned_time .total_seconds ())
108108 await job .nak (delay = delay )
109109 logger .debug (
110- f"Job: { job_data ["name" ]} id={ job_data ["id" ]} is scheduled for later. "
110+ f"Job: { job_data ["name" ]} id={ job_data ["id" ]} is scheduled later"
111111 f"Requeueing in { delay } seconds."
112112 )
113113 return
@@ -120,7 +120,8 @@ async def _process_task(self, job: Msg):
120120 return
121121
122122 logger .info (
123- f'Job: { job_data ["name" ]} id={ job_data ["id" ]} is started with data={ job_data ["data" ]} ) in queue={ job_data ["queue_name" ]} '
123+ f"""Job: { job_data ["name" ]} id={ job_data ["id" ]} is started
124+ with data={ job_data ["data" ]} ) in queue={ job_data ["queue_name" ]} """
124125 )
125126
126127 timeout = job_data ["meta" ]["timeout" ]
@@ -160,17 +161,20 @@ async def fetch_messages(
160161 try :
161162 msgs = await sub .fetch (count , timeout = self .fetch_timeout )
162163 logger .debug (
163- f"Consumer: name={ (await sub .consumer_info ()).name } fetched { len (msgs )} messages"
164+ f"""Consumer: name={ (await sub .consumer_info ()).name }
165+ fetched { len (msgs )} messages"""
164166 )
165167 return msgs
166168 except TimeoutError :
167169 logger .debug (
168- f"Consumer: name={ (await sub .consumer_info ()).name } failed to fetch messages: TimeoutError"
170+ f"""Consumer: name={ (await sub .consumer_info ()).name }
171+ failed to fetch messages: TimeoutError"""
169172 )
170173 return []
171174 except Exception as e :
172175 logger .error (
173- f"Consumer: name={ (await sub .consumer_info ()).name } error while fetching messages: { e } "
176+ f"""Consumer: name={ (await sub .consumer_info ()).name }
177+ error while fetching messages: { e } """
174178 )
175179 raise
176180
@@ -183,12 +187,14 @@ async def get_subscriptions(self) -> List[JetStreamContext.PullSubscription]:
183187 topic , durable = f"worker_group_{ priority } "
184188 )
185189 logger .info (
186- f"Consumer: name={ self .name } successfully subscribed to topic { topic } ."
190+ f"""Consumer: name={ self .name }
191+ successfully subscribed to topic { topic } ."""
187192 )
188193 subscriptions .append (sub )
189194 except Exception as e :
190195 logger .error (
191- f"Consumer: name={ self .name } error while subscribing to topic { topic } : { e } "
196+ f"""Consumer: name={ self .name } error
197+ while subscribing to topic { topic } : { e } """
192198 )
193199 raise
194200
0 commit comments