Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .blob_storage import AzureBlobStorage
from .storage_queue import StorageQueue

__all__ = ['AzureBlobStorage']
__all__ = ['AzureBlobStorage', 'StorageQueue']
107 changes: 107 additions & 0 deletions wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import base64
import binascii
import json
import os
from typing import List

from azure.identity import DefaultAzureCredential
from azure.storage.queue import QueueClient

from .._types import MessageQueue, MessageQueueDict


def _decode_message(content: str):
"""Parse message content as JSON, falling back to base64-decode first.

Event Grid delivers messages to Storage Queue as base64-encoded JSON.
Messages we send ourselves are plain JSON.
"""
try:
return json.loads(content)
except json.JSONDecodeError:
try:
return json.loads(base64.b64decode(content).decode('utf-8'))
except (binascii.Error, UnicodeDecodeError) as e:
raise ValueError(
f'Message content is neither valid JSON nor base64-encoded JSON: {e}'
)
Comment on lines +24 to +27
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Chain exception using raise ... from e for proper traceback.

The static analysis tool (Ruff B904) correctly identifies that exceptions raised within an except clause should use exception chaining. This preserves the original traceback and makes debugging easier.

🛠️ Proposed fix
         except (binascii.Error, UnicodeDecodeError) as e:
             raise ValueError(
                 f'Message content is neither valid JSON nor base64-encoded JSON: {e}'
-            )
+            ) from e
🧰 Tools
🪛 Ruff (0.15.6)

[warning] 25-27: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py` around
lines 24 - 27, The except block that catches (binascii.Error,
UnicodeDecodeError) and currently does "raise ValueError(f'Message content is
neither valid JSON nor base64-encoded JSON: {e}')" should chain the original
exception by re-raising the ValueError using "raise ... from e" so the original
traceback is preserved; locate the except block in the message decoding logic
(the except (binascii.Error, UnicodeDecodeError) as e: handler) and modify the
raise to include "from e" while keeping the existing error message.



class StorageQueue(MessageQueue):
"""Azure Storage Queue implementation."""

def __init__(self):
account_url = os.environ.get('AZURE_STORAGE_QUEUE_URL')
if not account_url:
raise ValueError('AZURE_STORAGE_QUEUE_URL env var must be set')

queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME')
if not queue_name:
raise ValueError('AZURE_STORAGE_QUEUE_NAME env var must be set')

self._account_url = account_url
self._queue_name = queue_name
self._credential = DefaultAzureCredential()
self._client = QueueClient(
account_url=account_url,
queue_name=queue_name,
credential=self._credential,
message_encode_policy=None,
message_decode_policy=None,
)
# Maps pop_receipt (ack_id) -> message_id for delete
self._pending: dict[str, str] = {}

def receive_messages(
self, max_messages=10, wait_time_sec=20
) -> List[MessageQueueDict]:
"""Receive messages from Azure Storage Queue.

Note: Azure Storage Queue does not support long-polling. This method
returns immediately, potentially with an empty list. The `wait_time_sec`
parameter is repurposed as the visibility timeout, controlling how long
received messages remain hidden from other consumers.

Args:
max_messages: Maximum number of messages to receive (1-32).
wait_time_sec: Visibility timeout in seconds for received messages.

Returns:
List of MessageQueueDict, possibly empty.
"""
received = []
for msg in self._client.receive_messages(
max_messages=max_messages,
visibility_timeout=wait_time_sec,
):
self._pending[msg.pop_receipt] = msg.id
body = _decode_message(msg.content)
received.append(
MessageQueueDict(body=body, ack_id=msg.pop_receipt, id=msg.id)
)
return received
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def delete_message(self, ack_id: str):
message_id = self._pending.pop(ack_id, None)
if message_id is None:
raise ValueError(
f'No pending message found for ack_id {ack_id!r}. '
'It may have already been deleted or never received.'
)
self._client.delete_message(message_id, ack_id)

def add_message(
self, message_body: dict, topic_name_or_queue_url: str | None = None
) -> str:
if topic_name_or_queue_url and topic_name_or_queue_url != self._queue_name:
client = QueueClient(
account_url=self._account_url,
queue_name=topic_name_or_queue_url,
credential=self._credential,
message_encode_policy=None,
message_decode_policy=None,
)
else:
client = self._client
result = client.send_message(json.dumps(message_body))
return result.id
Comment on lines +93 to +107
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Ambiguous handling of topic_name_or_queue_url parameter.

The comparison topic_name_or_queue_url != self._queue_name only works correctly if the caller passes a queue name. If they pass a full queue URL (as the parameter name suggests is possible), the comparison will never match self._queue_name, and the code will attempt to use a URL as queue_name in the QueueClient constructor, which would fail.

Consider clarifying the expected input or normalizing the comparison:

🛠️ Suggested fix to handle both URL and name
     def add_message(
         self, message_body: dict, topic_name_or_queue_url: str | None = None
     ) -> str:
-        if topic_name_or_queue_url and topic_name_or_queue_url != self._queue_name:
+        # Normalize: use default queue if not specified or if it matches current queue
+        target_queue = topic_name_or_queue_url
+        if not target_queue or target_queue == self._queue_name:
             client = self._client
+        else:
+            # Assume topic_name_or_queue_url is a queue name (not a full URL)
             client = QueueClient(
                 account_url=self._account_url,
-                queue_name=topic_name_or_queue_url,
+                queue_name=target_queue,
                 credential=self._credential,
                 message_encode_policy=None,
                 message_decode_policy=None,
             )
-        else:
-            client = self._client
         result = client.send_message(json.dumps(message_body))
         return result.id

Alternatively, rename the parameter to just queue_name to clarify the expected input format, aligning with Azure's terminology.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py` around
lines 75 - 89, The add_message method treats topic_name_or_queue_url
ambiguously; change it to accept both a full queue URL and a queue name by
normalizing the input before comparing/constructing clients: if
topic_name_or_queue_url is provided, detect whether it's a full URL (e.g.,
starts with "http"/"https") and extract the queue name (path segment) from it,
then compare that extracted name against self._queue_name; when creating a new
QueueClient, pass the extracted queue name (not the full URL) along with
self._account_url and self._credential (keeping
message_encode_policy/message_decode_policy as before) and only fall back to
self._client when names match or no override is provided—alternatively, if you
prefer a simpler API, rename the parameter to queue_name and require callers to
pass only names and update usages of add_message accordingly.

Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ._types import CloudProvider, MessageQueue
from .aws.sqs import SQSQueue
from .azure.storage_queue import StorageQueue
from .gcp.pubsub import PubSubQueue


Expand All @@ -13,6 +14,8 @@ def __get_message_queue_client(self) -> MessageQueue:
return SQSQueue()
elif self.cloud_provider == CloudProvider.GCP.value:
return PubSubQueue()
elif self.cloud_provider == CloudProvider.AZURE.value:
return StorageQueue()
else:
raise ValueError(f'Unsupported cloud provider: {self.cloud_provider}')

Expand Down
1 change: 1 addition & 0 deletions wavefront/server/packages/flo_cloud/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ requires-python = ">=3.11"
dependencies = [
"azure-identity>=1.17.0",
"azure-storage-blob>=12.20.0",
"azure-storage-queue>=12.10.0",
"boto3<=1.38.40",
"cryptography>=45.0.4",
"google-cloud-bigquery==3.34.0",
Expand Down
17 changes: 17 additions & 0 deletions wavefront/server/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading