diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/azure/__init__.py b/wavefront/server/packages/flo_cloud/flo_cloud/azure/__init__.py index abcd0774..4eef673d 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/azure/__init__.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/azure/__init__.py @@ -1,3 +1,4 @@ from .blob_storage import AzureBlobStorage +from .storage_queue import StorageQueue -__all__ = ['AzureBlobStorage'] +__all__ = ['AzureBlobStorage', 'StorageQueue'] diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py b/wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py new file mode 100644 index 00000000..a7ac5b1e --- /dev/null +++ b/wavefront/server/packages/flo_cloud/flo_cloud/azure/storage_queue.py @@ -0,0 +1,107 @@ +import base64 +import binascii +import json +import os +from typing import List + +from azure.identity import DefaultAzureCredential +from azure.storage.queue import QueueClient + +from .._types import MessageQueue, MessageQueueDict + + +def _decode_message(content: str): + """Parse message content as JSON, falling back to base64-decode first. + + Event Grid delivers messages to Storage Queue as base64-encoded JSON. + Messages we send ourselves are plain JSON. + """ + try: + return json.loads(content) + except json.JSONDecodeError: + try: + return json.loads(base64.b64decode(content).decode('utf-8')) + except (binascii.Error, UnicodeDecodeError) as e: + raise ValueError( + f'Message content is neither valid JSON nor base64-encoded JSON: {e}' + ) + + +class StorageQueue(MessageQueue): + """Azure Storage Queue implementation.""" + + def __init__(self): + account_url = os.environ.get('AZURE_STORAGE_QUEUE_URL') + if not account_url: + raise ValueError('AZURE_STORAGE_QUEUE_URL env var must be set') + + queue_name = os.environ.get('AZURE_STORAGE_QUEUE_NAME') + if not queue_name: + raise ValueError('AZURE_STORAGE_QUEUE_NAME env var must be set') + + self._account_url = account_url + self._queue_name = queue_name + self._credential = DefaultAzureCredential() + self._client = QueueClient( + account_url=account_url, + queue_name=queue_name, + credential=self._credential, + message_encode_policy=None, + message_decode_policy=None, + ) + # Maps pop_receipt (ack_id) -> message_id for delete + self._pending: dict[str, str] = {} + + def receive_messages( + self, max_messages=10, wait_time_sec=20 + ) -> List[MessageQueueDict]: + """Receive messages from Azure Storage Queue. + + Note: Azure Storage Queue does not support long-polling. This method + returns immediately, potentially with an empty list. The `wait_time_sec` + parameter is repurposed as the visibility timeout, controlling how long + received messages remain hidden from other consumers. + + Args: + max_messages: Maximum number of messages to receive (1-32). + wait_time_sec: Visibility timeout in seconds for received messages. + + Returns: + List of MessageQueueDict, possibly empty. + """ + received = [] + for msg in self._client.receive_messages( + max_messages=max_messages, + visibility_timeout=wait_time_sec, + ): + self._pending[msg.pop_receipt] = msg.id + body = _decode_message(msg.content) + received.append( + MessageQueueDict(body=body, ack_id=msg.pop_receipt, id=msg.id) + ) + return received + + def delete_message(self, ack_id: str): + message_id = self._pending.pop(ack_id, None) + if message_id is None: + raise ValueError( + f'No pending message found for ack_id {ack_id!r}. ' + 'It may have already been deleted or never received.' + ) + self._client.delete_message(message_id, ack_id) + + def add_message( + self, message_body: dict, topic_name_or_queue_url: str | None = None + ) -> str: + if topic_name_or_queue_url and topic_name_or_queue_url != self._queue_name: + client = QueueClient( + account_url=self._account_url, + queue_name=topic_name_or_queue_url, + credential=self._credential, + message_encode_policy=None, + message_decode_policy=None, + ) + else: + client = self._client + result = client.send_message(json.dumps(message_body)) + return result.id diff --git a/wavefront/server/packages/flo_cloud/flo_cloud/message_queue.py b/wavefront/server/packages/flo_cloud/flo_cloud/message_queue.py index a66917a0..fa22bc32 100644 --- a/wavefront/server/packages/flo_cloud/flo_cloud/message_queue.py +++ b/wavefront/server/packages/flo_cloud/flo_cloud/message_queue.py @@ -1,5 +1,6 @@ from ._types import CloudProvider, MessageQueue from .aws.sqs import SQSQueue +from .azure.storage_queue import StorageQueue from .gcp.pubsub import PubSubQueue @@ -13,6 +14,8 @@ def __get_message_queue_client(self) -> MessageQueue: return SQSQueue() elif self.cloud_provider == CloudProvider.GCP.value: return PubSubQueue() + elif self.cloud_provider == CloudProvider.AZURE.value: + return StorageQueue() else: raise ValueError(f'Unsupported cloud provider: {self.cloud_provider}') diff --git a/wavefront/server/packages/flo_cloud/pyproject.toml b/wavefront/server/packages/flo_cloud/pyproject.toml index a6ba6c04..7692bb21 100644 --- a/wavefront/server/packages/flo_cloud/pyproject.toml +++ b/wavefront/server/packages/flo_cloud/pyproject.toml @@ -10,6 +10,7 @@ requires-python = ">=3.11" dependencies = [ "azure-identity>=1.17.0", "azure-storage-blob>=12.20.0", + "azure-storage-queue>=12.10.0", "boto3<=1.38.40", "cryptography>=45.0.4", "google-cloud-bigquery==3.34.0", diff --git a/wavefront/server/uv.lock b/wavefront/server/uv.lock index 0654a46b..067eaa25 100644 --- a/wavefront/server/uv.lock +++ b/wavefront/server/uv.lock @@ -538,6 +538,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d8/3a/6ef2047a072e54e1142718d433d50e9514c999a58f51abfff7902f3a72f8/azure_storage_blob-12.28.0-py3-none-any.whl", hash = "sha256:00fb1db28bf6a7b7ecaa48e3b1d5c83bfadacc5a678b77826081304bd87d6461", size = 431499, upload-time = "2026-01-06T23:48:58.995Z" }, ] +[[package]] +name = "azure-storage-queue" +version = "12.15.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "azure-core" }, + { name = "cryptography" }, + { name = "isodate" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/48/23/e3b46de244a133675c8c20f3ef2be6cbaf22a41f03e04e1cb2acd609bf5f/azure_storage_queue-12.15.0.tar.gz", hash = "sha256:4e01dcae5aefd0c463f7bae5c75c8a91f955c893f14ed7590fc0cd447ac4666d", size = 197521, upload-time = "2026-01-07T00:18:03.616Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d9/22/5da115105c9fe7e2fc11804018649b394f60a62735e19642acf336e3807a/azure_storage_queue-12.15.0-py3-none-any.whl", hash = "sha256:056cfce0cd60458f0b7653d804f639098b14593f843899c6c0fc65b3ebe61210", size = 187547, upload-time = "2026-01-07T00:18:05.23Z" }, +] + [[package]] name = "bcrypt" version = "4.3.0" @@ -1405,6 +1420,7 @@ source = { editable = "packages/flo_cloud" } dependencies = [ { name = "azure-identity" }, { name = "azure-storage-blob" }, + { name = "azure-storage-queue" }, { name = "boto3" }, { name = "cryptography" }, { name = "google-cloud-bigquery" }, @@ -1418,6 +1434,7 @@ dependencies = [ requires-dist = [ { name = "azure-identity", specifier = ">=1.17.0" }, { name = "azure-storage-blob", specifier = ">=12.20.0" }, + { name = "azure-storage-queue", specifier = ">=12.10.0" }, { name = "boto3", specifier = "<=1.38.40" }, { name = "cryptography", specifier = ">=45.0.4" }, { name = "google-cloud-bigquery", specifier = "==3.34.0" },