feat(flo-cloud): add Azure Storage Queue as message queue #253
Implements StorageQueue, an Azure Storage Queue-backed MessageQueue for the flo_cloud package, equivalent to the existing SQSQueue and PubSubQueue implementations.

- Adds flo_cloud/azure/storage_queue.py with StorageQueue(MessageQueue) using QueueClient from azure-storage-queue, DefaultAzureCredential auth, and peek-lock style processing via pop_receipt as the ack_id
- Handles both plain JSON (own messages) and base64-encoded JSON (Event Grid delivery) transparently in receive_messages
- Wires the Azure branch into MessageQueueManager in message_queue.py
- Exports StorageQueue from flo_cloud/azure/__init__.py
- Adds azure-storage-queue>=12.10.0 to pyproject.toml dependencies

Required env vars: AZURE_STORAGE_QUEUE_URL, AZURE_STORAGE_QUEUE_NAME
Auth: AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID (via DefaultAzureCredential)
📝 Walkthrough

Adds an Azure Storage Queue-backed MessageQueue implementation (StorageQueue), wires it into MessageQueueManager for CloudProvider.AZURE, exposes it in the flo_cloud.azure package, and adds the azure-storage-queue dependency.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as Client/App
    participant Mgr as MessageQueueManager
    participant SQ as StorageQueue
    participant Azure as Azure Queue Service
    participant Cred as DefaultAzureCredential
    App->>Mgr: request queue client (CloudProvider.AZURE)
    Mgr->>SQ: instantiate StorageQueue()
    SQ->>Cred: obtain credentials
    Cred-->>SQ: token
    SQ->>Azure: send/receive/delete messages (JSON bodies, pop_receipt ack)
    Azure-->>SQ: messages / pop_receipt / delete confirmation
    SQ-->>Mgr: returns messages (with ack_id)
    Mgr-->>App: delivers messages
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 3
🧹 Nitpick comments (1)
wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py (1)
Lines 46-47: Consider documenting or addressing unbounded `_pending` growth.

The `_pending` dict grows with each received message and only shrinks when messages are deleted. If consumers crash or fail to delete messages (e.g., due to processing errors), this dict will grow indefinitely. This is likely acceptable given the expected usage pattern (receive → process → delete), but consider adding a warning in the docstring or implementing periodic cleanup of stale entries.
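The lazy, TTL-based pruning suggested above could look roughly like this. `PendingTracker` and its method names are hypothetical illustrations (the PR keeps a plain `_pending` dict on `StorageQueue`); the point is only the eviction pattern:

```python
from __future__ import annotations

import time


class PendingTracker:
    """Track received-but-not-yet-deleted messages, pruning stale entries
    lazily on insert so the mapping cannot grow without bound."""

    def __init__(self, ttl_seconds: float = 3600.0):
        self._ttl = ttl_seconds
        # ack_id (pop_receipt) -> (message_id, insertion timestamp)
        self._pending: dict[str, tuple[str, float]] = {}

    def add(self, ack_id: str, message_id: str) -> None:
        self._prune()
        self._pending[ack_id] = (message_id, time.monotonic())

    def pop(self, ack_id: str) -> str | None:
        entry = self._pending.pop(ack_id, None)
        return entry[0] if entry else None

    def _prune(self) -> None:
        # Drop entries older than the TTL; their pop_receipts would have
        # expired on the service side anyway once visibility lapses.
        cutoff = time.monotonic() - self._ttl
        for key in [k for k, (_, ts) in self._pending.items() if ts < cutoff]:
            del self._pending[key]
```

Pruning on the insert path keeps the change local to receive_messages and avoids a background task.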
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py`:
- Around line 75-89: The add_message method treats topic_name_or_queue_url
ambiguously; change it to accept both a full queue URL and a queue name by
normalizing the input before comparing/constructing clients: if
topic_name_or_queue_url is provided, detect whether it's a full URL (e.g.,
starts with "http"/"https") and extract the queue name (path segment) from it,
then compare that extracted name against self._queue_name; when creating a new
QueueClient, pass the extracted queue name (not the full URL) along with
self._account_url and self._credential (keeping
message_encode_policy/message_decode_policy as before) and only fall back to
self._client when names match or no override is provided—alternatively, if you
prefer a simpler API, rename the parameter to queue_name and require callers to
pass only names and update usages of add_message accordingly.
- Around line 49-64: The receive_messages method conflates wait_time_sec
(caller-expected long-poll) with Azure's visibility_timeout; update
receive_messages to fix this by either (A) adding a clear docstring on
receive_messages explaining Azure returns immediately and that wait_time_sec
maps to visibility_timeout, or (B) implement true long-poll semantics: add a new
parameter long_poll_seconds (or keep wait_time_sec but treat it as poll
timeout), loop calling self._client.receive_messages with a small
visibility_timeout (or separate visibility_timeout param) until
long_poll_seconds elapses, sleeping between attempts, while preserving the
existing pending tracking (self._pending[msg.pop_receipt] = msg.id) and decoding
messages with _decode_message; or (C) split parameters (visibility_timeout for
_client.receive_messages and wait_time_sec for polling) and ensure
_client.receive_messages is called with the visibility_timeout argument, not the
polling timeout.
- Around line 12-21: The fallback path in _decode_message currently calls
base64.b64decode(content) without handling invalid base64, which raises
binascii.Error and yields a confusing exception; update _decode_message to catch
binascii.Error (raised by base64.b64decode) and raise a clearer exception (e.g.,
ValueError("content is neither valid JSON nor valid base64-encoded JSON")) or
re-raise a json.JSONDecodeError with an informative message, preserving the
original exception as the __cause__; ensure the try/except around
json.loads(content) remains and only the base64 decode branch adds this
additional error handling.
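Option (B) above — emulating long-poll semantics on top of Azure's immediate-return receive — could be sketched as below. The function name and parameters are illustrative; only a `receive_messages`-style method is assumed on the client:

```python
import time


def receive_with_long_poll(client, long_poll_seconds: float,
                           visibility_timeout: int = 30,
                           poll_interval: float = 1.0) -> list:
    """Poll client.receive_messages until messages arrive or the long-poll
    window elapses. Azure Storage Queues return immediately when empty, so
    any SQS-style wait has to be emulated client-side."""
    deadline = time.monotonic() + long_poll_seconds
    while True:
        messages = list(client.receive_messages(visibility_timeout=visibility_timeout))
        if messages or time.monotonic() >= deadline:
            return messages
        # Sleep for the poll interval, but never past the deadline.
        time.sleep(min(poll_interval, max(0.0, deadline - time.monotonic())))
```

This keeps `visibility_timeout` (how long a received message stays hidden) cleanly separated from the caller-facing wait time, which is the conflation the review flags.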
---
Nitpick comments:
In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py`:
- Around line 46-47: The _pending dict (attribute _pending) can grow unbounded
if received messages are not deleted (e.g., consumer crash); either document
this in the class/module docstring to warn users that entries are only removed
on successful delete, or add automatic cleanup: attach a TTL timestamp to
entries when inserting in receive/ack flow and run a periodic cleanup task (or
perform lazy pruning in receive/delete methods) to remove stale entries,
ensuring you update the code paths that insert into _pending (e.g., wherever
receive_message populates _pending) and those that remove entries (e.g.,
delete/ack handling) to use the timestamp-based eviction.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 520d4e4c-050d-4239-9347-6252ddbd333d
⛔ Files ignored due to path filters (1)
wavefront/server/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (4)
wavefront/server/packages/flo_cloud/flo_cloud/azure/__init__.py
wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py
wavefront/server/packages/flo_cloud/flo_cloud/message_queue.py
wavefront/server/packages/flo_cloud/pyproject.toml
```python
def add_message(
    self, message_body: dict, topic_name_or_queue_url: str | None = None
) -> str:
    if topic_name_or_queue_url and topic_name_or_queue_url != self._queue_name:
        client = QueueClient(
            account_url=self._account_url,
            queue_name=topic_name_or_queue_url,
            credential=self._credential,
            message_encode_policy=None,
            message_decode_policy=None,
        )
    else:
        client = self._client
    result = client.send_message(json.dumps(message_body))
    return result.id
```
Ambiguous handling of topic_name_or_queue_url parameter.
The comparison topic_name_or_queue_url != self._queue_name only works correctly if the caller passes a queue name. If they pass a full queue URL (as the parameter name suggests is possible), the comparison will never match self._queue_name, and the code will attempt to use a URL as queue_name in the QueueClient constructor, which would fail.
Consider clarifying the expected input or normalizing the comparison:
🛠️ Suggested fix to handle both URL and name

```diff
 def add_message(
     self, message_body: dict, topic_name_or_queue_url: str | None = None
 ) -> str:
-    if topic_name_or_queue_url and topic_name_or_queue_url != self._queue_name:
+    # Normalize: use default queue if not specified or if it matches current queue
+    target_queue = topic_name_or_queue_url
+    if not target_queue or target_queue == self._queue_name:
+        client = self._client
+    else:
+        # Assume topic_name_or_queue_url is a queue name (not a full URL)
         client = QueueClient(
             account_url=self._account_url,
-            queue_name=topic_name_or_queue_url,
+            queue_name=target_queue,
             credential=self._credential,
             message_encode_policy=None,
             message_decode_policy=None,
         )
-    else:
-        client = self._client
     result = client.send_message(json.dumps(message_body))
     return result.id
```

Alternatively, rename the parameter to just queue_name to clarify the expected input format, aligning with Azure's terminology.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py`:
- Around line 24-27: The except block that catches (binascii.Error,
UnicodeDecodeError) and currently does "raise ValueError(f'Message content is
neither valid JSON nor base64-encoded JSON: {e}')" should chain the original
exception by re-raising the ValueError using "raise ... from e" so the original
traceback is preserved; locate the except block in the message decoding logic
(the except (binascii.Error, UnicodeDecodeError) as e: handler) and modify the
raise to include "from e" while keeping the existing error message.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9a4122b1-523b-4ed6-932c-38ad2922119d
📒 Files selected for processing (1)
wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py
```python
except (binascii.Error, UnicodeDecodeError) as e:
    raise ValueError(
        f'Message content is neither valid JSON nor base64-encoded JSON: {e}'
    )
```
Chain exception using raise ... from e for proper traceback.
The static analysis tool (Ruff B904) correctly identifies that exceptions raised within an except clause should use exception chaining. This preserves the original traceback and makes debugging easier.
🛠️ Proposed fix

```diff
 except (binascii.Error, UnicodeDecodeError) as e:
     raise ValueError(
         f'Message content is neither valid JSON nor base64-encoded JSON: {e}'
-    )
+    ) from e
```

🧰 Tools
🪛 Ruff (0.15.6)
[warning] 25-27: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
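The effect of `from e` can be shown in isolation: the chained exception keeps the original error on `__cause__`, so the full traceback is preserved. A minimal sketch (the function name is illustrative):

```python
import base64
import binascii


def decode_strict(content: str) -> bytes:
    try:
        return base64.b64decode(content, validate=True)
    except binascii.Error as e:
        # "from e" records the original error as __cause__, so tracebacks read
        # "The above exception was the direct cause of..." instead of the
        # misleading "During handling of the above exception, another
        # exception occurred".
        raise ValueError(f"not valid base64: {e}") from e
```

Without `from e`, the ValueError would look like an accidental secondary failure inside the handler rather than a deliberate translation of the base64 error.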
* feat(flo-cloud): add Azure Storage Queue as message queue backend
* resolved review comment
Summary by CodeRabbit

New Features
- Azure Storage Queue is now available as a message queue backend.

Chores
- Added the azure-storage-queue dependency.