2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "nats_queue"
-version = "1.1.1"
+version = "1.1.3"
 description = ""
 authors = ["Kristina Shishkina <k.shishkina@darwinsoft.ru>"]
 readme = "README.md"
4 changes: 2 additions & 2 deletions src/nats_queue/nats_job.py
@@ -12,7 +12,7 @@ def __init__(
         data: Dict[str, Any] = {},
         timeout=None,
         delay=0,
-        meta=None,
+        meta={},
     ):
         for param, param_name in [
             (queue_name, "queue_name"),
@@ -25,7 +25,7 @@ def __init__(
         self.queue_name = queue_name
         self.name = name
         self.data = data
-        self.meta = meta or {
+        self.meta = meta | {
             "retry_count": 0,
             "start_time": (datetime.now() + timedelta(seconds=delay)).isoformat(),
             "timeout": timeout,
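The switch from `meta or {…}` to `meta | {…}` changes the merge semantics: previously, any truthy caller-supplied `meta` replaced the defaults wholesale; now the two dicts are merged, with the right-hand defaults winning on key collisions. A minimal sketch of the behavior (plain Python 3.9+, values illustrative):

```python
# Dict-union semantics behind the new Job.meta (requires Python 3.9+).
# On key collisions the right-hand operand wins, so the defaults always
# reset retry_count/timeout/start_time, while caller-only keys such as
# parent_id survive the merge.
caller_meta = {"parent_id": "1", "retry_count": 5}
defaults = {"retry_count": 0, "timeout": None}

print(caller_meta | defaults)
# {'parent_id': '1', 'retry_count': 0, 'timeout': None}

# The old `caller_meta or defaults` kept retry_count=5 and ignored the
# defaults entirely whenever caller_meta was non-empty:
print(caller_meta or defaults)
# {'parent_id': '1', 'retry_count': 5}
```

One caveat: `meta={}` (like the existing `data={}`) is a mutable default argument; it stays safe only as long as the constructor never mutates it in place, which holds here because `|` builds a new dict.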
36 changes: 36 additions & 0 deletions src/nats_queue/nats_queue.py
@@ -1,5 +1,7 @@
 import logging
+from typing import Dict, List, Optional, Union
 from nats.aio.client import Client
+from nats.js.kv import KeyValue
 from nats.js.errors import BadRequestError
 from nats.errors import ConnectionClosedError
 import json
@@ -36,6 +38,7 @@ def __init__(
         self.client = client
         self.manager = None
         self.duplicate_window = duplicate_window
+        self.kv: Optional[KeyValue] = None
         self.logger: Logger = logger

         self.logger.info(
@@ -53,6 +56,11 @@ async def setup(self):
                 duplicate_window=self.duplicate_window,
             )
             self.logger.info(f"Stream '{self.name}' created successfully.")
+
+            self.kv = await self.manager.create_key_value(
+                bucket=f"{self.name}_parent_id"
+            )
+
         except BadRequestError:
             self.logger.warning(
                 f"Stream '{self.name}' already exists. Attempting to update"
@@ -106,3 +114,31 @@ async def addJobs(self, jobs: list[Job], priority: int = 1):

         for job in jobs:
             await self.addJob(job, priority)
+
+    async def addFlowJob(
+        self, tree: Dict[str, Union[List[Job], Job]], priority: int = 1
+    ):
+        async def traverse(node: Dict[str, Union[List[Job], Job]], parent_id=None):
+            current_job: Job = node["job"]
+            if parent_id:
+                current_job.meta["parent_id"] = parent_id
+
+            children = node.get("children", [])
+            if not children:
+                return [current_job]
+
+            await self.kv.put(
+                current_job.id,
+                json.dumps(
+                    {**current_job.to_dict(), "children_count": len(children)}
+                ).encode(),
+            )
+
+            deepest_jobs = []
+            for child in children:
+                deepest_jobs.extend(await traverse(child, current_job.id))
+
+            return deepest_jobs
+
+        deepest_jobs = await traverse(tree)
+        await self.addJobs(deepest_jobs, priority)
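Taken together, `addFlowJob` publishes only the leaf jobs of the tree; every node that has children is instead written to the `<stream>_parent_id` KV bucket together with a `children_count`, so workers can release a parent once its last child completes. A usage sketch inside an async context (job names are illustrative):

```python
# Hypothetical flow: "aggregate" should run only after both fetch jobs
# have completed.
queue = Queue(client, name="my_queue")
await queue.setup()

await queue.addFlowJob(
    {
        "job": Job("my_queue", "aggregate"),
        "children": [
            {"job": Job("my_queue", "fetch_users")},
            {"job": Job("my_queue", "fetch_orders")},
        ],
    }
)
# "fetch_users" and "fetch_orders" are published to the stream right
# away; "aggregate" sits in the my_queue_parent_id bucket with
# children_count=2 until the workers count it down to zero.
```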
81 changes: 67 additions & 14 deletions src/nats_queue/nats_worker.py
@@ -9,8 +9,8 @@
 from nats_queue.nats_limiter import FixedWindowLimiter, IntervalLimiter
 from nats.js.client import JetStreamContext
 from nats.aio.client import Client
+from nats.js.kv import KeyValue
 from nats.aio.msg import Msg
-from nats_queue.nats_job import Job
 from nats.errors import TimeoutError

 logger = logging.getLogger("nats_queue")
@@ -56,6 +57,7 @@ def __init__(
         self.processing_now: int = 0
         self.loop_task: Optional[asyncio.Task] = None
         self.logger: Logger = logger
+        self.kv: Optional[KeyValue] = None

         self.logger.info(
             (
@@ -69,6 +70,7 @@ async def setup(self):
         try:
             self.manager = self.client.jetstream()
             self.consumers = await self.get_subscriptions()
+            self.kv = await self.manager.key_value(f"{self.name}_parent_id")
         except Exception as e:
             raise e

@@ -91,22 +93,60 @@ async def loop(self):
         while self.running:
             for consumer in self.consumers:
                 max_jobs = self.limiter.get(self.concurrency - self.processing_now)
-                if max_jobs == 0:
+                if max_jobs <= 0:
                     continue
                 jobs = await self.fetch_messages(consumer, max_jobs)
                 if jobs:
                     break
             else:
                 jobs = []

             for job in jobs:
                 self.limiter.inc()
                 asyncio.create_task(self._process_task(job))

             await asyncio.sleep(self.limiter.timeout())

+    async def _mark_parents_failed(self, job_data: dict):
+        parent_id = job_data["meta"].get("parent_id")
+        if not parent_id:
+            return
+
+        parent_job = await self.kv.get(parent_id)
+        if not parent_job:
+            self.logger.warning(
+                f"Parent job with ID {parent_id} not found in KV store."
+            )
+            return
+
+        parent_job_data = json.loads(parent_job.value.decode())
+
+        parent_job_data["meta"]["failed"] = True
+        await self._publish_parent_job(parent_job_data)
+        await self._mark_parents_failed(parent_job_data)
+
+    async def _publish_parent_job(self, parent_job_data):
+        subject = f"{parent_job_data['queue_name']}.{parent_job_data['name']}.1"
+        job_bytes = json.dumps(parent_job_data).encode()
+        await self.manager.publish(
+            subject, job_bytes, headers={"Nats-Msg-Id": parent_job_data["id"]}
+        )
+        self.logger.info(
+            f"Parent Job id={parent_job_data['id']} "
+            f"subject={subject} added successfully"
+        )

     async def _process_task(self, job: Msg):
         try:
             self.processing_now += 1
             job_data = json.loads(job.data.decode())
+            if job_data["meta"].get("failed"):
+                await job.term()
+                self.logger.warning(
+                    f"Job: {job_data['name']} id={job_data['id']} failed because "
+                    f"a child job did not complete successfully"
+                )
+                return

             job_start_time = datetime.fromisoformat(job_data["meta"]["start_time"])
             if job_start_time > datetime.now():
                 planned_time = job_start_time - datetime.now()
@@ -124,8 +164,11 @@ async def _process_task(self, job: Msg):
if job_data.get("meta").get("retry_count") > self.max_retries:
await job.term()
self.logger.warning(
f"Job: {job_data['name']} id={job_data['id']} max retries exceeded"
f"Job: {job_data['name']} id={job_data['id']} "
f"failed max retries exceeded"
)

await self._mark_parents_failed(job_data)
return

self.logger.info(
@@ -143,27 +186,37 @@ async def _process_task(self, job: Msg):
f'Job: {job_data["name"]} id={job_data["id"]} is completed'
)

parent_id = job_data["meta"].get("parent_id")
if parent_id:
parent_job_data = json.loads(
(await self.kv.get(parent_id)).value.decode()
)
parent_job_data["children_count"] -= 1
await self.kv.put(parent_id, json.dumps(parent_job_data).encode())
if parent_job_data["children_count"] == 0:
await self.kv.delete(parent_id)
await self._publish_parent_job(parent_job_data)

except Exception as e:
if isinstance(e, asyncio.TimeoutError):
self.logger.error(
f"Job: {job_data['name']} id={job_data['id']} TimeoutError: {e}"
f"Job: {job_data['name']} id={job_data['id']} "
f"TimeoutError start retry"
)
else:
self.logger.error(f"Error while processing job {job_data['id']}: {e}")

self.logger.error(
f"Error while processing job {job_data['id']}: {e} start retry"
)
new_id = f"{uuid.uuid4()}_{int(time.time())}"
job_data["meta"]["retry_count"] += 1
new_job = Job(
queue_name=job_data["queue_name"],
name=job_data["name"],
data=job_data["data"],
meta=job_data["meta"],
)
job_bytes = json.dumps(new_job.to_dict()).encode()
job_data["id"] = new_id

job_bytes = json.dumps(job_data).encode()
await job.term()
await self.manager.publish(
job.subject,
job_bytes,
headers={"Nats-Msg-Id": f"{uuid.uuid4()}_{int(time.time())}"},
headers={"Nats-Msg-Id": new_id},
)
finally:
self.processing_now -= 1
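The two parent-handling paths in `_process_task` mirror each other: on success the worker decrements the stored `children_count` and publishes the parent once it reaches zero; on a terminal failure, `_mark_parents_failed` walks the `parent_id` chain upward, setting each ancestor's `meta["failed"]` and publishing it so the check at the top of `_process_task` terminates it instead of running it. A sketch of the KV record both paths manipulate, assuming `Job.to_dict()` serializes the fields used elsewhere in this diff (all values hypothetical):

```python
# Shape of a parent record as addFlowJob stores it in the KV bucket.
parent_record = {
    "id": "c0ffee_1700000000",  # hypothetical "<uuid>_<timestamp>" job id
    "queue_name": "my_queue",
    "name": "aggregate",
    "data": {},
    "meta": {"retry_count": 0, "timeout": None, "start_time": "..."},
    "children_count": 2,  # decremented as each child completes
}
# Success path: children_count 2 -> 1 -> 0, then the record is deleted
# from KV and the parent is published to "my_queue.aggregate.1".
# Failure path: meta["failed"] = True is set and the parent is published
# immediately; the worker terminates it on receipt and recurses upward.
```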
2 changes: 2 additions & 0 deletions tests/test_job.py
@@ -7,13 +7,15 @@ def test_job_initialization():
queue_name="my_queue",
name="task_1",
data={"key": "value"},
meta={"parent_id": "1"},
)

assert job.queue_name == "my_queue"
assert job.name == "task_1"
assert job.data == {"key": "value"}
assert job.meta["retry_count"] == 0
assert job.meta["timeout"] is None
assert job.meta["parent_id"] == "1"


def test_job_initialization_with_delay():
37 changes: 37 additions & 0 deletions tests/test_queue.py
@@ -1,3 +1,4 @@
+from typing import Dict, List, Union
 import pytest
 import pytest_asyncio
 import json
@@ -243,3 +244,39 @@ async def test_create_queue_with_one_conf(get_client):

     queue2 = Queue(client, name="my_queue", duplicate_window=1)
     await queue2.setup()
+
+
+@pytest.mark.asyncio
+async def test_create_flow_job(get_client):
+    client = get_client
+    queue = Queue(client, name="my_queue")
+    await queue.setup()
+
+    flowJob: Dict[
+        str, Union[Job, List[Dict[str, Union[Job, List[Dict[str, Job]]]]]]
+    ] = {
+        "job": Job("my_queue", "parent_job"),
+        "children": [
+            {
+                "job": Job("my_queue", "child_job_1"),
+                "children": [
+                    {"job": Job("my_queue", "child_job_1_1")},
+                    {"job": Job("my_queue", "child_job_1_2")},
+                ],
+            },
+            {"job": Job("my_queue", "child_job_2")},
+        ],
+    }
+    parent_job_ids = [flowJob["job"].id, flowJob["children"][0]["job"].id]
+
+    await queue.addFlowJob(flowJob)
+    key_value = await queue.manager.key_value(f"{queue.name}_parent_id")
+    kv_keys = await key_value.keys()
+    assert set(kv_keys) == set(parent_job_ids)
+
+    stream_info = await queue.manager.stream_info(queue.name)
+    messages = stream_info.state.messages
+    assert messages == 3
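For reference, the assertions match the flow semantics above: of the five jobs in the tree, only the three leaves (`child_job_1_1`, `child_job_1_2`, `child_job_2`) are published to the stream, while the two parents (`parent_job` and `child_job_1`) are held in the `my_queue_parent_id` bucket until their children complete.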