Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions fila/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
"""Fila Python client SDK for the Fila message broker."""
"""Fila -- Python client SDK for the Fila message broker."""

from fila.async_client import AsyncClient
from fila.client import Client
from fila.errors import (
BatchEnqueueError,
FilaError,
MessageNotFoundError,
QueueNotFoundError,
RPCError,
)
from fila.types import ConsumeMessage
from fila.types import BatchEnqueueResult, BatchMode, ConsumeMessage, Linger

__all__ = [
"AsyncClient",
"BatchEnqueueError",
"BatchEnqueueResult",
"BatchMode",
"Client",
"ConsumeMessage",
"FilaError",
"Linger",
"MessageNotFoundError",
"QueueNotFoundError",
"RPCError",
Expand Down
88 changes: 74 additions & 14 deletions fila/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,15 @@
if TYPE_CHECKING:
from collections.abc import AsyncIterator

from fila.errors import _map_ack_error, _map_consume_error, _map_enqueue_error, _map_nack_error
from fila.types import ConsumeMessage
from fila.client import _proto_msg_to_consume_message
from fila.errors import (
_map_ack_error,
_map_batch_enqueue_error,
_map_consume_error,
_map_enqueue_error,
_map_nack_error,
)
from fila.types import BatchEnqueueResult, ConsumeMessage
from fila.v1 import service_pb2, service_pb2_grpc


Expand Down Expand Up @@ -118,7 +125,8 @@ def _extract_leader_hint(err: grpc.RpcError) -> str | None:
class AsyncClient:
"""Asynchronous client for the Fila message broker.

Wraps the hot-path gRPC operations: enqueue, consume, ack, nack.
Wraps the hot-path gRPC operations: enqueue, batch_enqueue, consume, ack,
nack.

Usage::

Expand Down Expand Up @@ -256,6 +264,55 @@ async def enqueue(
raise _map_enqueue_error(e) from e
return str(resp.message_id)

async def batch_enqueue(
    self,
    messages: list[tuple[str, dict[str, str] | None, bytes]],
) -> list[BatchEnqueueResult]:
    """Enqueue multiple messages in a single RPC.

    Args:
        messages: List of (queue, headers, payload) tuples.

    Returns:
        One ``BatchEnqueueResult`` per input message, in order. Each
        result carries either a ``message_id`` (success) or an
        ``error`` (per-message failure).

    Raises:
        QueueNotFoundError: If a referenced queue does not exist.
        RPCError: For unexpected gRPC failures.
    """
    # Translate each (queue, headers, payload) tuple into a proto request.
    requests = []
    for queue, headers, payload in messages:
        requests.append(
            service_pb2.EnqueueRequest(
                queue=queue,
                headers=headers or {},
                payload=payload,
            )
        )

    try:
        response = await self._stub.BatchEnqueue(
            service_pb2.BatchEnqueueRequest(messages=requests)
        )
    except grpc.RpcError as exc:
        raise _map_batch_enqueue_error(exc) from exc

    # Map every per-message outcome back to a BatchEnqueueResult.
    outcomes: list[BatchEnqueueResult] = []
    for entry in response.results:
        if entry.HasField("success"):
            outcome = BatchEnqueueResult(
                message_id=str(entry.success.message_id),
                error=None,
            )
        else:
            outcome = BatchEnqueueResult(message_id=None, error=entry.error)
        outcomes.append(outcome)
    return outcomes

async def consume(self, queue: str) -> AsyncIterator[ConsumeMessage]:
"""Open a streaming consumer on the specified queue.

async def _consume_iter(
    self,
    stream: Any,
) -> AsyncIterator[ConsumeMessage]:
    """Internal async generator reading from the gRPC stream.

    Handles both the singular ``message`` field (backward compatible)
    and the repeated ``messages`` field (batched delivery). All proto
    decoding is delegated to ``_proto_msg_to_consume_message`` so the
    two delivery paths cannot drift apart.
    """
    try:
        async for resp in stream:
            # Check batched messages first (repeated field).
            if len(resp.messages) > 0:
                for msg in resp.messages:
                    if msg is not None and msg.ByteSize():
                        yield _proto_msg_to_consume_message(msg)
                continue

            # Fall back to singular message field.
            msg = resp.message
            if msg is None or not msg.ByteSize():
                continue  # keepalive: empty response, nothing to deliver
            yield _proto_msg_to_consume_message(msg)
    except grpc.RpcError:
        # Stream teardown (server close / client cancel) ends iteration
        # silently; callers observe a finished iterator, not an error.
        return

Expand Down
Loading
Loading