-
Notifications
You must be signed in to change notification settings - Fork 12
Add MessageQueue feature #477
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughVersion bumped 0.94.1 → 0.95.0 and dependency haiway updated to ~=0.40.0 in pyproject.toml; added an optional Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
Pre-merge checks and finishing touches❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (2)
pyproject.toml(3 hunks)src/draive/rabbitmq/__init__.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Ensure latest, most strict typing syntax available from Python 3.13+; use no untyped public APIs and no looseAnyunless required by third-party boundaries
Prefer explicit attribute access with static types; avoid dynamicgetattrexcept at narrow boundaries
In public types, prefer abstract immutable protocols likeMapping,Sequence,Iterableover concretedict,list,set
Usefinalwhere applicable; avoid inheritance and prefer type composition
Use precise unions (|) and narrow withmatch/isinstance; avoidcastunless provably safe and localized
Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols likeHTTPRequestingto keep boundaries explicit
Guard immutability with assertions when crossing context boundaries; failure messages should aid debugging but never leak secrets
All I/O is async; keep boundaries async and usectx.spawnfor detached tasks, avoiding custom threading
Ensure structured concurrency concepts and valid coroutine usage; rely on haiway and asyncio packages
Await long-running operations directly; never block the event loop with sync calls
Translate provider/SDK errors into appropriate typed exceptions; wrap third-party exceptions at the boundary with actionable context (provider,operation, identifiers) while redacting sensitive payloads
Don't raise bareException; preserve contextual information in exception construction
Use observability hooks (ctx.log_*,ctx.record) fromctxhelper instead oflogging; tests assert on emitted events
Files:
src/draive/rabbitmq/__init__.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import symbols fromdraivedirectly (e.g.,from draive import State, ctx) rather than internal module paths
Use context scoping (ctx.scope(...)) to bind scopedDisposables, activeStateinstances, and avoid global state
Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Avoid docstrings for internal and private helpers; keep names self-explanatory
Skip module-level docstrings
Files:
src/draive/rabbitmq/__init__.py
🧠 Learnings (2)
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/__init__.py : Centralize public exports in `src/draive/__init__.py`; update this file when adding or modifying public APIs
Applied to files:
src/draive/rabbitmq/__init__.py
📚 Learning: 2025-06-16T10:28:07.434Z
Learnt from: KaQuMiQ
Repo: miquido/draive PR: 338
File: src/draive/lmm/__init__.py:1-2
Timestamp: 2025-06-16T10:28:07.434Z
Learning: The draive project requires Python 3.12+ as specified in pyproject.toml with "requires-python = ">=3.12"" and uses Python 3.12+ specific features like PEP 695 type aliases and generic syntax extensively throughout the codebase.
Applied to files:
pyproject.toml
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test (macos-latest, 3.13)
🔇 Additional comments (3)
pyproject.toml (3)
8-8: Version bump to0.95.0matches scope of added featureAligning the version with the new RabbitMQ integration and dependency updates makes sense; no issues from a packaging/semver perspective.
27-27: Confirm compatibility withhaiway~=0.40.0Bumping
haiwayto~0.40.0is appropriate for exposinghaiway.rabbitmq, but it’s worth double‑checking the 0.40.0 changelog for any breaking changes affecting existing integrations indraive.Please verify against the
haiway0.40.0 release notes and ensure the test suite passes with this version, especially around existinghaiway-based integrations (e.g., httpx, postgres).
55-55:rabbitmqextra group matches existing integration patterns; verifyhaiway[rabbitmq]extraDefining
rabbitmq = ["haiway[rabbitmq]", "pika"]is consistent with other extras (e.g.,haiway[httpx],haiway[postgres]) and cleanly gates MQ support behind an optional group.Please confirm that:
haiwayindeed exposes arabbitmqextra with the expected dependencies, andpikais the correct client lib and version range forhaiway.rabbitmqusage in this project.
If needed, consult thehaiwaydocumentation for the recommended extra and client library versions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
♻️ Duplicate comments (1)
src/draive/rabbitmq/__init__.py (1)
1-7: Clean re-export pattern for optional RabbitMQ integration.The thin wrapper correctly re-exports the
haiway.rabbitmqsymbols with an explicit__all__tuple. This aligns well with the newrabbitmqoptional dependency group inpyproject.toml.The past review comment about wiring these exports through
src/draive/__init__.pyremains relevant if these symbols should be part of the top-level public API.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (7)
pyproject.toml(3 hunks)src/draive/aws/api.py(2 hunks)src/draive/aws/client.py(3 hunks)src/draive/aws/sqs.py(1 hunks)src/draive/aws/state.py(1 hunks)src/draive/aws/types.py(2 hunks)src/draive/rabbitmq/__init__.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Ensure latest, most strict typing syntax available from Python 3.13+; use no untyped public APIs and no looseAnyunless required by third-party boundaries
Prefer explicit attribute access with static types; avoid dynamicgetattrexcept at narrow boundaries
In public types, prefer abstract immutable protocols likeMapping,Sequence,Iterableover concretedict,list,set
Usefinalwhere applicable; avoid inheritance and prefer type composition
Use precise unions (|) and narrow withmatch/isinstance; avoidcastunless provably safe and localized
Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols likeHTTPRequestingto keep boundaries explicit
Guard immutability with assertions when crossing context boundaries; failure messages should aid debugging but never leak secrets
All I/O is async; keep boundaries async and usectx.spawnfor detached tasks, avoiding custom threading
Ensure structured concurrency concepts and valid coroutine usage; rely on haiway and asyncio packages
Await long-running operations directly; never block the event loop with sync calls
Translate provider/SDK errors into appropriate typed exceptions; wrap third-party exceptions at the boundary with actionable context (provider,operation, identifiers) while redacting sensitive payloads
Don't raise bareException; preserve contextual information in exception construction
Use observability hooks (ctx.log_*,ctx.record) fromctxhelper instead oflogging; tests assert on emitted events
Files:
src/draive/aws/api.pysrc/draive/aws/sqs.pysrc/draive/aws/state.pysrc/draive/aws/types.pysrc/draive/rabbitmq/__init__.pysrc/draive/aws/client.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import symbols fromdraivedirectly (e.g.,from draive import State, ctx) rather than internal module paths
Use context scoping (ctx.scope(...)) to bind scopedDisposables, activeStateinstances, and avoid global state
Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Avoid docstrings for internal and private helpers; keep names self-explanatory
Skip module-level docstrings
Files:
src/draive/aws/api.pysrc/draive/aws/sqs.pysrc/draive/aws/state.pysrc/draive/aws/types.pysrc/draive/rabbitmq/__init__.pysrc/draive/aws/client.py
🧠 Learnings (2)
📚 Learning: 2025-06-16T10:28:07.434Z
Learnt from: KaQuMiQ
Repo: miquido/draive PR: 338
File: src/draive/lmm/__init__.py:1-2
Timestamp: 2025-06-16T10:28:07.434Z
Learning: The draive project requires Python 3.12+ as specified in pyproject.toml with "requires-python = ">=3.12"" and uses Python 3.12+ specific features like PEP 695 type aliases and generic syntax extensively throughout the codebase.
Applied to files:
pyproject.toml
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/__init__.py : Centralize public exports in `src/draive/__init__.py`; update this file when adding or modifying public APIs
Applied to files:
src/draive/rabbitmq/__init__.py
🧬 Code graph analysis (3)
src/draive/aws/sqs.py (2)
src/draive/aws/api.py (1)
AWSAPI(9-74)src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
src/draive/aws/state.py (1)
src/draive/aws/types.py (1)
AWSSQSQueueAccessing(86-93)
src/draive/aws/client.py (3)
src/draive/aws/sqs.py (2)
AWSSQSMixin(28-140)_queue_access(29-62)src/draive/aws/state.py (1)
AWSSQS(12-86)src/draive/aws/api.py (1)
_prepare_sqs_client(65-68)
🔇 Additional comments (10)
src/draive/aws/types.py (1)
85-93: Protocol definition is well-structured.The
AWSSQSQueueAccessingprotocol correctly uses:
@runtime_checkablefor structural typing at boundaries- PEP 695 generic method syntax
[Content](appropriate for Python 3.13+)**extra: Anyat the protocol boundary (acceptable per guidelines)The async callable returning
AbstractAsyncContextManager[MQQueue[Content]]aligns with the implementation inAWSSQSMixin._queue_access.pyproject.toml (2)
55-55: RabbitMQ optional dependency group looks correct.The group includes both
haiway[rabbitmq]for the RabbitMQ integration andpikaas the underlying AMQP client. This follows the same pattern as other optional groups (e.g.,postgres,httpx).
27-27: Verify thathaiway0.40.0 exports MQQueue, MQMessage, and BasicObject.The version bump to
haiway~=0.40.0is necessary for MQQueue, MQMessage, and BasicObject exports used in the AWS SQS and RabbitMQ integrations. These symbols are imported directly from haiway insrc/draive/aws/sqs.py,src/draive/aws/types.py, andsrc/draive/aws/state.py, and RabbitMQ-specific exports are imported fromhaiway.rabbitmqinsrc/draive/rabbitmq/__init__.py. Confirm this version introduces these message queue abstractions.src/draive/aws/api.py (1)
64-68: SQS client preparer follows the established pattern.The
@asynchronousdecorator correctly offloads the blocking boto3 client creation to a thread pool, consistent with_prepare_s3_client.src/draive/aws/sqs.py (1)
96-103: Closure correctly capturesreceipt_handlevia default argument.The default argument pattern
receipt_handle: str = receipt_handleproperly binds the current iteration's value, avoiding the classic late-binding closure pitfall in loops.src/draive/aws/state.py (3)
1-9: LGTM!Imports are clean and follow the coding guideline to import from
draivedirectly where applicable. The__all__export is properly declared.
13-32: LGTM!The overloaded signatures correctly support both classmethod and instance method invocation patterns with proper generic type parameters.
79-86: LGTM!The implementation correctly delegates to the injected
queue_accessingcallable, and the attribute declaration follows the State pattern for required dependencies.src/draive/aws/client.py (2)
9-10: LGTM!The import additions and mixin inheritance are correctly structured. The MRO order places the mixins before the base AWSAPI class, which is the proper pattern for mixin composition.
Also applies to: 17-20
47-60: LGTM!The typing correctly uses
Collection(abstract) over concrete types, and the empty tuple default maintains immutability. The union type properly expresses the supported feature types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (6)
src/draive/rabbitmq/__init__.py (1)
1-7: Clean re-export shim for RabbitMQ integration.The pattern correctly re-exports
haiway.rabbitmqsymbols under thedraive.rabbitmqnamespace, consistent with other optional integration modules. The__all__tuple properly constrains the public surface.Note: The suggestion to wire these symbols through
src/draive/__init__.pywas already raised in a previous review.src/draive/aws/sqs.py (3)
25-62: Add a NumPy-style docstring toAWSSQSMixinand clarifyqueuesemantics.
AWSSQSMixinis exported via__all__and forms part of the public AWS surface, but it has no class-level docstring. Per project guidelines, it should have a NumPy-style docstring describing:
- Purpose: SQS-backed MQ queue access helpers built on
AWSAPI.- Parameters / expectations: that it is intended to be mixed into an
AWSAPIsubclass, and whatqueuerepresents (SQS queue URL vs logical name).- High-level behavior of
_queue_access/ the returnedMQQueue(publishusescontent_encoder,consumeusescontent_decoder, acknowledgements delete messages, rejects rely on retries).Also,
_queue_accessaccepts**extrabut currently ignores it entirely. Either thread these options through topublish_message/consume_messagesor drop**extrahere and from the protocol to avoid misleading callers.
117-128: Optional: Make long‑polling parameters configurable or document the fixed defaults.
MaxNumberOfMessages=1andWaitTimeSeconds=20are hardcoded. That’s reasonable as a default, but some consumers may need different batch sizes or wait times. Consider accepting these as optional parameters (threaded fromqueue(..., **extra)) or explicitly documenting the fixed behavior in the public AWSSQS docs.
143-154: Attribute formatting is correct; consider explicitboolhandling.
_format_attribute_valuecorrectly maps strings to"String"and numbers to"Number"per SQS semantics, and raises on unsupported types. Becauseboolis a subclass ofint, it will currently be treated as a"Number"withStringValue"True"/"False", which is odd for numeric attributes. If you expect boolean attributes, consider adding an explicitcase bool() as value:branch before the numeric case (or explicitly rejecting bools) to avoid surprising behavior.src/draive/aws/client.py (1)
62-83: Feature binding in__aenter__is correct; consider expanding the docstring.The new
__aenter__correctly:
- Lazily prepares the S3 client when
ResourcesRepositoryis requested.- Lazily prepares the SQS client when
AWSSQSis requested.- Returns a list of
Stateinstances for requested features.To better guide callers, the docstring could briefly explain that the returned
Iterable[State]is meant to be bound into a context (e.g., viactx.scope(features)) and that the list may containResourcesRepository,AWSSQS, or both, in an unspecified order.src/draive/aws/state.py (1)
43-78: Fixcontent_encoderdocstring type and alignqueuedescription with SQS usage.The
queuemethod signature usescontent_encoder: Callable[[Content], str], but the docstring still documents it asCallable[[Content], BasicObject], which is misleading.Apply the following adjustment (as previously suggested) so docs match the actual type:
- content_encoder : Callable[[Content], BasicObject] - Callable that transforms typed payloads into broker-serializable objects before publish. + content_encoder : Callable[[Content], str] + Callable that transforms typed payloads into strings before publish.Also, the parameter doc currently says:
queue : str— Name of the queue to access on the broker.Given that the underlying implementation in
AWSSQSMixinpassesqueuestraight intoQueueUrl=...for boto3, either:
- Clarify here that
queuemust be the full SQS QueueUrl, or- Change the impl to resolve a logical queue name into a URL before calling SQS.
Right now the example (
AWSSQS.queue("events", ...)) suggests a logical name, which will not work with the current boto3 usage.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (9)
pyproject.toml(3 hunks)src/draive/aws/api.py(2 hunks)src/draive/aws/client.py(3 hunks)src/draive/aws/sqs.py(1 hunks)src/draive/aws/state.py(1 hunks)src/draive/aws/types.py(2 hunks)src/draive/rabbitmq/__init__.py(1 hunks)tests/test_tags_extraction.py(1 hunks)tests/test_tags_replacement.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Ensure latest, most strict typing syntax available from Python 3.13+; use no untyped public APIs and no looseAnyunless required by third-party boundaries
Prefer explicit attribute access with static types; avoid dynamicgetattrexcept at narrow boundaries
In public types, prefer abstract immutable protocols likeMapping,Sequence,Iterableover concretedict,list,set
Usefinalwhere applicable; avoid inheritance and prefer type composition
Use precise unions (|) and narrow withmatch/isinstance; avoidcastunless provably safe and localized
Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols likeHTTPRequestingto keep boundaries explicit
Guard immutability with assertions when crossing context boundaries; failure messages should aid debugging but never leak secrets
All I/O is async; keep boundaries async and usectx.spawnfor detached tasks, avoiding custom threading
Ensure structured concurrency concepts and valid coroutine usage; rely on haiway and asyncio packages
Await long-running operations directly; never block the event loop with sync calls
Translate provider/SDK errors into appropriate typed exceptions; wrap third-party exceptions at the boundary with actionable context (provider,operation, identifiers) while redacting sensitive payloads
Don't raise bareException; preserve contextual information in exception construction
Use observability hooks (ctx.log_*,ctx.record) fromctxhelper instead oflogging; tests assert on emitted events
Files:
src/draive/aws/api.pysrc/draive/aws/types.pysrc/draive/aws/state.pytests/test_tags_replacement.pysrc/draive/aws/sqs.pytests/test_tags_extraction.pysrc/draive/aws/client.pysrc/draive/rabbitmq/__init__.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import symbols fromdraivedirectly (e.g.,from draive import State, ctx) rather than internal module paths
Use context scoping (ctx.scope(...)) to bind scopedDisposables, activeStateinstances, and avoid global state
Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Avoid docstrings for internal and private helpers; keep names self-explanatory
Skip module-level docstrings
Files:
src/draive/aws/api.pysrc/draive/aws/types.pysrc/draive/aws/state.pysrc/draive/aws/sqs.pysrc/draive/aws/client.pysrc/draive/rabbitmq/__init__.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: No network in unit tests; mock providers and HTTP requests
Keep tests fast and specific to the code you change; start with unit tests around new types, functions, and adapters
Mirror package layout intests/; colocate new tests alongside features and preferpytestparametrization over loops
Test async flows withpytest.mark.asyncio; usectx.scopein tests to isolate state and avoid leaking globals
Files:
tests/test_tags_replacement.pytests/test_tags_extraction.py
🧠 Learnings (3)
📚 Learning: 2025-06-16T10:28:07.434Z
Learnt from: KaQuMiQ
Repo: miquido/draive PR: 338
File: src/draive/lmm/__init__.py:1-2
Timestamp: 2025-06-16T10:28:07.434Z
Learning: The draive project requires Python 3.12+ as specified in pyproject.toml with "requires-python = ">=3.12"" and uses Python 3.12+ specific features like PEP 695 type aliases and generic syntax extensively throughout the codebase.
Applied to files:
pyproject.toml
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/**/*.py : Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Applied to files:
src/draive/aws/sqs.py
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/__init__.py : Centralize public exports in `src/draive/__init__.py`; update this file when adding or modifying public APIs
Applied to files:
src/draive/rabbitmq/__init__.py
🧬 Code graph analysis (6)
src/draive/aws/types.py (1)
src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
src/draive/aws/state.py (1)
src/draive/aws/types.py (1)
AWSSQSQueueAccessing(86-93)
tests/test_tags_replacement.py (1)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)
src/draive/aws/sqs.py (2)
src/draive/aws/api.py (1)
AWSAPI(9-74)src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
tests/test_tags_extraction.py (1)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)
src/draive/aws/client.py (3)
src/draive/aws/sqs.py (2)
AWSSQSMixin(28-140)_queue_access(29-62)src/draive/aws/state.py (1)
AWSSQS(12-86)src/draive/aws/api.py (2)
_prepare_s3_client(59-62)_prepare_sqs_client(65-68)
🔇 Additional comments (10)
tests/test_tags_extraction.py (1)
465-476: LGTM!Minor string quote style normalization. The test logic remains correct and continues to verify that malformed self-closing tags (missing quotes around attribute values) are properly skipped.
tests/test_tags_replacement.py (1)
678-681: LGTM!Minor string quote style normalization consistent with the rest of the test suite.
pyproject.toml (3)
8-8: LGTM!Version bump to 0.95.0 is appropriate for the new MessageQueue feature additions (AWS SQS and RabbitMQ support).
55-55: LGTM!The new
rabbitmqoptional dependency group correctly bundleshaiway[rabbitmq](for the RabbitMQ integration) andpika(the underlying AMQP client library).
27-27: haiway 0.40.0 is available on PyPI.The version is published and stable. The dependency specification
haiway~=0.40.0correctly allows patch version updates within the 0.40.x series. No issues found.src/draive/aws/types.py (2)
1-5: LGTM!Imports are appropriate for the new protocol definition. Uses
Protocolandruntime_checkablefor structural typing at async boundaries, consistent with coding guidelines.
85-93: Well-defined protocol for SQS queue access.The
AWSSQSQueueAccessingprotocol correctly:
- Uses
@runtime_checkablefor async client boundaries- Employs PEP 695 generic syntax (
[Content]) on the method, appropriate for Python 3.13+- Returns
AbstractAsyncContextManager[MQQueue[Content]]for structured async resource management- Accepts
**extra: Anyfor forwarding provider-specific options (acceptable at third-party boundaries)The signature aligns with the usage in
src/draive/aws/state.pywherequeue_accessingdelegates to this callable.src/draive/aws/api.py (1)
16-20: SQS client wiring looks consistent and type-safe at the boundary.Adding
_sqs_clientto__slots__, annotating it asAny, and introducing_prepare_sqs_client()mirrors the existing S3 setup and keeps the boto3 interaction confined to a clearly typed boundary. No issues from a typing or async-usage perspective.Also applies to: 55-57, 58-68
src/draive/aws/sqs.py (1)
83-115: Consumption loop and message decoding behavior look correct.The consume loop respects cancellation via
ctx.check_cancellation(), uses long-polling via_receive, and yieldsMQMessageinstances with proper acknowledge callbacks. Passingmessage["Body"]intodecoderaligns with theCallable[[BasicObject], Content]contract and avoids the earlier “full envelope” issue. The no-oprejectrelying on SQS retry semantics is an acceptable default for now.src/draive/aws/client.py (1)
7-21: SQS feature integration intoAWSlooks structurally sound.Importing
AWSSQSMixin/AWSSQS, extendingAWSwith the mixin, and widening thefeaturestype toCollection[type[ResourcesRepository | AWSSQS]]align with the new SQS state. Using an empty tuple as the default feature set avoids implicit S3/SQS initialization and keeps typing strict. No functional or typing issues here.Also applies to: 42-61
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
♻️ Duplicate comments (4)
src/draive/rabbitmq/__init__.py (1)
1-7: Thin RabbitMQ shim looks good; ensure central top-level export stays in sync.The re-export and
__all__are clean and match other integration shims. If these are intended public symbols, also re-export them fromsrc/draive/__init__.py(e.g.from .rabbitmq import RabbitMQ, RabbitMQClient, RabbitMQException) so users can rely ondraive.RabbitMQ*as the single public entrypoint.src/draive/aws/client.py (1)
62-82: Document how callers should use the returned feature states from__aenter__.
__aenter__correctly prepares S3/SQS clients and returns a list ofStateinstances (e.g.ResourcesRepository,AWSSQS), but the docstring doesn’t explain how that iterable is meant to be used with context scoping (e.g.ctx.scope(features)).Consider expanding the docstring with a brief usage note:
- async def __aenter__(self) -> Iterable[State]: - """Prepare the AWS client and bind selected features to context.""" + async def __aenter__(self) -> Iterable[State]: + """ + Prepare the AWS client and bind selected features to context. + + Returns + ------- + Iterable[State] + Feature state instances (for example ``ResourcesRepository``, + ``AWSSQS``) ready to be passed into a context scope, e.g. + ``async with AWS(..., features={ResourcesRepository, AWSSQS}) as features:`` + `` async with ctx.scope(features): ...`` + """src/draive/aws/state.py (2)
12-12: Add a class‑level NumPy‑style docstring forAWSSQS.
AWSSQSis exported via__all__and is the primary public state wrapper for SQS queues, but the class itself has no docstring. Per the project guidelines, it should describe its role (binding SQS access into state), its main API (queueand injectedqueue_accessing), and high‑level usage.For example:
-class AWSSQS(State): +class AWSSQS(State): + """ + State wrapper providing typed access to AWS SQS queues. + + Parameters + ---------- + queue_accessing : AWSSQSQueueAccessing + Callable used internally to acquire queue‑scoped async context + managers. This is typically injected by the AWS client. + + Notes + ----- + The primary public API is :meth:`queue`, which returns an async context + manager yielding an :class:`MQQueue` bound to a specific SQS queue and + content encoder/decoder. + """Also applies to: 86-86
35-42: Fixcontent_encoderdocstring type and clarify queue/**extrasemantics.The implementation and overloads use
content_encoder: Callable[[Content], str], but the docstring still documentsCallable[[Content], BasicObject], which is misleading. Also, the doc saysqueueis the “Name of the queue”, and that**extrais forwarded to the underlying accessor, but in the current SQS mixin,queueis passed straight asQueueUrland**extrais effectively ignored.I’d recommend:
- Align the encoder docs with the actual type:
- content_encoder : Callable[[Content], BasicObject] - Callable that transforms typed payloads into broker-serializable objects before publish. + content_encoder : Callable[[Content], str] + Callable that transforms typed payloads into the message body string + sent to SQS.
- Clarify the meaning of
queueand behaviour of**extrato match the implementation you want:
- If
queueis intended to be a logical name (as in the"events"example), the SQS layer should resolve it viaget_queue_urlbefore callingsend_message/receive_message/delete_message.- If you prefer to treat
queueas the full SQS QueueUrl, update this docstring (and the example) to say so explicitly and mention that passing a bare name will not work.
- Either:
- Plumb
**extrathrough toAWSSQSMixin._queue_access/_publish/_receiveso callers can influence SQS options, or- Remove/undocument it here if you intentionally do not support extra options yet.
Right now the contract promises more than the implementation delivers, which will surprise callers.
Also applies to: 46-56
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (9)
pyproject.toml(3 hunks)src/draive/aws/api.py(2 hunks)src/draive/aws/client.py(3 hunks)src/draive/aws/sqs.py(1 hunks)src/draive/aws/state.py(1 hunks)src/draive/aws/types.py(2 hunks)src/draive/rabbitmq/__init__.py(1 hunks)tests/test_tags_extraction.py(1 hunks)tests/test_tags_replacement.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Ensure latest, most strict typing syntax available from Python 3.13+; use no untyped public APIs and no looseAnyunless required by third-party boundaries
Prefer explicit attribute access with static types; avoid dynamicgetattrexcept at narrow boundaries
In public types, prefer abstract immutable protocols likeMapping,Sequence,Iterableover concretedict,list,set
Usefinalwhere applicable; avoid inheritance and prefer type composition
Use precise unions (|) and narrow withmatch/isinstance; avoidcastunless provably safe and localized
Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols likeHTTPRequestingto keep boundaries explicit
Guard immutability with assertions when crossing context boundaries; failure messages should aid debugging but never leak secrets
All I/O is async; keep boundaries async and usectx.spawnfor detached tasks, avoiding custom threading
Ensure structured concurrency concepts and valid coroutine usage; rely on haiway and asyncio packages
Await long-running operations directly; never block the event loop with sync calls
Translate provider/SDK errors into appropriate typed exceptions; wrap third-party exceptions at the boundary with actionable context (provider,operation, identifiers) while redacting sensitive payloads
Don't raise bareException; preserve contextual information in exception construction
Use observability hooks (ctx.log_*,ctx.record) fromctxhelper instead oflogging; tests assert on emitted events
Files:
tests/test_tags_replacement.pysrc/draive/aws/types.pysrc/draive/rabbitmq/__init__.pysrc/draive/aws/client.pysrc/draive/aws/state.pysrc/draive/aws/sqs.pysrc/draive/aws/api.pytests/test_tags_extraction.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: No network in unit tests; mock providers and HTTP requests
Keep tests fast and specific to the code you change; start with unit tests around new types, functions, and adapters
Mirror package layout intests/; colocate new tests alongside features and preferpytestparametrization over loops
Test async flows withpytest.mark.asyncio; usectx.scopein tests to isolate state and avoid leaking globals
Files:
tests/test_tags_replacement.pytests/test_tags_extraction.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import symbols fromdraivedirectly (e.g.,from draive import State, ctx) rather than internal module paths
Use context scoping (ctx.scope(...)) to bind scopedDisposables, activeStateinstances, and avoid global state
Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Avoid docstrings for internal and private helpers; keep names self-explanatory
Skip module-level docstrings
Files:
src/draive/aws/types.pysrc/draive/rabbitmq/__init__.pysrc/draive/aws/client.pysrc/draive/aws/state.pysrc/draive/aws/sqs.pysrc/draive/aws/api.py
🧠 Learnings (3)
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/__init__.py : Centralize public exports in `src/draive/__init__.py`; update this file when adding or modifying public APIs
Applied to files:
src/draive/rabbitmq/__init__.py
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/**/*.py : Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Applied to files:
src/draive/aws/state.pysrc/draive/aws/sqs.py
📚 Learning: 2025-06-16T10:28:07.434Z
Learnt from: KaQuMiQ
Repo: miquido/draive PR: 338
File: src/draive/lmm/__init__.py:1-2
Timestamp: 2025-06-16T10:28:07.434Z
Learning: The draive project requires Python 3.12+ as specified in pyproject.toml with "requires-python = ">=3.12"" and uses Python 3.12+ specific features like PEP 695 type aliases and generic syntax extensively throughout the codebase.
Applied to files:
pyproject.toml
🧬 Code graph analysis (6)
tests/test_tags_replacement.py (1)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)
src/draive/aws/types.py (1)
src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
src/draive/aws/client.py (3)
src/draive/aws/sqs.py (2)
AWSSQSMixin(28-140)_queue_access(29-62)src/draive/aws/state.py (1)
AWSSQS(12-86)src/draive/aws/api.py (2)
_prepare_s3_client(59-62)_prepare_sqs_client(65-68)
src/draive/aws/state.py (1)
src/draive/aws/types.py (1)
AWSSQSQueueAccessing(86-93)
src/draive/aws/sqs.py (2)
src/draive/aws/api.py (1)
AWSAPI(9-74)src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
tests/test_tags_extraction.py (1)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)
🔇 Additional comments (3)
tests/test_tags_extraction.py (1)
466-476: LGTM!String literal delimiter standardization from single to double quotes. No functional changes.
tests/test_tags_replacement.py (1)
679-681: LGTM!String literal delimiter standardization consistent with changes in
test_tags_extraction.py. No functional impact.pyproject.toml (1)
8-8: Version/dependency bump andrabbitmqextra wiring are consistent; no action required.The move to
haiway~=0.40.0and therabbitmq = ["haiway[rabbitmq]", "pika"]extra are correctly wired. The symbolsMQQueue,MQMessage,BasicObject, andasynchronousare actively used throughout the codebase (e.g., insrc/draive/aws/sqs.py,src/draive/aws/state.py), and RabbitMQ imports are present insrc/draive/rabbitmq/__init__.py, confirming the dependency is compatible with the new AWS/RabbitMQ features.
3f200d4 to
1bf16bb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/draive/aws/s3.py (1)
229-245: Centralized S3 metadata sanitization is solid; consider tighteningvaluetypingMoving metadata normalization into
_sanitize_metadata_value/_sanitize_metadataand wiring_uploadthrough them makes the S3 boundary much clearer and safer (ASCII-only, whitespace/control stripping, byte-length guard).Given the current usages, you could narrow
def _sanitize_metadata_value(value: Any) -> str:to something like
def _sanitize_metadata_value(value: str | BasicValue) -> str:to keep
Anyreserved for true third‑party boundaries while preserving behavior.Also applies to: 255-299
♻️ Duplicate comments (6)
src/draive/rabbitmq/__init__.py (1)
1-7: Thin RabbitMQ re-export looks good; consider wiring throughdraive.__init__as wellThe direct re-export of
RabbitMQ,RabbitMQClient, andRabbitMQExceptionfromhaiway.rabbitmqwith an explicit__all__matches the existing integration-shim style and is clear.If these are intended as part of the public surface, consider also re-exporting them from
src/draive/__init__.py(e.g.,from .rabbitmq import RabbitMQ, RabbitMQClient, RabbitMQException) so users can rely ondraive.RabbitMQ*alongside the other top-level integrations.src/draive/aws/types.py (1)
1-6: PublicAWSSQSQueueAccessingprotocol looks correct; add a NumPy-style docstringThe protocol shape (
__call__async, returning anAbstractAsyncContextManager[MQQueue[Content]]with encoder/decoder callables) matches howAWSSQS.queueand the SQS mixin use it, so the typing boundary looks good.Since
AWSSQSQueueAccessingis exported via__all__and is a public boundary, it should carry a short NumPy-style docstring documenting thequeue,content_encoder, andcontent_decoderparameters and the returned async context manager (anMQQueue[Content]accessor), similar in spirit to theAWSSQS.queuedocstring.Adding that docstring will keep the public API self-describing and consistent with other exported protocols.
Also applies to: 11-12, 85-93
src/draive/aws/state.py (1)
12-86: AlignAWSSQSdocumentation with its types (class docstring +content_encodertype)The
AWSSQSstate wrapper and its genericqueuestatemethod are well-typed and delegate cleanly toqueue_accessing, but there are two documentation gaps:
Missing class-level docstring for
AWSSQSSince
AWSSQSis exported via__all__and is a public state surface, add a short NumPy-style class docstring describing it as a stateful accessor for AWS SQS queues, noting that:
- The main entrypoint is the
queuestatemethod.queue_accessing: AWSSQSQueueAccessingis injected by the AWS client and performs the actual access/teardown.- Typical usage mirrors the example already shown in the
queuemethod docstring.Docstring type mismatch for
content_encoderThe
queuedocstring currently documents:content_encoder : Callable[[Content], BasicObject] Callable that transforms typed payloads into broker-serializable objects before publish.but the actual parameter type is
Callable[[Content], str]. To avoid confusion, update the docstring to match the signature, e.g.:
content_encoder : Callable[[Content], BasicObject]Callable that transforms typed payloads into broker-serializable objects before publish.
content_encoder : Callable[[Content], str]Callable that transforms typed payloads into strings before publish.These tweaks bring the public docs back in sync with the types and the underlying
AWSSQSQueueAccessingprotocol.src/draive/aws/sqs.py (3)
30-64: Missing docstring and**extraparameters not forwarded.Per past review comments that remain unaddressed:
AWSSQSMixinlacks the required NumPy-style docstring, and**extraparameters are accepted but silently dropped throughout the call chain (publish_message,consume_messages,_publish). This violates the API contract documented inAWSSQS.queue.
216-227:boolvalues incorrectly handled as numbers.As noted in a prior review,
boolis a subclass ofintin Python, soTrue/Falsematch theint() | float()case and produce{"DataType": "Number", "StringValue": "True"}. Add an explicitcase bool()before the numeric case.
98-131: Type mismatch: decoder expectsBasicObjectbut receivesstr.Line 126 passes
message["Body"](astr) todecoder, but the type annotation on line 103 declaresCallable[[BasicObject], Content]. This inconsistency will cause type checkers to report errors and confuses consumers about the expected decoder signature.Either parse the JSON body before passing to the decoder:
+from json import loads ... yield MQMessage( - content=decoder(message["Body"]), + content=decoder(loads(message["Body"])),Or update the type annotations across
AWSSQSQueueAccessing,AWSSQS.queue, and_queue_accessto useCallable[[str], Content]if consumers are expected to handle raw string bodies.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (10)
pyproject.toml(3 hunks)src/draive/aws/api.py(2 hunks)src/draive/aws/client.py(4 hunks)src/draive/aws/s3.py(1 hunks)src/draive/aws/sqs.py(1 hunks)src/draive/aws/state.py(1 hunks)src/draive/aws/types.py(2 hunks)src/draive/rabbitmq/__init__.py(1 hunks)tests/test_tags_extraction.py(1 hunks)tests/test_tags_replacement.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Ensure latest, most strict typing syntax available from Python 3.13+; use no untyped public APIs and no looseAnyunless required by third-party boundaries
Prefer explicit attribute access with static types; avoid dynamicgetattrexcept at narrow boundaries
In public types, prefer abstract immutable protocols likeMapping,Sequence,Iterableover concretedict,list,set
Usefinalwhere applicable; avoid inheritance and prefer type composition
Use precise unions (|) and narrow withmatch/isinstance; avoidcastunless provably safe and localized
Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols likeHTTPRequestingto keep boundaries explicit
Guard immutability with assertions when crossing context boundaries; failure messages should aid debugging but never leak secrets
All I/O is async; keep boundaries async and usectx.spawnfor detached tasks, avoiding custom threading
Ensure structured concurrency concepts and valid coroutine usage; rely on haiway and asyncio packages
Await long-running operations directly; never block the event loop with sync calls
Translate provider/SDK errors into appropriate typed exceptions; wrap third-party exceptions at the boundary with actionable context (provider,operation, identifiers) while redacting sensitive payloads
Don't raise bareException; preserve contextual information in exception construction
Use observability hooks (ctx.log_*,ctx.record) fromctxhelper instead oflogging; tests assert on emitted events
Files:
tests/test_tags_replacement.pysrc/draive/aws/state.pysrc/draive/aws/types.pysrc/draive/aws/s3.pysrc/draive/rabbitmq/__init__.pytests/test_tags_extraction.pysrc/draive/aws/sqs.pysrc/draive/aws/client.pysrc/draive/aws/api.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: No network in unit tests; mock providers and HTTP requests
Keep tests fast and specific to the code you change; start with unit tests around new types, functions, and adapters
Mirror package layout intests/; colocate new tests alongside features and preferpytestparametrization over loops
Test async flows withpytest.mark.asyncio; usectx.scopein tests to isolate state and avoid leaking globals
Files:
tests/test_tags_replacement.pytests/test_tags_extraction.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import symbols fromdraivedirectly (e.g.,from draive import State, ctx) rather than internal module paths
Use context scoping (ctx.scope(...)) to bind scopedDisposables, activeStateinstances, and avoid global state
Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Avoid docstrings for internal and private helpers; keep names self-explanatory
Skip module-level docstrings
Files:
src/draive/aws/state.pysrc/draive/aws/types.pysrc/draive/aws/s3.pysrc/draive/rabbitmq/__init__.pysrc/draive/aws/sqs.pysrc/draive/aws/client.pysrc/draive/aws/api.py
🧠 Learnings (4)
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/**/*.py : Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Applied to files:
src/draive/aws/state.pysrc/draive/aws/sqs.py
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to **/*.py : Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols like `HTTPRequesting` to keep boundaries explicit
Applied to files:
src/draive/aws/types.py
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/__init__.py : Centralize public exports in `src/draive/__init__.py`; update this file when adding or modifying public APIs
Applied to files:
src/draive/rabbitmq/__init__.py
📚 Learning: 2025-06-16T10:28:07.434Z
Learnt from: KaQuMiQ
Repo: miquido/draive PR: 338
File: src/draive/lmm/__init__.py:1-2
Timestamp: 2025-06-16T10:28:07.434Z
Learning: The draive project requires Python 3.12+ as specified in pyproject.toml with "requires-python = ">=3.12"" and uses Python 3.12+ specific features like PEP 695 type aliases and generic syntax extensively throughout the codebase.
Applied to files:
pyproject.toml
🧬 Code graph analysis (5)
tests/test_tags_replacement.py (1)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)
src/draive/aws/state.py (1)
src/draive/aws/types.py (1)
AWSSQSQueueAccessing(86-93)
src/draive/aws/types.py (1)
src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
src/draive/aws/sqs.py (3)
src/draive/aws/api.py (1)
AWSAPI(9-74)src/draive/aws/types.py (3)
AWSAccessDenied(51-65)AWSError(15-48)AWSResourceNotFound(68-82)src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
src/draive/aws/client.py (3)
src/draive/aws/sqs.py (2)
AWSSQSMixin(30-213)_queue_access(31-64)src/draive/aws/state.py (1)
AWSSQS(12-86)src/draive/aws/api.py (1)
_prepare_sqs_client(65-68)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: test (macos-latest, 3.13)
🔇 Additional comments (13)
tests/test_tags_extraction.py (1)
465-477: String-literal style change onlyThe updated HTML snippets are a pure quoting/style tweak and do not affect test behavior; this segment looks good as-is.
tests/test_tags_replacement.py (1)
678-682: No behavioral change in malformed self-closing tag testThe updated string literal keeps the test scenario identical (malformed
imgtag is still ignored); no further changes needed here.src/draive/aws/api.py (1)
12-20: SQS client wiring mirrors S3 cleanly; no action neededThe added
_sqs_clientslot and_prepare_sqs_clientfollow the existing S3 pattern correctly and keep boto3 behind@asynchronous. All call sites inclient.pyproperly await both methods. The rename from_prepare_clientto_prepare_s3_clientis complete with no stale references remaining.Likely an incorrect or invalid review comment.
src/draive/aws/sqs.py (6)
1-27: Imports and module setup look correct.The imports are appropriate for the functionality:
asynccontextmanagerfor async context management,ClientErrorfor boto3 error handling, and haiway types for message queue abstractions. The__all__export is properly restricted to the public mixin.
66-96:_publishimplementation is well-structured.Queue URL resolution, error translation, and message attribute formatting follow established patterns. The
@asynchronousdecorator ensures boto3's synchronous calls don't block the event loop.
132-156:_receivecorrectly implements long-polling with error translation.The implementation properly uses long-polling (
WaitTimeSeconds=20) and handles boto3 errors. The hardcoded polling parameters were noted in a prior review as a potential enhancement.
158-181:_ack_messageimplementation is correct.Properly resolves queue URL and translates errors consistently with other operations.
230-287: Error translation logic is comprehensive and well-structured.The mapping covers common SQS error codes (
NonExistentQueue, access denied variants) and HTTP status codes. The contextual error messages include provider, operation, and queue identifiers for debugging.
290-297: LGTM!Clean helper for consistent error message formatting with contextual metadata.
src/draive/aws/client.py (4)
9-10: LGTM!Imports for
AWSSQSMixinandAWSSQSare correctly added to support the new SQS feature.
16-21: Mixin composition is correctly ordered.The MRO
AWS -> AWSS3Mixin -> AWSSQSMixin -> AWSAPIensures both mixins share a singleAWSAPIinstance. PlacingAWSAPIlast is the canonical pattern for diamond inheritance with mixins.
56-61: Breaking change: defaultfeaturesis now empty.Previously, omitting
featuresdefaulted to(ResourcesRepository,). Now it defaults to(), so existing code likeasync with AWS() as featureswill get an empty list andResourcesRepositorywon't be activated without explicit request.If this is intentional (explicit-is-better), consider documenting the change in release notes. Otherwise, restore the previous default:
else: - self._features = () + self._features = (ResourcesRepository,)
63-83: Feature binding implementation is correct.The conditional client preparation and state instantiation properly handle multiple feature types. The returned
list[State]matches theIterable[State]return type and integrates well withctx.scope(...).
b19c55a to
d345469
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
♻️ Duplicate comments (7)
pyproject.toml (1)
8-8: Version bump and dependency updates are consistent with the new MessageQueue feature.The version bump to
0.95.0, thehaiway~=0.40.0update, and the newrabbitmqoptional dependency group align with the PR's objective to add MessageQueue support. Thepikadependency lacking a version constraint was already noted in a previous review.Also applies to: 27-27, 55-55
src/draive/rabbitmq/__init__.py (1)
1-7: Clean re-export shim for RabbitMQ integration.The direct re-export from
haiway.rabbitmqwith explicit__all__follows the established pattern for integration modules. The suggestion to also export these symbols fromsrc/draive/__init__.pyfor central API consistency was already raised in a previous review.src/draive/aws/types.py (1)
85-93: Well-defined protocol with precise typing; docstring suggestion already noted.The
AWSSQSQueueAccessingprotocol correctly uses structural typing with@runtime_checkable, and the__call__signature aligns with theAWSSQS.queueimplementation insrc/draive/aws/state.py. The PEP 695 generic syntax (def __call__[Content]) is appropriate for Python 3.13+.The suggestion to add a NumPy-style docstring for this public protocol was already raised in a previous review.
src/draive/aws/state.py (2)
12-13: Add a class-level NumPy-style docstring forAWSSQS.Per coding guidelines, public symbols require NumPy-style docstrings.
AWSSQSis exported via__all__but lacks class-level documentation describing its purpose, main public API (queuestatemethod andqueue_accessingattribute), and usage pattern.
50-51: Docstring type mismatch forcontent_encoder.The docstring states
Callable[[Content], BasicObject]but the actual parameter type isCallable[[Content], str]. This inconsistency could mislead API consumers.- content_encoder : Callable[[Content], BasicObject] - Callable that transforms typed payloads into broker-serializable objects before publish. + content_encoder : Callable[[Content], str] + Callable that transforms typed payloads into strings before publish.src/draive/aws/sqs.py (2)
31-40:AWSSQSMixinis missing a NumPy-style docstring and silently drops**extraparameters.Per coding guidelines, public symbols require NumPy-style docstrings. Additionally,
_queue_accessreceives**extraat line 39 but never forwards it to_publish(line 48-52) or throughconsume_messages(line 54-60). TheAWSSQS.queuedocstring promises these are "forwarded to the underlying queue accessor."Either forward
**extrathrough the stack or update the API contract to clarify that extra options are not supported for SQS.
226-237:boolvalues are incorrectly serialized as SQS Number type.Since
boolis a subclass ofintin Python,True/Falsematch theint() | float()case (line 233) and are serialized as{"DataType": "Number", "StringValue": "True"}. SQS expects numeric strings for Number types, so this will likely cause validation errors or unexpected behavior.Add an explicit
boolcase before the numeric case:def _format_attribute_value( value: RawValue, ) -> dict[str, Any]: match value: case str() as value: return {"DataType": "String", "StringValue": value} + case bool() as value: + return {"DataType": "String", "StringValue": "true" if value else "false"} + case int() | float() as value: return {"DataType": "Number", "StringValue": str(value)} case value: - raise ValueError(f"Unsupported value {type(value)}") + raise ValueError(f"Unsupported SQS attribute value type: {type(value)}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (10)
pyproject.toml(3 hunks)src/draive/aws/api.py(2 hunks)src/draive/aws/client.py(4 hunks)src/draive/aws/s3.py(1 hunks)src/draive/aws/sqs.py(1 hunks)src/draive/aws/state.py(1 hunks)src/draive/aws/types.py(2 hunks)src/draive/rabbitmq/__init__.py(1 hunks)tests/test_tags_extraction.py(1 hunks)tests/test_tags_replacement.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Ensure latest, most strict typing syntax available from Python 3.13+; use no untyped public APIs and no looseAnyunless required by third-party boundaries
Prefer explicit attribute access with static types; avoid dynamicgetattrexcept at narrow boundaries
In public types, prefer abstract immutable protocols likeMapping,Sequence,Iterableover concretedict,list,set
Usefinalwhere applicable; avoid inheritance and prefer type composition
Use precise unions (|) and narrow withmatch/isinstance; avoidcastunless provably safe and localized
Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols likeHTTPRequestingto keep boundaries explicit
Guard immutability with assertions when crossing context boundaries; failure messages should aid debugging but never leak secrets
All I/O is async; keep boundaries async and usectx.spawnfor detached tasks, avoiding custom threading
Ensure structured concurrency concepts and valid coroutine usage; rely on haiway and asyncio packages
Await long-running operations directly; never block the event loop with sync calls
Translate provider/SDK errors into appropriate typed exceptions; wrap third-party exceptions at the boundary with actionable context (provider,operation, identifiers) while redacting sensitive payloads
Don't raise bareException; preserve contextual information in exception construction
Use observability hooks (ctx.log_*,ctx.record) fromctxhelper instead oflogging; tests assert on emitted events
Files:
tests/test_tags_replacement.pysrc/draive/rabbitmq/__init__.pysrc/draive/aws/types.pysrc/draive/aws/sqs.pysrc/draive/aws/api.pysrc/draive/aws/state.pysrc/draive/aws/s3.pytests/test_tags_extraction.pysrc/draive/aws/client.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: No network in unit tests; mock providers and HTTP requests
Keep tests fast and specific to the code you change; start with unit tests around new types, functions, and adapters
Mirror package layout intests/; colocate new tests alongside features and preferpytestparametrization over loops
Test async flows withpytest.mark.asyncio; usectx.scopein tests to isolate state and avoid leaking globals
Files:
tests/test_tags_replacement.pytests/test_tags_extraction.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import symbols fromdraivedirectly (e.g.,from draive import State, ctx) rather than internal module paths
Use context scoping (ctx.scope(...)) to bind scopedDisposables, activeStateinstances, and avoid global state
Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Avoid docstrings for internal and private helpers; keep names self-explanatory
Skip module-level docstrings
Files:
src/draive/rabbitmq/__init__.pysrc/draive/aws/types.pysrc/draive/aws/sqs.pysrc/draive/aws/api.pysrc/draive/aws/state.pysrc/draive/aws/s3.pysrc/draive/aws/client.py
🧠 Learnings (4)
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/__init__.py : Centralize public exports in `src/draive/__init__.py`; update this file when adding or modifying public APIs
Applied to files:
src/draive/rabbitmq/__init__.py
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to **/*.py : Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols like `HTTPRequesting` to keep boundaries explicit
Applied to files:
src/draive/aws/types.py
📚 Learning: 2025-06-16T10:28:07.434Z
Learnt from: KaQuMiQ
Repo: miquido/draive PR: 338
File: src/draive/lmm/__init__.py:1-2
Timestamp: 2025-06-16T10:28:07.434Z
Learning: The draive project requires Python 3.12+ as specified in pyproject.toml with "requires-python = ">=3.12"" and uses Python 3.12+ specific features like PEP 695 type aliases and generic syntax extensively throughout the codebase.
Applied to files:
pyproject.toml
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/**/*.py : Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Applied to files:
src/draive/aws/sqs.pysrc/draive/aws/state.py
🧬 Code graph analysis (4)
tests/test_tags_replacement.py (1)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)
src/draive/aws/types.py (1)
src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
src/draive/aws/sqs.py (3)
src/draive/aws/api.py (1)
AWSAPI(9-74)src/draive/aws/types.py (3)
AWSAccessDenied(51-65)AWSError(15-48)AWSResourceNotFound(68-82)src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
src/draive/aws/state.py (1)
src/draive/aws/types.py (1)
AWSSQSQueueAccessing(86-93)
🔇 Additional comments (10)
tests/test_tags_extraction.py (1)
466-476: Cosmetic quote style change; test logic remains correct.The change from single to double quotes for the test input strings is a formatting adjustment with no semantic impact. The test correctly validates that self-closing tags with malformed attributes (missing quotes around attribute values) are ignored while valid tags are still parsed.
tests/test_tags_replacement.py (1)
678-681: Cosmetic quote style change; test correctly validates malformed tag handling.The quote style adjustment has no semantic impact. The test appropriately verifies that
replacing_tagleaves content unchanged when encountering a malformed self-closing tag with an unquoted attribute value.src/draive/aws/types.py (1)
1-5: Imports are appropriate for the new protocol definition.All imports are used:
Callable,AbstractAsyncContextManager,Protocol,runtime_checkablefrom the standard library, andBasicObject,MQQueuefromhaiwayfor the protocol's type signature.src/draive/aws/s3.py (1)
281-295: LGTM!The metadata sanitization correctly handles both keys and values, defensively skips empty entries, and returns the proper
dict[str, str]type for S3 Metadata.src/draive/aws/api.py (1)
64-68: LGTM!The
_prepare_sqs_clientmethod correctly mirrors the_prepare_s3_clientpattern, maintaining consistency in the boto3 client initialization approach.src/draive/aws/state.py (1)
79-84: LGTM!The
queuemethod correctly delegates toqueue_accessingwith all parameters including**extra, matching the documented behavior.src/draive/aws/sqs.py (2)
101-133: LGTM!The consume implementation correctly:
- Passes
message["Body"]to the decoder (not the full envelope)- Uses default argument capture for
receipt_handlein the closure- Checks cancellation between polling iterations
240-297: LGTM!The error translation follows the established S3 pattern, correctly maps SQS-specific error codes to typed exceptions, and includes contextual information (provider, operation, queue) for debugging.
src/draive/aws/client.py (2)
56-61: Defaultfeatureschanged from(ResourcesRepository,)to()—potential breaking change.Previously, if
featureswasNone,ResourcesRepositorywas activated by default. Now it defaults to an empty tuple, meaning no features are activated unless explicitly requested. This could break existing code that relies on the implicit default.If this is intentional, consider documenting the change. If backward compatibility is desired:
else: - self._features = () + self._features = (ResourcesRepository,)
63-83: LGTM!The
__aenter__implementation correctly:
- Lazily prepares only the requested AWS service clients
- Constructs feature states with appropriate callbacks
- Returns an
Iterable[State]for context binding
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/draive/aws/client.py (1)
22-39: Update class docstring to mention SQS capability.The class now supports SQS features via
AWSSQSMixin, but the docstring at line 22 only mentions "S3 and repository integrations." Update to reflect the broader capability:- """AWS service facade bundling S3 and repository integrations. + """AWS service facade bundling S3, SQS, and repository integrations.
♻️ Duplicate comments (7)
pyproject.toml (1)
8-8: Version and MQ dependency updates look aligned; consider constrainingpikaThe 0.95.0 version bump and
haiway~=0.40.0upgrade match the new MQ/SQS/RabbitMQ features and Python 3.13 tooling.For the new
rabbitmqextra, you currently install the latestpikawithout bounds. Unless you explicitly want “latest available” behavior, consider adding a compatible version range (e.g.,pika~=X.Y) to avoid unexpected upstream breaking changes, and verify thathaiway~=0.40.0plus your chosenpikarange passes the test suite.Also applies to: 27-27, 55-55
src/draive/rabbitmq/__init__.py (1)
1-7: RabbitMQ shim looks good; consider adding top‑leveldraivere‑exportsThis is a clean, minimal re‑export of the haiway RabbitMQ types. If these are part of the intended public surface, consider also re‑exporting them from
src/draive/__init__.pyso users can accessdraive.RabbitMQ,draive.RabbitMQClient, anddraive.RabbitMQException, keeping the central public API index consistent. Based on learnings, public exports are typically centralized there.src/draive/aws/types.py (1)
1-3: Add a NumPy‑style docstring for publicAWSSQSQueueAccessingThe protocol shape and generics look good and line up with the
AWSSQS.queueoverloads, but sinceAWSSQSQueueAccessingis exported via__all__, it should carry a short NumPy‑style docstring describing the async__call__contract (Parameters/Returns, noting it returns anAbstractAsyncContextManager[MQQueue[Content]]for a given queue/encoder/decoder), to keep the public boundary self‑describing. As per coding guidelines forsrc/draive/**/*.py, public symbols should have NumPy‑style docstrings.Also applies to: 5-5, 11-12, 85-93
src/draive/aws/api.py (1)
12-20: SQS client wiring is fine; fix minor grammar in the class docstringThe added
_sqs_clientslot and_prepare_sqs_clientmirror the existing S3 path and keep boto3 calls behind@asynchronous, which is consistent with the rest of the AWS mixins.The class docstring sentence
“Provides an asynchronous S3 and SQS client initializers…”
is grammatically off. Consider e.g.:
- “Provides asynchronous S3 and SQS client initializers that other mixins can rely on …”, or
- “Provides an asynchronous initializer for S3 and SQS clients that other mixins can rely on …”.
Also applies to: 56-68
src/draive/aws/state.py (1)
12-86: Add class‑level docs forAWSSQSand fixcontent_encoderdocstring typeThe
AWSSQS.queueoverloads and implementation look consistent withAWSSQSQueueAccessingand the@statemethodpattern, but there are two doc issues:
AWSSQSis exported via__all__yet has no class‑level NumPy‑style docstring, contrary to thesrc/draive/**/*.pyguidelines for public symbols.- In the
queuemethod docstring,content_encoderis documented asCallable[[Content], BasicObject]while the actual type isCallable[[Content], str], which can confuse consumers.You can address both with something like:
class AWSSQS(State): + """ + AWS SQS state providing typed queue access bound to the current state. + + Parameters + ---------- + queue_accessing : AWSSQSQueueAccessing + Callable responsible for creating typed SQS queue accessors. It is + typically injected by the AWS client layer and used internally by + :meth:`queue`. + + Notes + ----- + The primary entry point is :meth:`queue`, which returns an async context + manager yielding an :class:`MQQueue` configured with the provided + encoder/decoder pair. + """ @@ - content_encoder : Callable[[Content], BasicObject] - Callable that transforms typed payloads into broker-serializable objects before publish. + content_encoder : Callable[[Content], str] + Callable that transforms typed payloads into strings before publish.As per coding guidelines and prior feedback, this keeps the public state type self‑describing and aligns the documentation with the actual signature.
src/draive/aws/sqs.py (2)
46-79:**extraparameters are accepted but never forwarded, violating the API contract.
AWSSQS.queue(state.py:43-49) documents that**extraare "Additional options forwarded to the underlying queue accessor," but:
publish_message(line 58) accepts**extrabut doesn't pass it to_publish(line 60)consume_messages(line 67) accepts**extrabut doesn't pass it to_consume(line 69)Either thread
**extrathrough the full call chain (publish_message→_publish,consume_messages→_consume) or remove**extrafrom all signatures and updateAWSSQS.queue's docstring to clarify extra options are unsupported.
238-249:boolvalues incorrectly serialize as Number type, causing SQS API errors.Since
boolis a subclass ofintin Python, boolean values match theint() | float()case at line 245, resulting in attributes like{"DataType": "Number", "StringValue": "True"}. AWS SQS rejects numeric attributes with non-numeric strings.Add an explicit
boolcase before the numeric pattern:def _format_attribute_value( value: RawValue, ) -> dict[str, Any]: match value: case str() as value: return {"DataType": "String", "StringValue": value} + case bool() as value: + return {"DataType": "String", "StringValue": "true" if value else "false"} + case int() | float() as value: return {"DataType": "Number", "StringValue": str(value)} case value: - raise ValueError(f"Unsupported value {type(value)}") + raise ValueError(f"Unsupported SQS attribute value type: {type(value)}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (10)
pyproject.toml(3 hunks)src/draive/aws/api.py(2 hunks)src/draive/aws/client.py(4 hunks)src/draive/aws/s3.py(1 hunks)src/draive/aws/sqs.py(1 hunks)src/draive/aws/state.py(1 hunks)src/draive/aws/types.py(2 hunks)src/draive/rabbitmq/__init__.py(1 hunks)tests/test_tags_extraction.py(1 hunks)tests/test_tags_replacement.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Ensure latest, most strict typing syntax available from Python 3.13+; use no untyped public APIs and no looseAnyunless required by third-party boundaries
Prefer explicit attribute access with static types; avoid dynamicgetattrexcept at narrow boundaries
In public types, prefer abstract immutable protocols likeMapping,Sequence,Iterableover concretedict,list,set
Usefinalwhere applicable; avoid inheritance and prefer type composition
Use precise unions (|) and narrow withmatch/isinstance; avoidcastunless provably safe and localized
Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols likeHTTPRequestingto keep boundaries explicit
Guard immutability with assertions when crossing context boundaries; failure messages should aid debugging but never leak secrets
All I/O is async; keep boundaries async and usectx.spawnfor detached tasks, avoiding custom threading
Ensure structured concurrency concepts and valid coroutine usage; rely on haiway and asyncio packages
Await long-running operations directly; never block the event loop with sync calls
Translate provider/SDK errors into appropriate typed exceptions; wrap third-party exceptions at the boundary with actionable context (provider,operation, identifiers) while redacting sensitive payloads
Don't raise bareException; preserve contextual information in exception construction
Use observability hooks (ctx.log_*,ctx.record) fromctxhelper instead oflogging; tests assert on emitted events
Files:
src/draive/aws/api.pysrc/draive/aws/sqs.pysrc/draive/aws/state.pysrc/draive/aws/types.pytests/test_tags_replacement.pysrc/draive/rabbitmq/__init__.pysrc/draive/aws/s3.pytests/test_tags_extraction.pysrc/draive/aws/client.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import symbols fromdraivedirectly (e.g.,from draive import State, ctx) rather than internal module paths
Use context scoping (ctx.scope(...)) to bind scopedDisposables, activeStateinstances, and avoid global state
Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Avoid docstrings for internal and private helpers; keep names self-explanatory
Skip module-level docstrings
Files:
src/draive/aws/api.pysrc/draive/aws/sqs.pysrc/draive/aws/state.pysrc/draive/aws/types.pysrc/draive/rabbitmq/__init__.pysrc/draive/aws/s3.pysrc/draive/aws/client.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: No network in unit tests; mock providers and HTTP requests
Keep tests fast and specific to the code you change; start with unit tests around new types, functions, and adapters
Mirror package layout intests/; colocate new tests alongside features and preferpytestparametrization over loops
Test async flows withpytest.mark.asyncio; usectx.scopein tests to isolate state and avoid leaking globals
Files:
tests/test_tags_replacement.pytests/test_tags_extraction.py
🧠 Learnings (4)
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/**/*.py : Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Applied to files:
src/draive/aws/sqs.pysrc/draive/aws/state.py
📚 Learning: 2025-06-16T10:28:07.434Z
Learnt from: KaQuMiQ
Repo: miquido/draive PR: 338
File: src/draive/lmm/__init__.py:1-2
Timestamp: 2025-06-16T10:28:07.434Z
Learning: The draive project requires Python 3.12+ as specified in pyproject.toml with "requires-python = ">=3.12"" and uses Python 3.12+ specific features like PEP 695 type aliases and generic syntax extensively throughout the codebase.
Applied to files:
pyproject.toml
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to **/*.py : Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols like `HTTPRequesting` to keep boundaries explicit
Applied to files:
src/draive/aws/types.py
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/__init__.py : Centralize public exports in `src/draive/__init__.py`; update this file when adding or modifying public APIs
Applied to files:
src/draive/rabbitmq/__init__.py
🧬 Code graph analysis (6)
src/draive/aws/sqs.py (3)
src/draive/aws/api.py (1)
AWSAPI(9-74)src/draive/aws/types.py (3)
AWSAccessDenied(51-65)AWSError(15-48)AWSResourceNotFound(68-82)src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
src/draive/aws/state.py (1)
src/draive/aws/types.py (1)
AWSSQSQueueAccessing(86-93)
src/draive/aws/types.py (1)
src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
tests/test_tags_replacement.py (1)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)
tests/test_tags_extraction.py (2)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)src/draive/resources/types.py (4)
of(77-94)of(103-120)of(131-138)of(142-149)
src/draive/aws/client.py (4)
src/draive/aws/sqs.py (2)
AWSSQSMixin(31-235)_queue_access(46-79)src/draive/aws/state.py (1)
AWSSQS(12-86)src/draive/resources/state.py (1)
ResourcesRepository(27-190)src/draive/aws/api.py (2)
_prepare_s3_client(59-62)_prepare_sqs_client(65-68)
🔇 Additional comments (11)
tests/test_tags_extraction.py (1)
466-477: Malformed self‑closing<img>test still valid after quoting changeOnly the string literal quoting changed here; the test still correctly verifies that a malformed
<img src=hero/>is skipped while a following well‑formed tag is parsed. No behavioral impact.tests/test_tags_replacement.py (1)
678-686: Quote style tweak only; malformed<img>replacement behavior unchangedSwitching the HTML snippet to double quotes doesn’t affect the test’s intent:
replacing_tagstill ignores the malformed<img src=logo/>and returns the original content.src/draive/aws/s3.py (1)
252-295: S3 metadata sanitization helpers look correct and safely boundedThe new
_sanitize_metadata_value/_sanitize_metadatapair produce a cleandict[str, str]for S3 metadata: values are stringified, non‑ASCII and control characters are stripped, whitespace is normalized, and values are truncated with an ellipsis to stay within the 1,024‑byte limit. Using these helpers from_uploadviaMetadata=_sanitize_metadata(meta)keeps the boto3 boundary safe without leaking arbitrary user metadata into headers.src/draive/aws/sqs.py (5)
81-111: LGTM!Proper queue URL resolution, error handling, and attribute formatting. The implementation correctly translates queue names to URLs and handles boto3 ClientError exceptions with contextual information.
147-171: LGTM!Proper queue URL resolution, long-polling configuration, and error handling. The fixed polling parameters (
MaxNumberOfMessages=1,WaitTimeSeconds=20) are reasonable defaults for typical SQS consumption patterns.
173-196: LGTM!Correct message acknowledgment via SQS delete operation with proper error handling.
198-235: LGTM!Robust queue URL resolution with proper caching, error handling, and response validation. The instance-level cache correctly avoids cross-region/cross-account conflicts.
252-318: LGTM!Comprehensive error translation following the established pattern from
s3.py. Proper normalization of error codes, status code fallbacks, and contextual error messages with provider/operation/queue information.src/draive/aws/client.py (3)
9-21: LGTM!Proper import structure and multiple inheritance setup. The MRO (Method Resolution Order) is safe since both
AWSS3MixinandAWSSQSMixinproperly callsuper().__init__()and inherit fromAWSAPI.
56-61: Default features changed from(ResourcesRepository,)to()- breaking change.The default value for
featureschanged from(ResourcesRepository,)to an empty tuple at line 61. This means existing code that relied onResourcesRepositorybeing enabled by default will break.If this is intentional (to make features opt-in), ensure:
- Migration guide documents this breaking change
- Version bump reflects the breaking change (already 0.94.1→0.95.0 per PR summary)
If unintentional, restore the previous default.
63-83: LGTM!Clean feature initialization logic with proper client preparation for each feature type. The method correctly prepares S3 for
ResourcesRepositoryand SQS forAWSSQS, returning a composite list of bound feature states.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 8
♻️ Duplicate comments (5)
src/draive/rabbitmq/__init__.py (1)
1-7: RabbitMQ shim looks good; double‑check centraldraive.__init__exports.This re‑export is fine. If
RabbitMQ,RabbitMQClient, andRabbitMQExceptionare intended as public APIs, ensure they’re also exported fromsrc/draive/__init__.pyto keep the central public surface consistent. Based on learnings, this repo centralizes exports there.pyproject.toml (1)
55-55: Consider boundingpikaversion for therabbitmqextra.
rabbitmq = ["haiway[rabbitmq]", "pika"]leavespikacompletely unpinned, which risks unexpected breakage from upstream releases. Consider constraining it (for example,pika~=X.Y) to match a known‑good range for your stack.src/draive/aws/api.py (1)
12-13: Fix grammar in AWSAPI class docstring.The phrase
"Provides an asynchronous S3 and SQS client initializers"is ungrammatical. Suggest:- Provides an asynchronous S3 and SQS client initializers that other mixins + Provides asynchronous S3 and SQS client initializers that other mixinssrc/draive/aws/sqs.py (2)
31-45: Add a NumPy‑style docstring for publicAWSSQSMixin.
AWSSQSMixinis exported via__all__and forms part of the public AWS façade, but it has no class docstring. Per project guidelines, it should briefly describe its purpose (SQS helpers on top ofAWSAPI), expected usage (mixed intoAWS), and the main public surface (_queue_accessand the higher‑level methods built on it).Consider adding a short NumPy‑style docstring documenting Parameters (inherits
AWSAPIconstructor) and a high‑level description of what the mixin provides.
238-249: Handleboolmessage attribute values explicitly to satisfy SQS constraints.Because
boolis a subclass ofint, thecase int() | float()branch will matchTrue/Falseand encode them as:{"DataType": "Number", "StringValue": "True"} # or "False"AWS SQS requires that attributes with
DataType="Number"haveStringValueset to a numeric string representing a number."True"/"False"are invalid and will causesend_messageto fail.You should treat booleans separately, before the numeric case, and either encode them as strings or reject them explicitly. For example:
def _format_attribute_value( value: RawValue, ) -> dict[str, Any]: match value: + case bool() as value: + # Represent booleans as strings; SQS has no native boolean type. + return {"DataType": "String", "StringValue": "true" if value else "false"} case str() as value: return {"DataType": "String", "StringValue": value} @@ - case int() | float() as value: - return {"DataType": "Number", "StringValue": str(value)} + case int() | float() as value: + return {"DataType": "Number", "StringValue": str(value)} @@ - case value: - raise ValueError(f"Unsupported value {type(value)}") + case value: + raise ValueError(f"Unsupported SQS attribute value type: {type(value)}")This preserves correct handling for numeric types while avoiding invalid
Numberpayloads for booleans.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (10)
pyproject.toml(3 hunks)src/draive/aws/api.py(2 hunks)src/draive/aws/client.py(4 hunks)src/draive/aws/s3.py(1 hunks)src/draive/aws/sqs.py(1 hunks)src/draive/aws/state.py(1 hunks)src/draive/aws/types.py(2 hunks)src/draive/rabbitmq/__init__.py(1 hunks)tests/test_tags_extraction.py(1 hunks)tests/test_tags_replacement.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Ensure latest, most strict typing syntax available from Python 3.13+; use no untyped public APIs and no looseAnyunless required by third-party boundaries
Prefer explicit attribute access with static types; avoid dynamicgetattrexcept at narrow boundaries
In public types, prefer abstract immutable protocols likeMapping,Sequence,Iterableover concretedict,list,set
Usefinalwhere applicable; avoid inheritance and prefer type composition
Use precise unions (|) and narrow withmatch/isinstance; avoidcastunless provably safe and localized
Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols likeHTTPRequestingto keep boundaries explicit
Guard immutability with assertions when crossing context boundaries; failure messages should aid debugging but never leak secrets
All I/O is async; keep boundaries async and usectx.spawnfor detached tasks, avoiding custom threading
Ensure structured concurrency concepts and valid coroutine usage; rely on haiway and asyncio packages
Await long-running operations directly; never block the event loop with sync calls
Translate provider/SDK errors into appropriate typed exceptions; wrap third-party exceptions at the boundary with actionable context (provider,operation, identifiers) while redacting sensitive payloads
Don't raise bareException; preserve contextual information in exception construction
Use observability hooks (ctx.log_*,ctx.record) fromctxhelper instead oflogging; tests assert on emitted events
Files:
tests/test_tags_replacement.pysrc/draive/aws/sqs.pysrc/draive/aws/api.pytests/test_tags_extraction.pysrc/draive/aws/s3.pysrc/draive/rabbitmq/__init__.pysrc/draive/aws/state.pysrc/draive/aws/types.pysrc/draive/aws/client.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: No network in unit tests; mock providers and HTTP requests
Keep tests fast and specific to the code you change; start with unit tests around new types, functions, and adapters
Mirror package layout intests/; colocate new tests alongside features and preferpytestparametrization over loops
Test async flows withpytest.mark.asyncio; usectx.scopein tests to isolate state and avoid leaking globals
Files:
tests/test_tags_replacement.pytests/test_tags_extraction.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import symbols fromdraivedirectly (e.g.,from draive import State, ctx) rather than internal module paths
Use context scoping (ctx.scope(...)) to bind scopedDisposables, activeStateinstances, and avoid global state
Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Avoid docstrings for internal and private helpers; keep names self-explanatory
Skip module-level docstrings
Files:
src/draive/aws/sqs.pysrc/draive/aws/api.pysrc/draive/aws/s3.pysrc/draive/rabbitmq/__init__.pysrc/draive/aws/state.pysrc/draive/aws/types.pysrc/draive/aws/client.py
🧠 Learnings (4)
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/**/*.py : Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Applied to files:
src/draive/aws/sqs.pysrc/draive/aws/state.py
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/__init__.py : Centralize public exports in `src/draive/__init__.py`; update this file when adding or modifying public APIs
Applied to files:
src/draive/rabbitmq/__init__.py
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to **/*.py : Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols like `HTTPRequesting` to keep boundaries explicit
Applied to files:
src/draive/aws/types.py
📚 Learning: 2025-06-16T10:28:07.434Z
Learnt from: KaQuMiQ
Repo: miquido/draive PR: 338
File: src/draive/lmm/__init__.py:1-2
Timestamp: 2025-06-16T10:28:07.434Z
Learning: The draive project requires Python 3.12+ as specified in pyproject.toml with "requires-python = ">=3.12"" and uses Python 3.12+ specific features like PEP 695 type aliases and generic syntax extensively throughout the codebase.
Applied to files:
pyproject.toml
🧬 Code graph analysis (6)
tests/test_tags_replacement.py (1)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)
src/draive/aws/sqs.py (3)
src/draive/aws/api.py (1)
AWSAPI(9-74)src/draive/aws/types.py (3)
AWSAccessDenied(51-65)AWSError(15-48)AWSResourceNotFound(68-82)src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
tests/test_tags_extraction.py (1)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)
src/draive/aws/state.py (1)
src/draive/aws/types.py (1)
AWSSQSQueueAccessing(86-93)
src/draive/aws/types.py (1)
src/draive/aws/state.py (3)
queue(15-22)queue(25-32)queue(35-84)
src/draive/aws/client.py (5)
src/draive/aws/sqs.py (2)
AWSSQSMixin(31-235)_queue_access(46-79)src/draive/aws/state.py (1)
AWSSQS(12-86)src/draive/aws/api.py (3)
AWSAPI(9-74)_prepare_s3_client(59-62)_prepare_sqs_client(65-68)src/draive/resources/state.py (1)
ResourcesRepository(27-190)src/draive/aws/s3.py (2)
fetch(20-70)upload(178-224)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
♻️ Duplicate comments (10)
tests/test_tags_extraction.py (1)
466-476: Cosmetic quote style changes appear unrelated to the PR objective.These changes convert string literals from single to double quotes but don't relate to the MessageQueue feature. The file uses mixed quote styles elsewhere.
This was previously flagged. Consider reverting if accidental, or normalizing quote style across all test files if intentional.
tests/test_tags_replacement.py (1)
679-679: Cosmetic quote style change appears unrelated to the PR objective.Same observation as in the extraction tests - this quote style change seems unrelated to the MessageQueue feature.
This was previously flagged. Consider reverting or standardizing quote usage across all test files.
src/draive/rabbitmq/__init__.py (1)
1-7: Re-export shim looks good; consider centralizing indraive.__init__as well.The re-export from
haiway.rabbitmqwith explicit__all__is clean. If these symbols are meant to be part of the public API, they should also be re-exported fromsrc/draive/__init__.pyfor consistency.Based on learnings, this repo centralizes public exports in
src/draive/__init__.py.pyproject.toml (1)
55-55: Consider pinningpikato a version range for consistency.Other dependencies in this file use version constraints (e.g.,
~=X.Y). Leavingpikaunpinned may lead to unexpected breaking changes from upstream.This was previously flagged. Consider adding a version constraint like
pika~=1.3or similar.src/draive/aws/types.py (1)
85-93: Add a NumPy-style docstring for the publicAWSSQSQueueAccessingprotocol.This protocol is exported via
__all__and represents a public boundary. Per coding guidelines, it should carry a NumPy-style docstring with Parameters/Returns sections.The type signature now correctly uses
Callable[[str], Content]for the decoder, which aligns with the actual SQS implementation.src/draive/aws/api.py (1)
12-12: Grammar issue in docstring.The docstring has a subject-verb agreement error: "Provides an asynchronous S3 and SQS client initializers" should be "Provides asynchronous S3 and SQS client initializers" (remove "an").
This was previously flagged but appears to still be present in the code.
src/draive/aws/s3.py (1)
278-292: Consider narrowing types and separating key handling.The
value: Anyparameter type is loose per coding guidelines. Also, S3 metadata keys behave like HTTP header names (stricter charset than values). Using the same sanitization for both keys and values may not fully enforce key constraints.Consider:
- Narrowing
valuetype toBasicValue | str- Adding a dedicated
_sanitize_metadata_keyhelper that enforces ASCII for header-name compatibilitysrc/draive/aws/state.py (2)
12-12: Add a class-level NumPy-style docstring forAWSSQS.
AWSSQSis exported via__all__and is a publicStatesubclass, but lacks a class-level docstring. Per coding guidelines, public symbols should have NumPy-style docstrings describing purpose and usage.This was previously flagged. Add a brief docstring describing AWSSQS as a stateful accessor for AWS SQS queues.
50-51: Minor docstring clarification.The docstring says "broker-serializable objects" but the actual type is
str. Consider updating to "strings" for precision:- content_encoder : Callable[[Content], str] - Callable that transforms typed payloads into broker-serializable objects before publish. + content_encoder : Callable[[Content], str] + Callable that transforms typed payloads into strings before publish.This was previously flagged.
src/draive/aws/sqs.py (1)
30-43: Add a NumPy-style docstring for publicAWSSQSMixin.
AWSSQSMixinis exported via__all__but has no class-level docstring. Per the guidelines forsrc/draive/**/*.py, public symbols should have NumPy-style docstrings withParameters,Returns, andRaisessections that describe the mixin’s purpose and its main entry point (_queue_access).Consider adding a concise class docstring explaining that this mixin provides SQS-backed
MQQueueaccess forAWSAPIsubclasses, outlines howqueue,content_encoder, andcontent_decoderare used, and summarizes which AWS errors can be raised during publish/consume flows. Based on learnings.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: ASSERTIVE
Plan: Pro
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (10)
pyproject.toml(3 hunks)src/draive/aws/api.py(2 hunks)src/draive/aws/client.py(4 hunks)src/draive/aws/s3.py(1 hunks)src/draive/aws/sqs.py(1 hunks)src/draive/aws/state.py(1 hunks)src/draive/aws/types.py(2 hunks)src/draive/rabbitmq/__init__.py(1 hunks)tests/test_tags_extraction.py(1 hunks)tests/test_tags_replacement.py(1 hunks)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Ensure latest, most strict typing syntax available from Python 3.13+; use no untyped public APIs and no looseAnyunless required by third-party boundaries
Prefer explicit attribute access with static types; avoid dynamicgetattrexcept at narrow boundaries
In public types, prefer abstract immutable protocols likeMapping,Sequence,Iterableover concretedict,list,set
Usefinalwhere applicable; avoid inheritance and prefer type composition
Use precise unions (|) and narrow withmatch/isinstance; avoidcastunless provably safe and localized
Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols likeHTTPRequestingto keep boundaries explicit
Guard immutability with assertions when crossing context boundaries; failure messages should aid debugging but never leak secrets
All I/O is async; keep boundaries async and usectx.spawnfor detached tasks, avoiding custom threading
Ensure structured concurrency concepts and valid coroutine usage; rely on haiway and asyncio packages
Await long-running operations directly; never block the event loop with sync calls
Translate provider/SDK errors into appropriate typed exceptions; wrap third-party exceptions at the boundary with actionable context (provider,operation, identifiers) while redacting sensitive payloads
Don't raise bareException; preserve contextual information in exception construction
Use observability hooks (ctx.log_*,ctx.record) fromctxhelper instead oflogging; tests assert on emitted events
Files:
src/draive/aws/client.pysrc/draive/aws/state.pytests/test_tags_extraction.pytests/test_tags_replacement.pysrc/draive/aws/api.pysrc/draive/aws/sqs.pysrc/draive/aws/types.pysrc/draive/aws/s3.pysrc/draive/rabbitmq/__init__.py
src/draive/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
src/draive/**/*.py: Import symbols fromdraivedirectly (e.g.,from draive import State, ctx) rather than internal module paths
Use context scoping (ctx.scope(...)) to bind scopedDisposables, activeStateinstances, and avoid global state
Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Avoid docstrings for internal and private helpers; keep names self-explanatory
Skip module-level docstrings
Files:
src/draive/aws/client.pysrc/draive/aws/state.pysrc/draive/aws/api.pysrc/draive/aws/sqs.pysrc/draive/aws/types.pysrc/draive/aws/s3.pysrc/draive/rabbitmq/__init__.py
tests/**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
tests/**/*.py: No network in unit tests; mock providers and HTTP requests
Keep tests fast and specific to the code you change; start with unit tests around new types, functions, and adapters
Mirror package layout intests/; colocate new tests alongside features and preferpytestparametrization over loops
Test async flows withpytest.mark.asyncio; usectx.scopein tests to isolate state and avoid leaking globals
Files:
tests/test_tags_extraction.pytests/test_tags_replacement.py
🧠 Learnings (4)
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/**/*.py : Add NumPy-style docstrings to public symbols; include Parameters, Returns, and Raises sections with rationale
Applied to files:
src/draive/aws/state.pysrc/draive/aws/sqs.py
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to **/*.py : Favor structural typing (Protocols) for async clients and adapters; use runtime-checkable protocols like `HTTPRequesting` to keep boundaries explicit
Applied to files:
src/draive/aws/types.py
📚 Learning: 2025-12-02T14:30:08.757Z
Learnt from: CR
Repo: miquido/draive PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-02T14:30:08.757Z
Learning: Applies to src/draive/__init__.py : Centralize public exports in `src/draive/__init__.py`; update this file when adding or modifying public APIs
Applied to files:
src/draive/rabbitmq/__init__.py
📚 Learning: 2025-06-16T10:28:07.434Z
Learnt from: KaQuMiQ
Repo: miquido/draive PR: 338
File: src/draive/lmm/__init__.py:1-2
Timestamp: 2025-06-16T10:28:07.434Z
Learning: The draive project requires Python 3.12+ as specified in pyproject.toml with "requires-python = ">=3.12"" and uses Python 3.12+ specific features like PEP 695 type aliases and generic syntax extensively throughout the codebase.
Applied to files:
pyproject.toml
🧬 Code graph analysis (2)
src/draive/aws/state.py (1)
src/draive/aws/types.py (1)
AWSSQSQueueAccessing(86-93)
tests/test_tags_replacement.py (1)
src/draive/multimodal/content.py (3)
MultimodalContent(25-592)of(42-66)of(619-647)
🔇 Additional comments (4)
src/draive/aws/api.py (1)
64-68: LGTM!The
_prepare_sqs_clientmethod correctly mirrors the S3 client initialization pattern with proper@asynchronousdecoration.src/draive/aws/s3.py (1)
252-275: Metadata sanitization logic is sound.The implementation correctly handles UTF-8 truncation by encoding, truncating at byte level, and decoding with
"ignore"to handle partial multi-byte characters at the boundary.src/draive/aws/state.py (1)
34-84: Implementation looks correct and well-structured.The
@statemethodpattern with overloads for both class-level and instance-level usage is appropriate. The delegation toqueue_accessingis clean, and the comprehensive docstring with usage example is helpful.src/draive/aws/client.py (1)
16-82: SQS feature wiring inAWSlooks coherent; confirm callers rely on explicitfeatures.The integration of
AWSSQSMixinandAWSSQSintoAWSis clean:__aenter__now prepares S3 and SQS clients only when their respective state classes are present inself._features, and returns a concretelist[State]as declared.One behavioral change is that
_featuresnow defaults to()whenfeatures is None, soasync with AWS(...) as features:will yield an empty list unless callers explicitly pass something like{ResourcesRepository, AWSSQS}. If previous usage relied on an implicit default (e.g., always gettingResourcesRepository), please double-check and update call sites / docs accordingly.
No description provided.