diff --git a/fila/__init__.py b/fila/__init__.py index 732fc43..9117c96 100644 --- a/fila/__init__.py +++ b/fila/__init__.py @@ -3,21 +3,21 @@ from fila.async_client import AsyncClient from fila.client import Client from fila.errors import ( - BatchEnqueueError, + EnqueueError, FilaError, MessageNotFoundError, QueueNotFoundError, RPCError, ) -from fila.types import BatchEnqueueResult, BatchMode, ConsumeMessage, Linger +from fila.types import AccumulatorMode, ConsumeMessage, EnqueueResult, Linger __all__ = [ + "AccumulatorMode", "AsyncClient", - "BatchEnqueueError", - "BatchEnqueueResult", - "BatchMode", "Client", "ConsumeMessage", + "EnqueueError", + "EnqueueResult", "FilaError", "Linger", "MessageNotFoundError", diff --git a/fila/async_client.py b/fila/async_client.py index 8e06b1e..a1f2962 100644 --- a/fila/async_client.py +++ b/fila/async_client.py @@ -10,15 +10,16 @@ if TYPE_CHECKING: from collections.abc import AsyncIterator -from fila.client import _proto_msg_to_consume_message +from fila.client import _proto_enqueue_result_to_sdk, _proto_msg_to_consume_message from fila.errors import ( + EnqueueError, _map_ack_error, - _map_batch_enqueue_error, _map_consume_error, _map_enqueue_error, + _map_enqueue_result_error, _map_nack_error, ) -from fila.types import BatchEnqueueResult, ConsumeMessage +from fila.types import ConsumeMessage, EnqueueResult from fila.v1 import service_pb2, service_pb2_grpc @@ -125,7 +126,7 @@ def _extract_leader_hint(err: grpc.RpcError) -> str | None: class AsyncClient: """Asynchronous client for the Fila message broker. - Wraps the hot-path gRPC operations: enqueue, batch_enqueue, consume, ack, + Wraps the hot-path gRPC operations: enqueue, enqueue_many, consume, ack, nack. Usage:: @@ -255,26 +256,35 @@ async def enqueue( try: resp = await self._stub.Enqueue( service_pb2.EnqueueRequest( - queue=queue, - headers=headers or {}, - payload=payload, + messages=[ + service_pb2.EnqueueMessage( + queue=queue, + headers=headers or {}, + payload=payload, + ) + ] ) ) except grpc.RpcError as e: raise _map_enqueue_error(e) from e - return str(resp.message_id) - async def batch_enqueue( + result = resp.results[0] + which = result.WhichOneof("result") + if which == "message_id": + return str(result.message_id) + raise _map_enqueue_result_error(result.error.code, result.error.message) + + async def enqueue_many( self, messages: list[tuple[str, dict[str, str] | None, bytes]], - ) -> list[BatchEnqueueResult]: + ) -> list[EnqueueResult]: """Enqueue multiple messages in a single RPC. Args: messages: List of (queue, headers, payload) tuples. Returns: - List of ``BatchEnqueueResult`` objects, one per input message. + List of ``EnqueueResult`` objects, one per input message. Each result has either a ``message_id`` (success) or ``error`` (per-message failure). @@ -283,7 +293,7 @@ async def batch_enqueue( RPCError: For unexpected gRPC failures. """ proto_messages = [ - service_pb2.EnqueueRequest( + service_pb2.EnqueueMessage( queue=q, headers=h or {}, payload=p, @@ -292,26 +302,13 @@ async def batch_enqueue( ] try: - resp = await self._stub.BatchEnqueue( - service_pb2.BatchEnqueueRequest(messages=proto_messages) + resp = await self._stub.Enqueue( + service_pb2.EnqueueRequest(messages=proto_messages) ) except grpc.RpcError as e: - raise _map_batch_enqueue_error(e) from e - - results: list[BatchEnqueueResult] = [] - for r in resp.results: - if r.HasField("success"): - results.append( - BatchEnqueueResult( - message_id=str(r.success.message_id), - error=None, - ) - ) - else: - results.append( - BatchEnqueueResult(message_id=None, error=r.error) - ) - return results + raise _map_enqueue_error(e) from e + + return [_proto_enqueue_result_to_sdk(r) for r in resp.results] async def consume(self, queue: str) -> AsyncIterator[ConsumeMessage]: """Open a streaming consumer on the specified queue. @@ -363,25 +360,12 @@ async def _consume_iter( self, stream: Any, ) -> AsyncIterator[ConsumeMessage]: - """Internal async generator reading from the gRPC stream. - - Handles both singular ``message`` field (backward compatible) and - repeated ``messages`` field (batched delivery). - """ + """Internal async generator reading from the gRPC stream.""" try: async for resp in stream: - # Check batched messages first (repeated field). - if len(resp.messages) > 0: - for msg in resp.messages: - if msg is not None and msg.ByteSize(): - yield _proto_msg_to_consume_message(msg) - continue - - # Fall back to singular message field. - msg = resp.message - if msg is None or not msg.ByteSize(): - continue # keepalive - yield _proto_msg_to_consume_message(msg) + for msg in resp.messages: + if msg is not None and msg.ByteSize(): + yield _proto_msg_to_consume_message(msg) except grpc.RpcError: return @@ -399,12 +383,26 @@ async def ack(self, queue: str, msg_id: str) -> None: RPCError: For unexpected gRPC failures. """ try: - await self._stub.Ack( - service_pb2.AckRequest(queue=queue, message_id=msg_id) + resp = await self._stub.Ack( + service_pb2.AckRequest( + messages=[service_pb2.AckMessage(queue=queue, message_id=msg_id)] + ) ) except grpc.RpcError as e: raise _map_ack_error(e) from e + # Check per-message result for errors. + if resp.results: + result = resp.results[0] + which = result.WhichOneof("result") + if which == "error": + from fila.errors import MessageNotFoundError, RPCError as _RPCError + + ack_err = result.error + if ack_err.code == service_pb2.ACK_ERROR_CODE_MESSAGE_NOT_FOUND: + raise MessageNotFoundError(f"ack: {ack_err.message}") + raise _RPCError(grpc.StatusCode.INTERNAL, f"ack: {ack_err.message}") + async def nack(self, queue: str, msg_id: str, error: str) -> None: """Negatively acknowledge a message that failed processing. @@ -421,10 +419,26 @@ async def nack(self, queue: str, msg_id: str, error: str) -> None: RPCError: For unexpected gRPC failures. """ try: - await self._stub.Nack( + resp = await self._stub.Nack( service_pb2.NackRequest( - queue=queue, message_id=msg_id, error=error + messages=[ + service_pb2.NackMessage( + queue=queue, message_id=msg_id, error=error + ) + ] ) ) except grpc.RpcError as e: raise _map_nack_error(e) from e + + # Check per-message result for errors. + if resp.results: + result = resp.results[0] + which = result.WhichOneof("result") + if which == "error": + from fila.errors import MessageNotFoundError, RPCError as _RPCError + + nack_err = result.error + if nack_err.code == service_pb2.NACK_ERROR_CODE_MESSAGE_NOT_FOUND: + raise MessageNotFoundError(f"nack: {nack_err.message}") + raise _RPCError(grpc.StatusCode.INTERNAL, f"nack: {nack_err.message}") diff --git a/fila/batcher.py b/fila/batcher.py index c57964a..1bf2994 100644 --- a/fila/batcher.py +++ b/fila/batcher.py @@ -1,4 +1,4 @@ -"""Background batcher for opportunistic and linger-based enqueue batching.""" +"""Background accumulator for opportunistic and linger-based enqueue accumulation.""" from __future__ import annotations @@ -9,137 +9,131 @@ import grpc -from fila.errors import BatchEnqueueError, _map_enqueue_error -from fila.types import BatchEnqueueResult +from fila.errors import EnqueueError, _map_enqueue_error, _map_enqueue_result_error from fila.v1 import service_pb2 if TYPE_CHECKING: from fila.v1 import service_pb2_grpc -# Sentinel that signals the batcher thread to stop. +# Sentinel that signals the accumulator thread to stop. _STOP = object() -# Maximum batch size when none is configured. -_DEFAULT_MAX_BATCH_SIZE = 1000 +# Maximum number of messages per flush when none is configured. +_DEFAULT_MAX_MESSAGES = 1000 -class _EnqueueRequest: - """Internal envelope pairing a proto request with its result future.""" +class _EnqueueItem: + """Internal envelope pairing a proto EnqueueMessage with its result future.""" __slots__ = ("proto", "future") def __init__( self, - proto: service_pb2.EnqueueRequest, + proto: service_pb2.EnqueueMessage, future: Future[str], ) -> None: self.proto = proto self.future = future -def _msg_to_consume_result( - proto_result: Any, -) -> BatchEnqueueResult: - """Convert a proto ``BatchEnqueueResult`` to the SDK type.""" - if proto_result.HasField("success"): - return BatchEnqueueResult( - message_id=proto_result.success.message_id, - error=None, - ) - return BatchEnqueueResult( - message_id=None, - error=proto_result.error, - ) - - def _flush_single( stub: service_pb2_grpc.FilaServiceStub, - req: _EnqueueRequest, + req: _EnqueueItem, ) -> None: - """Send a single message via the singular Enqueue RPC. + """Send a single message via the unified Enqueue RPC. This preserves the specific error types (QueueNotFoundError, etc.) that callers of ``enqueue()`` expect. """ try: - resp = stub.Enqueue(req.proto) - req.future.set_result(str(resp.message_id)) + resp = stub.Enqueue( + service_pb2.EnqueueRequest(messages=[req.proto]) + ) + result = resp.results[0] + which = result.WhichOneof("result") + if which == "message_id": + req.future.set_result(str(result.message_id)) + else: + req.future.set_exception( + _map_enqueue_result_error(result.error.code, result.error.message) + ) except grpc.RpcError as e: req.future.set_exception(_map_enqueue_error(e)) except Exception as e: req.future.set_exception(e) -def _flush_batch( +def _flush_many( stub: service_pb2_grpc.FilaServiceStub, - batch: list[_EnqueueRequest], + items: list[_EnqueueItem], ) -> None: - """Send a batch of messages via the BatchEnqueue RPC. + """Send multiple messages via the unified Enqueue RPC. - On RPC-level failure, every future in the batch receives a - ``BatchEnqueueError``. On success, each future gets either its - message ID or a per-message error string wrapped in a - ``BatchEnqueueError``. + On RPC-level failure, every future in the batch receives an + ``EnqueueError``. On success, each future gets either its + message ID or a per-message error string wrapped in an + ``EnqueueError``. """ try: - resp = stub.BatchEnqueue( - service_pb2.BatchEnqueueRequest( - messages=[r.proto for r in batch], + resp = stub.Enqueue( + service_pb2.EnqueueRequest( + messages=[item.proto for item in items], ) ) except grpc.RpcError as e: - err = BatchEnqueueError(f"batch enqueue rpc failed: {e.details()}") - for r in batch: - r.future.set_exception(err) + err = EnqueueError(f"enqueue rpc failed: {e.details()}") + for item in items: + item.future.set_exception(err) return except Exception as e: - for r in batch: - r.future.set_exception(e) + for item in items: + item.future.set_exception(e) return # Pair each result with its request future. for i, result in enumerate(resp.results): - if i >= len(batch): + if i >= len(items): break - req = batch[i] - if result.HasField("success"): - req.future.set_result(str(result.success.message_id)) + item = items[i] + which = result.WhichOneof("result") + if which == "message_id": + item.future.set_result(str(result.message_id)) else: - req.future.set_exception( - BatchEnqueueError(f"enqueue failed: {result.error}") + item.future.set_exception( + _map_enqueue_result_error(result.error.code, result.error.message) ) -class AutoBatcher: - """Opportunistic batcher: drains a queue and flushes in batches. +class AutoAccumulator: + """Opportunistic accumulator: drains a queue and flushes in batches. A background daemon thread blocks on the first message, then non-blocking drains any additional messages that arrived during processing and flushes - them as a single batch via a thread pool executor. + them as a single Enqueue RPC via a thread pool executor. """ def __init__( self, stub: service_pb2_grpc.FilaServiceStub, - max_batch_size: int = _DEFAULT_MAX_BATCH_SIZE, + max_messages: int = _DEFAULT_MAX_MESSAGES, max_workers: int = 4, ) -> None: self._stub = stub - self._max_batch_size = max_batch_size - self._queue: queue.Queue[_EnqueueRequest | object] = queue.Queue() + self._max_messages = max_messages + self._queue: queue.Queue[_EnqueueItem | object] = queue.Queue() self._executor = ThreadPoolExecutor(max_workers=max_workers) self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() - def submit(self, proto: service_pb2.EnqueueRequest) -> Future[str]: - """Submit a message for batched enqueue. Returns a Future for the message ID.""" + def submit(self, proto: service_pb2.EnqueueMessage) -> Future[str]: + """Submit a message for accumulated enqueue. Returns a Future for the message ID.""" fut: Future[str] = Future() - self._queue.put(_EnqueueRequest(proto, fut)) + self._queue.put(_EnqueueItem(proto, fut)) return fut def close(self, timeout: float | None = 30.0) -> None: - """Drain pending messages and shut down the batcher. + """Drain pending messages and shut down the accumulator. Blocks until all pending messages have been flushed or *timeout* seconds have elapsed. @@ -160,11 +154,11 @@ def _run(self) -> None: if first is _STOP: return - assert isinstance(first, _EnqueueRequest) - batch: list[_EnqueueRequest] = [first] + assert isinstance(first, _EnqueueItem) + batch: list[_EnqueueItem] = [first] # Non-blocking drain of any additional queued messages. - while len(batch) < self._max_batch_size: + while len(batch) < self._max_messages: try: item = self._queue.get_nowait() except queue.Empty: @@ -173,25 +167,25 @@ def _run(self) -> None: # Flush what we have, then stop. self._flush(batch) return - assert isinstance(item, _EnqueueRequest) + assert isinstance(item, _EnqueueItem) batch.append(item) self._flush(batch) - def _flush(self, batch: list[_EnqueueRequest]) -> None: + def _flush(self, batch: list[_EnqueueItem]) -> None: """Dispatch a batch to the executor for concurrent RPC.""" if len(batch) == 1: - # Single-item optimization: use singular Enqueue RPC. + # Single-item optimization: still uses Enqueue but with one message. self._executor.submit(_flush_single, self._stub, batch[0]) else: - self._executor.submit(_flush_batch, self._stub, batch) + self._executor.submit(_flush_many, self._stub, batch) -class LingerBatcher: - """Timer-based batcher: holds messages for up to linger_ms or batch_size. +class LingerAccumulator: + """Timer-based accumulator: holds messages for up to linger_ms or max_messages. A background daemon thread accumulates messages and flushes when either - the batch reaches ``batch_size`` or ``linger_ms`` milliseconds have + the count reaches ``max_messages`` or ``linger_ms`` milliseconds have elapsed since the first message in the current batch arrived. """ @@ -199,25 +193,25 @@ def __init__( self, stub: service_pb2_grpc.FilaServiceStub, linger_ms: float, - batch_size: int, + max_messages: int, max_workers: int = 4, ) -> None: self._stub = stub self._linger_s = linger_ms / 1000.0 - self._batch_size = batch_size - self._queue: queue.Queue[_EnqueueRequest | object] = queue.Queue() + self._max_messages = max_messages + self._queue: queue.Queue[_EnqueueItem | object] = queue.Queue() self._executor = ThreadPoolExecutor(max_workers=max_workers) self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() - def submit(self, proto: service_pb2.EnqueueRequest) -> Future[str]: - """Submit a message for batched enqueue. Returns a Future for the message ID.""" + def submit(self, proto: service_pb2.EnqueueMessage) -> Future[str]: + """Submit a message for accumulated enqueue. Returns a Future for the message ID.""" fut: Future[str] = Future() - self._queue.put(_EnqueueRequest(proto, fut)) + self._queue.put(_EnqueueItem(proto, fut)) return fut def close(self, timeout: float | None = 30.0) -> None: - """Drain pending messages and shut down the batcher.""" + """Drain pending messages and shut down the accumulator.""" self._queue.put(_STOP) self._thread.join(timeout=timeout) self._executor.shutdown(wait=True) @@ -227,7 +221,7 @@ def update_stub(self, stub: service_pb2_grpc.FilaServiceStub) -> None: self._stub = stub def _run(self) -> None: - """Background loop: accumulate up to batch_size or linger timeout.""" + """Background loop: accumulate up to max_messages or linger timeout.""" import time while True: @@ -236,14 +230,14 @@ def _run(self) -> None: if first is _STOP: return - assert isinstance(first, _EnqueueRequest) - batch: list[_EnqueueRequest] = [first] + assert isinstance(first, _EnqueueItem) + batch: list[_EnqueueItem] = [first] # Track wall-clock deadline from when first message arrived. deadline = time.monotonic() + self._linger_s - # Accumulate more items until batch_size or linger timeout. - while len(batch) < self._batch_size: + # Accumulate more items until max_messages or linger timeout. + while len(batch) < self._max_messages: remaining = deadline - time.monotonic() if remaining <= 0: break @@ -254,14 +248,14 @@ def _run(self) -> None: if item is _STOP: self._flush(batch) return - assert isinstance(item, _EnqueueRequest) + assert isinstance(item, _EnqueueItem) batch.append(item) self._flush(batch) - def _flush(self, batch: list[_EnqueueRequest]) -> None: + def _flush(self, batch: list[_EnqueueItem]) -> None: """Dispatch a batch to the executor for concurrent RPC.""" if len(batch) == 1: self._executor.submit(_flush_single, self._stub, batch[0]) else: - self._executor.submit(_flush_batch, self._stub, batch) + self._executor.submit(_flush_many, self._stub, batch) diff --git a/fila/client.py b/fila/client.py index 0d7e49a..2f44bb6 100644 --- a/fila/client.py +++ b/fila/client.py @@ -6,15 +6,16 @@ import grpc -from fila.batcher import AutoBatcher, LingerBatcher +from fila.batcher import AutoAccumulator, LingerAccumulator from fila.errors import ( + EnqueueError, _map_ack_error, - _map_batch_enqueue_error, _map_consume_error, _map_enqueue_error, + _map_enqueue_result_error, _map_nack_error, ) -from fila.types import BatchEnqueueResult, BatchMode, ConsumeMessage, Linger +from fila.types import AccumulatorMode, ConsumeMessage, EnqueueResult, Linger from fila.v1 import service_pb2, service_pb2_grpc if TYPE_CHECKING: @@ -53,6 +54,14 @@ def _proto_msg_to_consume_message(msg: Any) -> ConsumeMessage: ) +def _proto_enqueue_result_to_sdk(result: Any) -> EnqueueResult: + """Convert a proto EnqueueResult to the SDK type.""" + which = result.WhichOneof("result") + if which == "message_id": + return EnqueueResult(message_id=str(result.message_id), error=None) + return EnqueueResult(message_id=None, error=result.error.message) + + class _ClientCallDetails( grpc.ClientCallDetails, # type: ignore[misc] ): @@ -122,7 +131,7 @@ def intercept_unary_stream( class Client: """Synchronous client for the Fila message broker. - Wraps the hot-path gRPC operations: enqueue, batch_enqueue, consume, ack, + Wraps the hot-path gRPC operations: enqueue, enqueue_many, consume, ack, nack. Usage:: @@ -138,16 +147,16 @@ class Client: with Client("localhost:5555") as client: client.enqueue("my-queue", None, b"hello") - Batch modes:: + Accumulator modes:: - # AUTO (default): opportunistic batching via background thread + # AUTO (default): opportunistic accumulation via background thread client = Client("localhost:5555") # DISABLED: each enqueue() is a direct RPC - client = Client("localhost:5555", batch_mode=BatchMode.DISABLED) + client = Client("localhost:5555", accumulator_mode=AccumulatorMode.DISABLED) - # LINGER: timer-based forced batching - client = Client("localhost:5555", batch_mode=Linger(linger_ms=10, batch_size=100)) + # LINGER: timer-based forced accumulation + client = Client("localhost:5555", accumulator_mode=Linger(linger_ms=10, max_messages=100)) TLS (system trust store):: @@ -179,8 +188,8 @@ def __init__( client_cert: bytes | None = None, client_key: bytes | None = None, api_key: str | None = None, - batch_mode: BatchMode | Linger = BatchMode.AUTO, - max_batch_size: int = 1000, + accumulator_mode: AccumulatorMode | Linger = AccumulatorMode.AUTO, + max_accumulator_messages: int = 1000, ) -> None: """Connect to a Fila broker at the given address. @@ -195,10 +204,12 @@ def __init__( client_key: PEM-encoded client private key for mutual TLS (optional). api_key: API key for authentication. When set, every RPC includes an ``authorization: Bearer `` metadata header. - batch_mode: Controls how ``enqueue()`` routes messages. Defaults to - ``BatchMode.AUTO`` (opportunistic batching). - max_batch_size: Maximum number of messages per batch when using - ``BatchMode.AUTO``. Defaults to 1000. + accumulator_mode: Controls how ``enqueue()`` routes messages. + Defaults to ``AccumulatorMode.AUTO`` + (opportunistic accumulation). + max_accumulator_messages: Maximum number of messages per flush when + using ``AccumulatorMode.AUTO``. + Defaults to 1000. """ self._tls = tls self._ca_cert = ca_cert @@ -215,20 +226,20 @@ def __init__( self._channel = self._make_channel(addr) self._stub = service_pb2_grpc.FilaServiceStub(self._channel) # type: ignore[no-untyped-call] - # Set up the batcher based on the chosen mode. - self._batcher: AutoBatcher | LingerBatcher | None = None - if isinstance(batch_mode, Linger): - self._batcher = LingerBatcher( + # Set up the accumulator based on the chosen mode. + self._accumulator: AutoAccumulator | LingerAccumulator | None = None + if isinstance(accumulator_mode, Linger): + self._accumulator = LingerAccumulator( self._stub, - linger_ms=batch_mode.linger_ms, - batch_size=batch_mode.batch_size, + linger_ms=accumulator_mode.linger_ms, + max_messages=accumulator_mode.max_messages, ) - elif batch_mode is BatchMode.AUTO: - self._batcher = AutoBatcher( + elif accumulator_mode is AccumulatorMode.AUTO: + self._accumulator = AutoAccumulator( self._stub, - max_batch_size=max_batch_size, + max_messages=max_accumulator_messages, ) - # BatchMode.DISABLED: self._batcher stays None + # AccumulatorMode.DISABLED: self._accumulator stays None def _make_channel(self, addr: str) -> grpc.Channel: """Create a gRPC channel to the given address using stored credentials.""" @@ -251,9 +262,9 @@ def _make_channel(self, addr: str) -> grpc.Channel: return channel def close(self) -> None: - """Drain pending batched messages and close the underlying gRPC channel.""" - if self._batcher is not None: - self._batcher.close() + """Drain pending accumulated messages and close the underlying gRPC channel.""" + if self._accumulator is not None: + self._accumulator.close() self._channel.close() def __enter__(self) -> Client: @@ -270,12 +281,12 @@ def enqueue( ) -> str: """Enqueue a message to the specified queue. - When a batcher is active (``BatchMode.AUTO`` or ``Linger``), the - message is submitted to the background batcher and this call blocks - until the batch is flushed and the result is available. + When an accumulator is active (``AccumulatorMode.AUTO`` or ``Linger``), + the message is submitted to the background accumulator and this call + blocks until the flush completes and the result is available. - When batching is disabled (``BatchMode.DISABLED``), this call makes - a direct synchronous RPC. + When accumulation is disabled (``AccumulatorMode.DISABLED``), this call + makes a direct synchronous RPC. Args: queue: Target queue name. @@ -287,40 +298,47 @@ def enqueue( Raises: QueueNotFoundError: If the queue does not exist (DISABLED mode). - BatchEnqueueError: If the batch RPC fails (AUTO/LINGER mode). + EnqueueError: If the enqueue RPC fails (AUTO/LINGER mode). RPCError: For unexpected gRPC failures. """ - proto = service_pb2.EnqueueRequest( + proto = service_pb2.EnqueueMessage( queue=queue, headers=headers or {}, payload=payload, ) - if self._batcher is not None: - future = self._batcher.submit(proto) + if self._accumulator is not None: + future = self._accumulator.submit(proto) return future.result() # Direct RPC (DISABLED mode). try: - resp = self._stub.Enqueue(proto) + resp = self._stub.Enqueue( + service_pb2.EnqueueRequest(messages=[proto]) + ) except grpc.RpcError as e: raise _map_enqueue_error(e) from e - return str(resp.message_id) - def batch_enqueue( + result = resp.results[0] + which = result.WhichOneof("result") + if which == "message_id": + return str(result.message_id) + raise _map_enqueue_result_error(result.error.code, result.error.message) + + def enqueue_many( self, messages: list[tuple[str, dict[str, str] | None, bytes]], - ) -> list[BatchEnqueueResult]: + ) -> list[EnqueueResult]: """Enqueue multiple messages in a single RPC. - This is an explicit batch operation that always uses the BatchEnqueue - RPC regardless of the batch_mode setting. + This is an explicit multi-message operation that always uses the + Enqueue RPC directly, regardless of the accumulator_mode setting. Args: messages: List of (queue, headers, payload) tuples. Returns: - List of ``BatchEnqueueResult`` objects, one per input message. + List of ``EnqueueResult`` objects, one per input message. Each result has either a ``message_id`` (success) or ``error`` (per-message failure). @@ -329,7 +347,7 @@ def batch_enqueue( RPCError: For unexpected gRPC failures. """ proto_messages = [ - service_pb2.EnqueueRequest( + service_pb2.EnqueueMessage( queue=q, headers=h or {}, payload=p, @@ -338,26 +356,13 @@ def batch_enqueue( ] try: - resp = self._stub.BatchEnqueue( - service_pb2.BatchEnqueueRequest(messages=proto_messages) + resp = self._stub.Enqueue( + service_pb2.EnqueueRequest(messages=proto_messages) ) except grpc.RpcError as e: - raise _map_batch_enqueue_error(e) from e - - results: list[BatchEnqueueResult] = [] - for r in resp.results: - if r.HasField("success"): - results.append( - BatchEnqueueResult( - message_id=str(r.success.message_id), - error=None, - ) - ) - else: - results.append( - BatchEnqueueResult(message_id=None, error=r.error) - ) - return results + raise _map_enqueue_error(e) from e + + return [_proto_enqueue_result_to_sdk(r) for r in resp.results] def consume(self, queue: str) -> Iterator[ConsumeMessage]: """Open a streaming consumer on the specified queue. @@ -398,8 +403,8 @@ def _reconnect_and_consume(self, leader_addr: str, queue: str) -> Any: self._channel.close() self._channel = self._make_channel(leader_addr) self._stub = service_pb2_grpc.FilaServiceStub(self._channel) # type: ignore[no-untyped-call] - if self._batcher is not None: - self._batcher.update_stub(self._stub) + if self._accumulator is not None: + self._accumulator.update_stub(self._stub) try: return self._stub.Consume( service_pb2.ConsumeRequest(queue=queue) @@ -411,25 +416,12 @@ def _consume_iter( self, stream: Any, ) -> Iterator[ConsumeMessage]: - """Internal generator reading from the gRPC stream. - - Handles both singular ``message`` field (backward compatible) and - repeated ``messages`` field (batched delivery). - """ + """Internal generator reading from the gRPC stream.""" try: for resp in stream: - # Check batched messages first (repeated field). - if len(resp.messages) > 0: - for msg in resp.messages: - if msg is not None and msg.ByteSize(): - yield _proto_msg_to_consume_message(msg) - continue - - # Fall back to singular message field. - msg = resp.message - if msg is None or not msg.ByteSize(): - continue # keepalive - yield _proto_msg_to_consume_message(msg) + for msg in resp.messages: + if msg is not None and msg.ByteSize(): + yield _proto_msg_to_consume_message(msg) except grpc.RpcError: return @@ -447,12 +439,26 @@ def ack(self, queue: str, msg_id: str) -> None: RPCError: For unexpected gRPC failures. """ try: - self._stub.Ack( - service_pb2.AckRequest(queue=queue, message_id=msg_id) + resp = self._stub.Ack( + service_pb2.AckRequest( + messages=[service_pb2.AckMessage(queue=queue, message_id=msg_id)] + ) ) except grpc.RpcError as e: raise _map_ack_error(e) from e + # Check per-message result for errors. + if resp.results: + result = resp.results[0] + which = result.WhichOneof("result") + if which == "error": + from fila.errors import MessageNotFoundError, RPCError as _RPCError + + ack_err = result.error + if ack_err.code == service_pb2.ACK_ERROR_CODE_MESSAGE_NOT_FOUND: + raise MessageNotFoundError(f"ack: {ack_err.message}") + raise _RPCError(grpc.StatusCode.INTERNAL, f"ack: {ack_err.message}") + def nack(self, queue: str, msg_id: str, error: str) -> None: """Negatively acknowledge a message that failed processing. @@ -469,10 +475,26 @@ def nack(self, queue: str, msg_id: str, error: str) -> None: RPCError: For unexpected gRPC failures. """ try: - self._stub.Nack( + resp = self._stub.Nack( service_pb2.NackRequest( - queue=queue, message_id=msg_id, error=error + messages=[ + service_pb2.NackMessage( + queue=queue, message_id=msg_id, error=error + ) + ] ) ) except grpc.RpcError as e: raise _map_nack_error(e) from e + + # Check per-message result for errors. + if resp.results: + result = resp.results[0] + which = result.WhichOneof("result") + if which == "error": + from fila.errors import MessageNotFoundError, RPCError as _RPCError + + nack_err = result.error + if nack_err.code == service_pb2.NACK_ERROR_CODE_MESSAGE_NOT_FOUND: + raise MessageNotFoundError(f"nack: {nack_err.message}") + raise _RPCError(grpc.StatusCode.INTERNAL, f"nack: {nack_err.message}") diff --git a/fila/errors.py b/fila/errors.py index 40e76ee..819a197 100644 --- a/fila/errors.py +++ b/fila/errors.py @@ -26,15 +26,31 @@ def __init__(self, code: grpc.StatusCode, message: str) -> None: super().__init__(f"rpc error (code = {code.name}): {message}") -class BatchEnqueueError(FilaError): - """Raised when a batched enqueue fails at the RPC level. +class EnqueueError(FilaError): + """Raised when an enqueue fails at the RPC level. - Individual per-message failures are reported via ``BatchEnqueueResult.error`` - and do not raise this exception. This is raised only when the entire batch + Individual per-message failures are reported via ``EnqueueResult.error`` + and do not raise this exception. This is raised only when the entire RPC fails (e.g., network error, server unavailable). """ +def _map_enqueue_result_error(code: int, message: str) -> FilaError: + """Map a per-message EnqueueErrorCode to a Fila exception. + + Used when the unified Enqueue RPC succeeds at the transport level but + returns a per-message error result (e.g., queue not found for one of + the messages in the batch). + """ + from fila.v1 import service_pb2 + + if code == service_pb2.ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND: + return QueueNotFoundError(f"enqueue: {message}") + if code == service_pb2.ENQUEUE_ERROR_CODE_PERMISSION_DENIED: + return RPCError(grpc.StatusCode.PERMISSION_DENIED, f"enqueue: {message}") + return EnqueueError(f"enqueue failed: {message}") + + def _map_enqueue_error(err: grpc.RpcError) -> FilaError: """Map a gRPC error from an enqueue call to a Fila exception.""" code = err.code() @@ -65,11 +81,3 @@ def _map_nack_error(err: grpc.RpcError) -> FilaError: if code == grpc.StatusCode.NOT_FOUND: return MessageNotFoundError(f"nack: {err.details()}") return RPCError(code, err.details() or "") - - -def _map_batch_enqueue_error(err: grpc.RpcError) -> FilaError: - """Map a gRPC error from a batch enqueue call to a Fila exception.""" - code = err.code() - if code == grpc.StatusCode.NOT_FOUND: - return QueueNotFoundError(f"batch_enqueue: {err.details()}") - return RPCError(code, err.details() or "") diff --git a/fila/types.py b/fila/types.py index 54ab034..a73c15a 100644 --- a/fila/types.py +++ b/fila/types.py @@ -19,8 +19,8 @@ class ConsumeMessage: @dataclass(frozen=True) -class BatchEnqueueResult: - """Result for a single message within a batch enqueue operation. +class EnqueueResult: + """Result for a single message within an enqueue operation. Exactly one of ``message_id`` or ``error`` is set. """ @@ -34,13 +34,13 @@ def is_success(self) -> bool: return self.message_id is not None -class BatchMode(Enum): +class AccumulatorMode(Enum): """Controls how ``enqueue()`` routes messages to the broker. - - ``AUTO``: Opportunistic batching via a background thread. At low load + - ``AUTO``: Opportunistic accumulation via a background thread. At low load messages are sent individually; at high load they cluster into batches. This is the default. - - ``DISABLED``: No batching. Each ``enqueue()`` call is a direct RPC. + - ``DISABLED``: No accumulation. Each ``enqueue()`` call is a direct RPC. """ AUTO = auto() @@ -49,15 +49,15 @@ class BatchMode(Enum): @dataclass(frozen=True) class Linger: - """Timer-based forced batching mode. + """Timer-based forced accumulation mode. Messages are held for up to ``linger_ms`` milliseconds or until - ``batch_size`` messages accumulate, whichever comes first. + ``max_messages`` messages accumulate, whichever comes first. Args: linger_ms: Maximum time to hold a message before flushing (milliseconds). - batch_size: Maximum number of messages per batch. + max_messages: Maximum number of messages per flush. """ linger_ms: float - batch_size: int + max_messages: int diff --git a/fila/v1/admin_pb2.py b/fila/v1/admin_pb2.py index 4bb4e27..7dc1f25 100644 --- a/fila/v1/admin_pb2.py +++ b/fila/v1/admin_pb2.py @@ -24,7 +24,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13\x66ila/v1/admin.proto\x12\x07\x66ila.v1\"H\n\x12\x43reateQueueRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12$\n\x06\x63onfig\x18\x02 \x01(\x0b\x32\x14.fila.v1.QueueConfig\"b\n\x0bQueueConfig\x12\x19\n\x11on_enqueue_script\x18\x01 \x01(\t\x12\x19\n\x11on_failure_script\x18\x02 \x01(\t\x12\x1d\n\x15visibility_timeout_ms\x18\x03 \x01(\x04\"\'\n\x13\x43reateQueueResponse\x12\x10\n\x08queue_id\x18\x01 \x01(\t\"#\n\x12\x44\x65leteQueueRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"\x15\n\x13\x44\x65leteQueueResponse\".\n\x10SetConfigRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"\x13\n\x11SetConfigResponse\"\x1f\n\x10GetConfigRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\"\"\n\x11GetConfigResponse\x12\r\n\x05value\x18\x01 \x01(\t\")\n\x0b\x43onfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"#\n\x11ListConfigRequest\x12\x0e\n\x06prefix\x18\x01 \x01(\t\"P\n\x12ListConfigResponse\x12%\n\x07\x65ntries\x18\x01 \x03(\x0b\x32\x14.fila.v1.ConfigEntry\x12\x13\n\x0btotal_count\x18\x02 \x01(\r\" \n\x0fGetStatsRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"b\n\x13PerFairnessKeyStats\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x15\n\rpending_count\x18\x02 \x01(\x04\x12\x17\n\x0f\x63urrent_deficit\x18\x03 \x01(\x03\x12\x0e\n\x06weight\x18\x04 \x01(\r\"Z\n\x13PerThrottleKeyStats\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x0e\n\x06tokens\x18\x02 \x01(\x01\x12\x17\n\x0frate_per_second\x18\x03 \x01(\x01\x12\r\n\x05\x62urst\x18\x04 \x01(\x01\"\x9f\x02\n\x10GetStatsResponse\x12\r\n\x05\x64\x65pth\x18\x01 \x01(\x04\x12\x11\n\tin_flight\x18\x02 \x01(\x04\x12\x1c\n\x14\x61\x63tive_fairness_keys\x18\x03 \x01(\x04\x12\x18\n\x10\x61\x63tive_consumers\x18\x04 \x01(\r\x12\x0f\n\x07quantum\x18\x05 \x01(\r\x12\x33\n\rper_key_stats\x18\x06 \x03(\x0b\x32\x1c.fila.v1.PerFairnessKeyStats\x12\x38\n\x12per_throttle_stats\x18\x07 \x03(\x0b\x32\x1c.fila.v1.PerThrottleKeyStats\x12\x16\n\x0eleader_node_id\x18\x08 \x01(\x04\x12\x19\n\x11replication_count\x18\t \x01(\r\"2\n\x0eRedriveRequest\x12\x11\n\tdlq_queue\x18\x01 \x01(\t\x12\r\n\x05\x63ount\x18\x02 \x01(\x04\"#\n\x0fRedriveResponse\x12\x10\n\x08redriven\x18\x01 \x01(\x04\"\x13\n\x11ListQueuesRequest\"m\n\tQueueInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x64\x65pth\x18\x02 \x01(\x04\x12\x11\n\tin_flight\x18\x03 \x01(\x04\x12\x18\n\x10\x61\x63tive_consumers\x18\x04 \x01(\r\x12\x16\n\x0eleader_node_id\x18\x05 \x01(\x04\"T\n\x12ListQueuesResponse\x12\"\n\x06queues\x18\x01 \x03(\x0b\x32\x12.fila.v1.QueueInfo\x12\x1a\n\x12\x63luster_node_count\x18\x02 \x01(\r\"Q\n\x13\x43reateApiKeyRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x15\n\rexpires_at_ms\x18\x02 \x01(\x04\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\"J\n\x14\x43reateApiKeyResponse\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\"%\n\x13RevokeApiKeyRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\"\x16\n\x14RevokeApiKeyResponse\"\x14\n\x12ListApiKeysRequest\"o\n\nApiKeyInfo\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\x15\n\rcreated_at_ms\x18\x03 \x01(\x04\x12\x15\n\rexpires_at_ms\x18\x04 \x01(\x04\x12\x15\n\ris_superadmin\x18\x05 \x01(\x08\"8\n\x13ListApiKeysResponse\x12!\n\x04keys\x18\x01 \x03(\x0b\x32\x13.fila.v1.ApiKeyInfo\".\n\rAclPermission\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12\x0f\n\x07pattern\x18\x02 \x01(\t\"L\n\rSetAclRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12+\n\x0bpermissions\x18\x02 \x03(\x0b\x32\x16.fila.v1.AclPermission\"\x10\n\x0eSetAclResponse\"\x1f\n\rGetAclRequest\x12\x0e\n\x06key_id\x18\x01 \x01(\t\"d\n\x0eGetAclResponse\x12\x0e\n\x06key_id\x18\x01 \x01(\t\x12+\n\x0bpermissions\x18\x02 \x03(\x0b\x32\x16.fila.v1.AclPermission\x12\x15\n\ris_superadmin\x18\x03 \x01(\x08\x32\x8e\x07\n\tFilaAdmin\x12H\n\x0b\x43reateQueue\x12\x1b.fila.v1.CreateQueueRequest\x1a\x1c.fila.v1.CreateQueueResponse\x12H\n\x0b\x44\x65leteQueue\x12\x1b.fila.v1.DeleteQueueRequest\x1a\x1c.fila.v1.DeleteQueueResponse\x12\x42\n\tSetConfig\x12\x19.fila.v1.SetConfigRequest\x1a\x1a.fila.v1.SetConfigResponse\x12\x42\n\tGetConfig\x12\x19.fila.v1.GetConfigRequest\x1a\x1a.fila.v1.GetConfigResponse\x12\x45\n\nListConfig\x12\x1a.fila.v1.ListConfigRequest\x1a\x1b.fila.v1.ListConfigResponse\x12?\n\x08GetStats\x12\x18.fila.v1.GetStatsRequest\x1a\x19.fila.v1.GetStatsResponse\x12<\n\x07Redrive\x12\x17.fila.v1.RedriveRequest\x1a\x18.fila.v1.RedriveResponse\x12\x45\n\nListQueues\x12\x1a.fila.v1.ListQueuesRequest\x1a\x1b.fila.v1.ListQueuesResponse\x12K\n\x0c\x43reateApiKey\x12\x1c.fila.v1.CreateApiKeyRequest\x1a\x1d.fila.v1.CreateApiKeyResponse\x12K\n\x0cRevokeApiKey\x12\x1c.fila.v1.RevokeApiKeyRequest\x1a\x1d.fila.v1.RevokeApiKeyResponse\x12H\n\x0bListApiKeys\x12\x1b.fila.v1.ListApiKeysRequest\x1a\x1c.fila.v1.ListApiKeysResponse\x12\x39\n\x06SetAcl\x12\x16.fila.v1.SetAclRequest\x1a\x17.fila.v1.SetAclResponse\x12\x39\n\x06GetAcl\x12\x16.fila.v1.GetAclRequest\x1a\x17.fila.v1.GetAclResponseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x13\x66ila/v1/admin.proto\x12\x07\x66ila.v1\"H\n\x12\x43reateQueueRequest\x12\x0c\n\x04name\x18\x01 \x01(\t\x12$\n\x06\x63onfig\x18\x02 \x01(\x0b\x32\x14.fila.v1.QueueConfig\"b\n\x0bQueueConfig\x12\x19\n\x11on_enqueue_script\x18\x01 \x01(\t\x12\x19\n\x11on_failure_script\x18\x02 \x01(\t\x12\x1d\n\x15visibility_timeout_ms\x18\x03 \x01(\x04\"\'\n\x13\x43reateQueueResponse\x12\x10\n\x08queue_id\x18\x01 \x01(\t\"#\n\x12\x44\x65leteQueueRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"\x15\n\x13\x44\x65leteQueueResponse\".\n\x10SetConfigRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"\x13\n\x11SetConfigResponse\"\x1f\n\x10GetConfigRequest\x12\x0b\n\x03key\x18\x01 \x01(\t\"\"\n\x11GetConfigResponse\x12\r\n\x05value\x18\x01 \x01(\t\")\n\x0b\x43onfigEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"#\n\x11ListConfigRequest\x12\x0e\n\x06prefix\x18\x01 \x01(\t\"P\n\x12ListConfigResponse\x12%\n\x07\x65ntries\x18\x01 \x03(\x0b\x32\x14.fila.v1.ConfigEntry\x12\x13\n\x0btotal_count\x18\x02 \x01(\r\" \n\x0fGetStatsRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"b\n\x13PerFairnessKeyStats\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x15\n\rpending_count\x18\x02 \x01(\x04\x12\x17\n\x0f\x63urrent_deficit\x18\x03 \x01(\x03\x12\x0e\n\x06weight\x18\x04 \x01(\r\"Z\n\x13PerThrottleKeyStats\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\x0e\n\x06tokens\x18\x02 \x01(\x01\x12\x17\n\x0frate_per_second\x18\x03 \x01(\x01\x12\r\n\x05\x62urst\x18\x04 \x01(\x01\"\x9f\x02\n\x10GetStatsResponse\x12\r\n\x05\x64\x65pth\x18\x01 \x01(\x04\x12\x11\n\tin_flight\x18\x02 \x01(\x04\x12\x1c\n\x14\x61\x63tive_fairness_keys\x18\x03 \x01(\x04\x12\x18\n\x10\x61\x63tive_consumers\x18\x04 \x01(\r\x12\x0f\n\x07quantum\x18\x05 \x01(\r\x12\x33\n\rper_key_stats\x18\x06 \x03(\x0b\x32\x1c.fila.v1.PerFairnessKeyStats\x12\x38\n\x12per_throttle_stats\x18\x07 \x03(\x0b\x32\x1c.fila.v1.PerThrottleKeyStats\x12\x16\n\x0eleader_node_id\x18\x08 \x01(\x04\x12\x19\n\x11replication_count\x18\t \x01(\r\"2\n\x0eRedriveRequest\x12\x11\n\tdlq_queue\x18\x01 \x01(\t\x12\r\n\x05\x63ount\x18\x02 \x01(\x04\"#\n\x0fRedriveResponse\x12\x10\n\x08redriven\x18\x01 \x01(\x04\"\x13\n\x11ListQueuesRequest\"m\n\tQueueInfo\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x64\x65pth\x18\x02 \x01(\x04\x12\x11\n\tin_flight\x18\x03 \x01(\x04\x12\x18\n\x10\x61\x63tive_consumers\x18\x04 \x01(\r\x12\x16\n\x0eleader_node_id\x18\x05 \x01(\x04\"T\n\x12ListQueuesResponse\x12\"\n\x06queues\x18\x01 \x03(\x0b\x32\x12.fila.v1.QueueInfo\x12\x1a\n\x12\x63luster_node_count\x18\x02 \x01(\r2\xb4\x04\n\tFilaAdmin\x12H\n\x0b\x43reateQueue\x12\x1b.fila.v1.CreateQueueRequest\x1a\x1c.fila.v1.CreateQueueResponse\x12H\n\x0b\x44\x65leteQueue\x12\x1b.fila.v1.DeleteQueueRequest\x1a\x1c.fila.v1.DeleteQueueResponse\x12\x42\n\tSetConfig\x12\x19.fila.v1.SetConfigRequest\x1a\x1a.fila.v1.SetConfigResponse\x12\x42\n\tGetConfig\x12\x19.fila.v1.GetConfigRequest\x1a\x1a.fila.v1.GetConfigResponse\x12\x45\n\nListConfig\x12\x1a.fila.v1.ListConfigRequest\x1a\x1b.fila.v1.ListConfigResponse\x12?\n\x08GetStats\x12\x18.fila.v1.GetStatsRequest\x1a\x19.fila.v1.GetStatsResponse\x12<\n\x07Redrive\x12\x17.fila.v1.RedriveRequest\x1a\x18.fila.v1.RedriveResponse\x12\x45\n\nListQueues\x12\x1a.fila.v1.ListQueuesRequest\x1a\x1b.fila.v1.ListQueuesResponseb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -73,30 +73,6 @@ _globals['_QUEUEINFO']._serialized_end=1342 _globals['_LISTQUEUESRESPONSE']._serialized_start=1344 _globals['_LISTQUEUESRESPONSE']._serialized_end=1428 - _globals['_CREATEAPIKEYREQUEST']._serialized_start=1430 - _globals['_CREATEAPIKEYREQUEST']._serialized_end=1511 - _globals['_CREATEAPIKEYRESPONSE']._serialized_start=1513 - _globals['_CREATEAPIKEYRESPONSE']._serialized_end=1587 - _globals['_REVOKEAPIKEYREQUEST']._serialized_start=1589 - _globals['_REVOKEAPIKEYREQUEST']._serialized_end=1626 - _globals['_REVOKEAPIKEYRESPONSE']._serialized_start=1628 - _globals['_REVOKEAPIKEYRESPONSE']._serialized_end=1650 - _globals['_LISTAPIKEYSREQUEST']._serialized_start=1652 - _globals['_LISTAPIKEYSREQUEST']._serialized_end=1672 - _globals['_APIKEYINFO']._serialized_start=1674 - _globals['_APIKEYINFO']._serialized_end=1785 - _globals['_LISTAPIKEYSRESPONSE']._serialized_start=1787 - _globals['_LISTAPIKEYSRESPONSE']._serialized_end=1843 - _globals['_ACLPERMISSION']._serialized_start=1845 - _globals['_ACLPERMISSION']._serialized_end=1891 - _globals['_SETACLREQUEST']._serialized_start=1893 - _globals['_SETACLREQUEST']._serialized_end=1969 - _globals['_SETACLRESPONSE']._serialized_start=1971 - _globals['_SETACLRESPONSE']._serialized_end=1987 - _globals['_GETACLREQUEST']._serialized_start=1989 - _globals['_GETACLREQUEST']._serialized_end=2020 - _globals['_GETACLRESPONSE']._serialized_start=2022 - _globals['_GETACLRESPONSE']._serialized_end=2122 - _globals['_FILAADMIN']._serialized_start=2125 - _globals['_FILAADMIN']._serialized_end=3035 + _globals['_FILAADMIN']._serialized_start=1431 + _globals['_FILAADMIN']._serialized_end=1995 # @@protoc_insertion_point(module_scope) diff --git a/fila/v1/admin_pb2.pyi b/fila/v1/admin_pb2.pyi index d603b29..0c594ce 100644 --- a/fila/v1/admin_pb2.pyi +++ b/fila/v1/admin_pb2.pyi @@ -177,93 +177,3 @@ class ListQueuesResponse(_message.Message): queues: _containers.RepeatedCompositeFieldContainer[QueueInfo] cluster_node_count: int def __init__(self, queues: _Optional[_Iterable[_Union[QueueInfo, _Mapping]]] = ..., cluster_node_count: _Optional[int] = ...) -> None: ... - -class CreateApiKeyRequest(_message.Message): - __slots__ = ("name", "expires_at_ms", "is_superadmin") - NAME_FIELD_NUMBER: _ClassVar[int] - EXPIRES_AT_MS_FIELD_NUMBER: _ClassVar[int] - IS_SUPERADMIN_FIELD_NUMBER: _ClassVar[int] - name: str - expires_at_ms: int - is_superadmin: bool - def __init__(self, name: _Optional[str] = ..., expires_at_ms: _Optional[int] = ..., is_superadmin: bool = ...) -> None: ... - -class CreateApiKeyResponse(_message.Message): - __slots__ = ("key_id", "key", "is_superadmin") - KEY_ID_FIELD_NUMBER: _ClassVar[int] - KEY_FIELD_NUMBER: _ClassVar[int] - IS_SUPERADMIN_FIELD_NUMBER: _ClassVar[int] - key_id: str - key: str - is_superadmin: bool - def __init__(self, key_id: _Optional[str] = ..., key: _Optional[str] = ..., is_superadmin: bool = ...) -> None: ... - -class RevokeApiKeyRequest(_message.Message): - __slots__ = ("key_id",) - KEY_ID_FIELD_NUMBER: _ClassVar[int] - key_id: str - def __init__(self, key_id: _Optional[str] = ...) -> None: ... - -class RevokeApiKeyResponse(_message.Message): - __slots__ = () - def __init__(self) -> None: ... - -class ListApiKeysRequest(_message.Message): - __slots__ = () - def __init__(self) -> None: ... - -class ApiKeyInfo(_message.Message): - __slots__ = ("key_id", "name", "created_at_ms", "expires_at_ms", "is_superadmin") - KEY_ID_FIELD_NUMBER: _ClassVar[int] - NAME_FIELD_NUMBER: _ClassVar[int] - CREATED_AT_MS_FIELD_NUMBER: _ClassVar[int] - EXPIRES_AT_MS_FIELD_NUMBER: _ClassVar[int] - IS_SUPERADMIN_FIELD_NUMBER: _ClassVar[int] - key_id: str - name: str - created_at_ms: int - expires_at_ms: int - is_superadmin: bool - def __init__(self, key_id: _Optional[str] = ..., name: _Optional[str] = ..., created_at_ms: _Optional[int] = ..., expires_at_ms: _Optional[int] = ..., is_superadmin: bool = ...) -> None: ... - -class ListApiKeysResponse(_message.Message): - __slots__ = ("keys",) - KEYS_FIELD_NUMBER: _ClassVar[int] - keys: _containers.RepeatedCompositeFieldContainer[ApiKeyInfo] - def __init__(self, keys: _Optional[_Iterable[_Union[ApiKeyInfo, _Mapping]]] = ...) -> None: ... - -class AclPermission(_message.Message): - __slots__ = ("kind", "pattern") - KIND_FIELD_NUMBER: _ClassVar[int] - PATTERN_FIELD_NUMBER: _ClassVar[int] - kind: str - pattern: str - def __init__(self, kind: _Optional[str] = ..., pattern: _Optional[str] = ...) -> None: ... - -class SetAclRequest(_message.Message): - __slots__ = ("key_id", "permissions") - KEY_ID_FIELD_NUMBER: _ClassVar[int] - PERMISSIONS_FIELD_NUMBER: _ClassVar[int] - key_id: str - permissions: _containers.RepeatedCompositeFieldContainer[AclPermission] - def __init__(self, key_id: _Optional[str] = ..., permissions: _Optional[_Iterable[_Union[AclPermission, _Mapping]]] = ...) -> None: ... - -class SetAclResponse(_message.Message): - __slots__ = () - def __init__(self) -> None: ... - -class GetAclRequest(_message.Message): - __slots__ = ("key_id",) - KEY_ID_FIELD_NUMBER: _ClassVar[int] - key_id: str - def __init__(self, key_id: _Optional[str] = ...) -> None: ... - -class GetAclResponse(_message.Message): - __slots__ = ("key_id", "permissions", "is_superadmin") - KEY_ID_FIELD_NUMBER: _ClassVar[int] - PERMISSIONS_FIELD_NUMBER: _ClassVar[int] - IS_SUPERADMIN_FIELD_NUMBER: _ClassVar[int] - key_id: str - permissions: _containers.RepeatedCompositeFieldContainer[AclPermission] - is_superadmin: bool - def __init__(self, key_id: _Optional[str] = ..., permissions: _Optional[_Iterable[_Union[AclPermission, _Mapping]]] = ..., is_superadmin: bool = ...) -> None: ... diff --git a/fila/v1/admin_pb2_grpc.py b/fila/v1/admin_pb2_grpc.py index 93d6c4e..70d8fbe 100644 --- a/fila/v1/admin_pb2_grpc.py +++ b/fila/v1/admin_pb2_grpc.py @@ -5,7 +5,7 @@ from fila.v1 import admin_pb2 as fila_dot_v1_dot_admin__pb2 -GRPC_GENERATED_VERSION = '1.78.0' +GRPC_GENERATED_VERSION = '1.78.1' GRPC_VERSION = grpc.__version__ _version_not_supported = False @@ -75,31 +75,6 @@ def __init__(self, channel): request_serializer=fila_dot_v1_dot_admin__pb2.ListQueuesRequest.SerializeToString, response_deserializer=fila_dot_v1_dot_admin__pb2.ListQueuesResponse.FromString, _registered_method=True) - self.CreateApiKey = channel.unary_unary( - '/fila.v1.FilaAdmin/CreateApiKey', - request_serializer=fila_dot_v1_dot_admin__pb2.CreateApiKeyRequest.SerializeToString, - response_deserializer=fila_dot_v1_dot_admin__pb2.CreateApiKeyResponse.FromString, - _registered_method=True) - self.RevokeApiKey = channel.unary_unary( - '/fila.v1.FilaAdmin/RevokeApiKey', - request_serializer=fila_dot_v1_dot_admin__pb2.RevokeApiKeyRequest.SerializeToString, - response_deserializer=fila_dot_v1_dot_admin__pb2.RevokeApiKeyResponse.FromString, - _registered_method=True) - self.ListApiKeys = channel.unary_unary( - '/fila.v1.FilaAdmin/ListApiKeys', - request_serializer=fila_dot_v1_dot_admin__pb2.ListApiKeysRequest.SerializeToString, - response_deserializer=fila_dot_v1_dot_admin__pb2.ListApiKeysResponse.FromString, - _registered_method=True) - self.SetAcl = channel.unary_unary( - '/fila.v1.FilaAdmin/SetAcl', - request_serializer=fila_dot_v1_dot_admin__pb2.SetAclRequest.SerializeToString, - response_deserializer=fila_dot_v1_dot_admin__pb2.SetAclResponse.FromString, - _registered_method=True) - self.GetAcl = channel.unary_unary( - '/fila.v1.FilaAdmin/GetAcl', - request_serializer=fila_dot_v1_dot_admin__pb2.GetAclRequest.SerializeToString, - response_deserializer=fila_dot_v1_dot_admin__pb2.GetAclResponse.FromString, - _registered_method=True) class FilaAdminServicer(object): @@ -154,38 +129,6 @@ def ListQueues(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def CreateApiKey(self, request, context): - """API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def RevokeApiKey(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def ListApiKeys(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def SetAcl(self, request, context): - """Per-key ACL management. - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def GetAcl(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - def add_FilaAdminServicer_to_server(servicer, server): rpc_method_handlers = { @@ -229,31 +172,6 @@ def add_FilaAdminServicer_to_server(servicer, server): request_deserializer=fila_dot_v1_dot_admin__pb2.ListQueuesRequest.FromString, response_serializer=fila_dot_v1_dot_admin__pb2.ListQueuesResponse.SerializeToString, ), - 'CreateApiKey': grpc.unary_unary_rpc_method_handler( - servicer.CreateApiKey, - request_deserializer=fila_dot_v1_dot_admin__pb2.CreateApiKeyRequest.FromString, - response_serializer=fila_dot_v1_dot_admin__pb2.CreateApiKeyResponse.SerializeToString, - ), - 'RevokeApiKey': grpc.unary_unary_rpc_method_handler( - servicer.RevokeApiKey, - request_deserializer=fila_dot_v1_dot_admin__pb2.RevokeApiKeyRequest.FromString, - response_serializer=fila_dot_v1_dot_admin__pb2.RevokeApiKeyResponse.SerializeToString, - ), - 'ListApiKeys': grpc.unary_unary_rpc_method_handler( - servicer.ListApiKeys, - request_deserializer=fila_dot_v1_dot_admin__pb2.ListApiKeysRequest.FromString, - response_serializer=fila_dot_v1_dot_admin__pb2.ListApiKeysResponse.SerializeToString, - ), - 'SetAcl': grpc.unary_unary_rpc_method_handler( - servicer.SetAcl, - request_deserializer=fila_dot_v1_dot_admin__pb2.SetAclRequest.FromString, - response_serializer=fila_dot_v1_dot_admin__pb2.SetAclResponse.SerializeToString, - ), - 'GetAcl': grpc.unary_unary_rpc_method_handler( - servicer.GetAcl, - request_deserializer=fila_dot_v1_dot_admin__pb2.GetAclRequest.FromString, - response_serializer=fila_dot_v1_dot_admin__pb2.GetAclResponse.SerializeToString, - ), } generic_handler = grpc.method_handlers_generic_handler( 'fila.v1.FilaAdmin', rpc_method_handlers) @@ -481,138 +399,3 @@ def ListQueues(request, timeout, metadata, _registered_method=True) - - @staticmethod - def CreateApiKey(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/fila.v1.FilaAdmin/CreateApiKey', - fila_dot_v1_dot_admin__pb2.CreateApiKeyRequest.SerializeToString, - fila_dot_v1_dot_admin__pb2.CreateApiKeyResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) - - @staticmethod - def RevokeApiKey(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/fila.v1.FilaAdmin/RevokeApiKey', - fila_dot_v1_dot_admin__pb2.RevokeApiKeyRequest.SerializeToString, - fila_dot_v1_dot_admin__pb2.RevokeApiKeyResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) - - @staticmethod - def ListApiKeys(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/fila.v1.FilaAdmin/ListApiKeys', - fila_dot_v1_dot_admin__pb2.ListApiKeysRequest.SerializeToString, - fila_dot_v1_dot_admin__pb2.ListApiKeysResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) - - @staticmethod - def SetAcl(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/fila.v1.FilaAdmin/SetAcl', - fila_dot_v1_dot_admin__pb2.SetAclRequest.SerializeToString, - fila_dot_v1_dot_admin__pb2.SetAclResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) - - @staticmethod - def GetAcl(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/fila.v1.FilaAdmin/GetAcl', - fila_dot_v1_dot_admin__pb2.GetAclRequest.SerializeToString, - fila_dot_v1_dot_admin__pb2.GetAclResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) diff --git a/fila/v1/service_pb2.py b/fila/v1/service_pb2.py index 7f04078..7489260 100644 --- a/fila/v1/service_pb2.py +++ b/fila/v1/service_pb2.py @@ -25,39 +25,65 @@ from fila.v1 import messages_pb2 as fila_dot_v1_dot_messages__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15\x66ila/v1/service.proto\x12\x07\x66ila.v1\x1a\x16\x66ila/v1/messages.proto\"\x97\x01\n\x0e\x45nqueueRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x35\n\x07headers\x18\x02 \x03(\x0b\x32$.fila.v1.EnqueueRequest.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"%\n\x0f\x45nqueueResponse\x12\x12\n\nmessage_id\x18\x01 \x01(\t\"\x1f\n\x0e\x43onsumeRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"X\n\x0f\x43onsumeResponse\x12!\n\x07message\x18\x01 \x01(\x0b\x32\x10.fila.v1.Message\x12\"\n\x08messages\x18\x02 \x03(\x0b\x32\x10.fila.v1.Message\"/\n\nAckRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\"\r\n\x0b\x41\x63kResponse\"?\n\x0bNackRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"\x0e\n\x0cNackResponse\"@\n\x13\x42\x61tchEnqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueRequest\"D\n\x14\x42\x61tchEnqueueResponse\x12,\n\x07results\x18\x01 \x03(\x0b\x32\x1b.fila.v1.BatchEnqueueResult\"\\\n\x12\x42\x61tchEnqueueResult\x12+\n\x07success\x18\x01 \x01(\x0b\x32\x18.fila.v1.EnqueueResponseH\x00\x12\x0f\n\x05\x65rror\x18\x02 \x01(\tH\x00\x42\x08\n\x06result2\xbf\x02\n\x0b\x46ilaService\x12<\n\x07\x45nqueue\x12\x17.fila.v1.EnqueueRequest\x1a\x18.fila.v1.EnqueueResponse\x12K\n\x0c\x42\x61tchEnqueue\x12\x1c.fila.v1.BatchEnqueueRequest\x1a\x1d.fila.v1.BatchEnqueueResponse\x12>\n\x07\x43onsume\x12\x17.fila.v1.ConsumeRequest\x1a\x18.fila.v1.ConsumeResponse0\x01\x12\x30\n\x03\x41\x63k\x12\x13.fila.v1.AckRequest\x1a\x14.fila.v1.AckResponse\x12\x33\n\x04Nack\x12\x14.fila.v1.NackRequest\x1a\x15.fila.v1.NackResponseb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15\x66ila/v1/service.proto\x12\x07\x66ila.v1\x1a\x16\x66ila/v1/messages.proto\"\x97\x01\n\x0e\x45nqueueMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x35\n\x07headers\x18\x02 \x03(\x0b\x32$.fila.v1.EnqueueMessage.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\";\n\x0e\x45nqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueMessage\"W\n\rEnqueueResult\x12\x14\n\nmessage_id\x18\x01 \x01(\tH\x00\x12&\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x15.fila.v1.EnqueueErrorH\x00\x42\x08\n\x06result\"H\n\x0c\x45nqueueError\x12\'\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x19.fila.v1.EnqueueErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\":\n\x0f\x45nqueueResponse\x12\'\n\x07results\x18\x01 \x03(\x0b\x32\x16.fila.v1.EnqueueResult\"\x1f\n\x0e\x43onsumeRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"5\n\x0f\x43onsumeResponse\x12\"\n\x08messages\x18\x01 \x03(\x0b\x32\x10.fila.v1.Message\"/\n\nAckMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\"3\n\nAckRequest\x12%\n\x08messages\x18\x01 \x03(\x0b\x32\x13.fila.v1.AckMessage\"a\n\tAckResult\x12&\n\x07success\x18\x01 \x01(\x0b\x32\x13.fila.v1.AckSuccessH\x00\x12\"\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x11.fila.v1.AckErrorH\x00\x42\x08\n\x06result\"\x0c\n\nAckSuccess\"@\n\x08\x41\x63kError\x12#\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x15.fila.v1.AckErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"2\n\x0b\x41\x63kResponse\x12#\n\x07results\x18\x01 \x03(\x0b\x32\x12.fila.v1.AckResult\"?\n\x0bNackMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"5\n\x0bNackRequest\x12&\n\x08messages\x18\x01 \x03(\x0b\x32\x14.fila.v1.NackMessage\"d\n\nNackResult\x12\'\n\x07success\x18\x01 \x01(\x0b\x32\x14.fila.v1.NackSuccessH\x00\x12#\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x12.fila.v1.NackErrorH\x00\x42\x08\n\x06result\"\r\n\x0bNackSuccess\"B\n\tNackError\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.fila.v1.NackErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"4\n\x0cNackResponse\x12$\n\x07results\x18\x01 \x03(\x0b\x32\x13.fila.v1.NackResult\"Z\n\x14StreamEnqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueMessage\x12\x17\n\x0fsequence_number\x18\x02 \x01(\x04\"Y\n\x15StreamEnqueueResponse\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x04\x12\'\n\x07results\x18\x02 \x03(\x0b\x32\x16.fila.v1.EnqueueResult*\xc4\x01\n\x10\x45nqueueErrorCode\x12\"\n\x1e\x45NQUEUE_ERROR_CODE_UNSPECIFIED\x10\x00\x12&\n\"ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND\x10\x01\x12\x1e\n\x1a\x45NQUEUE_ERROR_CODE_STORAGE\x10\x02\x12\x1a\n\x16\x45NQUEUE_ERROR_CODE_LUA\x10\x03\x12(\n$ENQUEUE_ERROR_CODE_PERMISSION_DENIED\x10\x04*\x96\x01\n\x0c\x41\x63kErrorCode\x12\x1e\n\x1a\x41\x43K_ERROR_CODE_UNSPECIFIED\x10\x00\x12$\n ACK_ERROR_CODE_MESSAGE_NOT_FOUND\x10\x01\x12\x1a\n\x16\x41\x43K_ERROR_CODE_STORAGE\x10\x02\x12$\n ACK_ERROR_CODE_PERMISSION_DENIED\x10\x03*\x9b\x01\n\rNackErrorCode\x12\x1f\n\x1bNACK_ERROR_CODE_UNSPECIFIED\x10\x00\x12%\n!NACK_ERROR_CODE_MESSAGE_NOT_FOUND\x10\x01\x12\x1b\n\x17NACK_ERROR_CODE_STORAGE\x10\x02\x12%\n!NACK_ERROR_CODE_PERMISSION_DENIED\x10\x03\x32\xc6\x02\n\x0b\x46ilaService\x12<\n\x07\x45nqueue\x12\x17.fila.v1.EnqueueRequest\x1a\x18.fila.v1.EnqueueResponse\x12R\n\rStreamEnqueue\x12\x1d.fila.v1.StreamEnqueueRequest\x1a\x1e.fila.v1.StreamEnqueueResponse(\x01\x30\x01\x12>\n\x07\x43onsume\x12\x17.fila.v1.ConsumeRequest\x1a\x18.fila.v1.ConsumeResponse0\x01\x12\x30\n\x03\x41\x63k\x12\x13.fila.v1.AckRequest\x1a\x14.fila.v1.AckResponse\x12\x33\n\x04Nack\x12\x14.fila.v1.NackRequest\x1a\x15.fila.v1.NackResponseb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'fila.v1.service_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None - _globals['_ENQUEUEREQUEST_HEADERSENTRY']._loaded_options = None - _globals['_ENQUEUEREQUEST_HEADERSENTRY']._serialized_options = b'8\001' - _globals['_ENQUEUEREQUEST']._serialized_start=59 - _globals['_ENQUEUEREQUEST']._serialized_end=210 - _globals['_ENQUEUEREQUEST_HEADERSENTRY']._serialized_start=164 - _globals['_ENQUEUEREQUEST_HEADERSENTRY']._serialized_end=210 - _globals['_ENQUEUERESPONSE']._serialized_start=212 - _globals['_ENQUEUERESPONSE']._serialized_end=249 - _globals['_CONSUMEREQUEST']._serialized_start=251 - _globals['_CONSUMEREQUEST']._serialized_end=282 - _globals['_CONSUMERESPONSE']._serialized_start=284 - _globals['_CONSUMERESPONSE']._serialized_end=372 - _globals['_ACKREQUEST']._serialized_start=374 - _globals['_ACKREQUEST']._serialized_end=421 - _globals['_ACKRESPONSE']._serialized_start=423 - _globals['_ACKRESPONSE']._serialized_end=436 - _globals['_NACKREQUEST']._serialized_start=438 - _globals['_NACKREQUEST']._serialized_end=501 - _globals['_NACKRESPONSE']._serialized_start=503 - _globals['_NACKRESPONSE']._serialized_end=517 - _globals['_BATCHENQUEUEREQUEST']._serialized_start=519 - _globals['_BATCHENQUEUEREQUEST']._serialized_end=583 - _globals['_BATCHENQUEUERESPONSE']._serialized_start=585 - _globals['_BATCHENQUEUERESPONSE']._serialized_end=653 - _globals['_BATCHENQUEUERESULT']._serialized_start=655 - _globals['_BATCHENQUEUERESULT']._serialized_end=747 - _globals['_FILASERVICE']._serialized_start=750 - _globals['_FILASERVICE']._serialized_end=1069 + _globals['_ENQUEUEMESSAGE_HEADERSENTRY']._loaded_options = None + _globals['_ENQUEUEMESSAGE_HEADERSENTRY']._serialized_options = b'8\001' + _globals['_ENQUEUEERRORCODE']._serialized_start=1460 + _globals['_ENQUEUEERRORCODE']._serialized_end=1656 + _globals['_ACKERRORCODE']._serialized_start=1659 + _globals['_ACKERRORCODE']._serialized_end=1809 + _globals['_NACKERRORCODE']._serialized_start=1812 + _globals['_NACKERRORCODE']._serialized_end=1967 + _globals['_ENQUEUEMESSAGE']._serialized_start=59 + _globals['_ENQUEUEMESSAGE']._serialized_end=210 + _globals['_ENQUEUEMESSAGE_HEADERSENTRY']._serialized_start=164 + _globals['_ENQUEUEMESSAGE_HEADERSENTRY']._serialized_end=210 + _globals['_ENQUEUEREQUEST']._serialized_start=212 + _globals['_ENQUEUEREQUEST']._serialized_end=271 + _globals['_ENQUEUERESULT']._serialized_start=273 + _globals['_ENQUEUERESULT']._serialized_end=360 + _globals['_ENQUEUEERROR']._serialized_start=362 + _globals['_ENQUEUEERROR']._serialized_end=434 + _globals['_ENQUEUERESPONSE']._serialized_start=436 + _globals['_ENQUEUERESPONSE']._serialized_end=494 + _globals['_CONSUMEREQUEST']._serialized_start=496 + _globals['_CONSUMEREQUEST']._serialized_end=527 + _globals['_CONSUMERESPONSE']._serialized_start=529 + _globals['_CONSUMERESPONSE']._serialized_end=582 + _globals['_ACKMESSAGE']._serialized_start=584 + _globals['_ACKMESSAGE']._serialized_end=631 + _globals['_ACKREQUEST']._serialized_start=633 + _globals['_ACKREQUEST']._serialized_end=684 + _globals['_ACKRESULT']._serialized_start=686 + _globals['_ACKRESULT']._serialized_end=783 + _globals['_ACKSUCCESS']._serialized_start=785 + _globals['_ACKSUCCESS']._serialized_end=797 + _globals['_ACKERROR']._serialized_start=799 + _globals['_ACKERROR']._serialized_end=863 + _globals['_ACKRESPONSE']._serialized_start=865 + _globals['_ACKRESPONSE']._serialized_end=915 + _globals['_NACKMESSAGE']._serialized_start=917 + _globals['_NACKMESSAGE']._serialized_end=980 + _globals['_NACKREQUEST']._serialized_start=982 + _globals['_NACKREQUEST']._serialized_end=1035 + _globals['_NACKRESULT']._serialized_start=1037 + _globals['_NACKRESULT']._serialized_end=1137 + _globals['_NACKSUCCESS']._serialized_start=1139 + _globals['_NACKSUCCESS']._serialized_end=1152 + _globals['_NACKERROR']._serialized_start=1154 + _globals['_NACKERROR']._serialized_end=1220 + _globals['_NACKRESPONSE']._serialized_start=1222 + _globals['_NACKRESPONSE']._serialized_end=1274 + _globals['_STREAMENQUEUEREQUEST']._serialized_start=1276 + _globals['_STREAMENQUEUEREQUEST']._serialized_end=1366 + _globals['_STREAMENQUEUERESPONSE']._serialized_start=1368 + _globals['_STREAMENQUEUERESPONSE']._serialized_end=1457 + _globals['_FILASERVICE']._serialized_start=1970 + _globals['_FILASERVICE']._serialized_end=2296 # @@protoc_insertion_point(module_scope) diff --git a/fila/v1/service_pb2.pyi b/fila/v1/service_pb2.pyi index ca1e820..a840197 100644 --- a/fila/v1/service_pb2.pyi +++ b/fila/v1/service_pb2.pyi @@ -1,5 +1,6 @@ from fila.v1 import messages_pb2 as _messages_pb2 from google.protobuf.internal import containers as _containers +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper from google.protobuf import descriptor as _descriptor from google.protobuf import message as _message from collections.abc import Iterable as _Iterable, Mapping as _Mapping @@ -7,7 +8,42 @@ from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union DESCRIPTOR: _descriptor.FileDescriptor -class EnqueueRequest(_message.Message): +class EnqueueErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + ENQUEUE_ERROR_CODE_UNSPECIFIED: _ClassVar[EnqueueErrorCode] + ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND: _ClassVar[EnqueueErrorCode] + ENQUEUE_ERROR_CODE_STORAGE: _ClassVar[EnqueueErrorCode] + ENQUEUE_ERROR_CODE_LUA: _ClassVar[EnqueueErrorCode] + ENQUEUE_ERROR_CODE_PERMISSION_DENIED: _ClassVar[EnqueueErrorCode] + +class AckErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + ACK_ERROR_CODE_UNSPECIFIED: _ClassVar[AckErrorCode] + ACK_ERROR_CODE_MESSAGE_NOT_FOUND: _ClassVar[AckErrorCode] + ACK_ERROR_CODE_STORAGE: _ClassVar[AckErrorCode] + ACK_ERROR_CODE_PERMISSION_DENIED: _ClassVar[AckErrorCode] + +class NackErrorCode(int, metaclass=_enum_type_wrapper.EnumTypeWrapper): + __slots__ = () + NACK_ERROR_CODE_UNSPECIFIED: _ClassVar[NackErrorCode] + NACK_ERROR_CODE_MESSAGE_NOT_FOUND: _ClassVar[NackErrorCode] + NACK_ERROR_CODE_STORAGE: _ClassVar[NackErrorCode] + NACK_ERROR_CODE_PERMISSION_DENIED: _ClassVar[NackErrorCode] +ENQUEUE_ERROR_CODE_UNSPECIFIED: EnqueueErrorCode +ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND: EnqueueErrorCode +ENQUEUE_ERROR_CODE_STORAGE: EnqueueErrorCode +ENQUEUE_ERROR_CODE_LUA: EnqueueErrorCode +ENQUEUE_ERROR_CODE_PERMISSION_DENIED: EnqueueErrorCode +ACK_ERROR_CODE_UNSPECIFIED: AckErrorCode +ACK_ERROR_CODE_MESSAGE_NOT_FOUND: AckErrorCode +ACK_ERROR_CODE_STORAGE: AckErrorCode +ACK_ERROR_CODE_PERMISSION_DENIED: AckErrorCode +NACK_ERROR_CODE_UNSPECIFIED: NackErrorCode +NACK_ERROR_CODE_MESSAGE_NOT_FOUND: NackErrorCode +NACK_ERROR_CODE_STORAGE: NackErrorCode +NACK_ERROR_CODE_PERMISSION_DENIED: NackErrorCode + +class EnqueueMessage(_message.Message): __slots__ = ("queue", "headers", "payload") class HeadersEntry(_message.Message): __slots__ = ("key", "value") @@ -24,11 +60,33 @@ class EnqueueRequest(_message.Message): payload: bytes def __init__(self, queue: _Optional[str] = ..., headers: _Optional[_Mapping[str, str]] = ..., payload: _Optional[bytes] = ...) -> None: ... -class EnqueueResponse(_message.Message): - __slots__ = ("message_id",) +class EnqueueRequest(_message.Message): + __slots__ = ("messages",) + MESSAGES_FIELD_NUMBER: _ClassVar[int] + messages: _containers.RepeatedCompositeFieldContainer[EnqueueMessage] + def __init__(self, messages: _Optional[_Iterable[_Union[EnqueueMessage, _Mapping]]] = ...) -> None: ... + +class EnqueueResult(_message.Message): + __slots__ = ("message_id", "error") MESSAGE_ID_FIELD_NUMBER: _ClassVar[int] + ERROR_FIELD_NUMBER: _ClassVar[int] message_id: str - def __init__(self, message_id: _Optional[str] = ...) -> None: ... + error: EnqueueError + def __init__(self, message_id: _Optional[str] = ..., error: _Optional[_Union[EnqueueError, _Mapping]] = ...) -> None: ... + +class EnqueueError(_message.Message): + __slots__ = ("code", "message") + CODE_FIELD_NUMBER: _ClassVar[int] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + code: EnqueueErrorCode + message: str + def __init__(self, code: _Optional[_Union[EnqueueErrorCode, str]] = ..., message: _Optional[str] = ...) -> None: ... + +class EnqueueResponse(_message.Message): + __slots__ = ("results",) + RESULTS_FIELD_NUMBER: _ClassVar[int] + results: _containers.RepeatedCompositeFieldContainer[EnqueueResult] + def __init__(self, results: _Optional[_Iterable[_Union[EnqueueResult, _Mapping]]] = ...) -> None: ... class ConsumeRequest(_message.Message): __slots__ = ("queue",) @@ -37,14 +95,12 @@ class ConsumeRequest(_message.Message): def __init__(self, queue: _Optional[str] = ...) -> None: ... class ConsumeResponse(_message.Message): - __slots__ = ("message", "messages") - MESSAGE_FIELD_NUMBER: _ClassVar[int] + __slots__ = ("messages",) MESSAGES_FIELD_NUMBER: _ClassVar[int] - message: _messages_pb2.Message messages: _containers.RepeatedCompositeFieldContainer[_messages_pb2.Message] - def __init__(self, message: _Optional[_Union[_messages_pb2.Message, _Mapping]] = ..., messages: _Optional[_Iterable[_Union[_messages_pb2.Message, _Mapping]]] = ...) -> None: ... + def __init__(self, messages: _Optional[_Iterable[_Union[_messages_pb2.Message, _Mapping]]] = ...) -> None: ... -class AckRequest(_message.Message): +class AckMessage(_message.Message): __slots__ = ("queue", "message_id") QUEUE_FIELD_NUMBER: _ClassVar[int] MESSAGE_ID_FIELD_NUMBER: _ClassVar[int] @@ -52,11 +108,39 @@ class AckRequest(_message.Message): message_id: str def __init__(self, queue: _Optional[str] = ..., message_id: _Optional[str] = ...) -> None: ... -class AckResponse(_message.Message): +class AckRequest(_message.Message): + __slots__ = ("messages",) + MESSAGES_FIELD_NUMBER: _ClassVar[int] + messages: _containers.RepeatedCompositeFieldContainer[AckMessage] + def __init__(self, messages: _Optional[_Iterable[_Union[AckMessage, _Mapping]]] = ...) -> None: ... + +class AckResult(_message.Message): + __slots__ = ("success", "error") + SUCCESS_FIELD_NUMBER: _ClassVar[int] + ERROR_FIELD_NUMBER: _ClassVar[int] + success: AckSuccess + error: AckError + def __init__(self, success: _Optional[_Union[AckSuccess, _Mapping]] = ..., error: _Optional[_Union[AckError, _Mapping]] = ...) -> None: ... + +class AckSuccess(_message.Message): __slots__ = () def __init__(self) -> None: ... -class NackRequest(_message.Message): +class AckError(_message.Message): + __slots__ = ("code", "message") + CODE_FIELD_NUMBER: _ClassVar[int] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + code: AckErrorCode + message: str + def __init__(self, code: _Optional[_Union[AckErrorCode, str]] = ..., message: _Optional[str] = ...) -> None: ... + +class AckResponse(_message.Message): + __slots__ = ("results",) + RESULTS_FIELD_NUMBER: _ClassVar[int] + results: _containers.RepeatedCompositeFieldContainer[AckResult] + def __init__(self, results: _Optional[_Iterable[_Union[AckResult, _Mapping]]] = ...) -> None: ... + +class NackMessage(_message.Message): __slots__ = ("queue", "message_id", "error") QUEUE_FIELD_NUMBER: _ClassVar[int] MESSAGE_ID_FIELD_NUMBER: _ClassVar[int] @@ -66,26 +150,50 @@ class NackRequest(_message.Message): error: str def __init__(self, queue: _Optional[str] = ..., message_id: _Optional[str] = ..., error: _Optional[str] = ...) -> None: ... -class NackResponse(_message.Message): +class NackRequest(_message.Message): + __slots__ = ("messages",) + MESSAGES_FIELD_NUMBER: _ClassVar[int] + messages: _containers.RepeatedCompositeFieldContainer[NackMessage] + def __init__(self, messages: _Optional[_Iterable[_Union[NackMessage, _Mapping]]] = ...) -> None: ... + +class NackResult(_message.Message): + __slots__ = ("success", "error") + SUCCESS_FIELD_NUMBER: _ClassVar[int] + ERROR_FIELD_NUMBER: _ClassVar[int] + success: NackSuccess + error: NackError + def __init__(self, success: _Optional[_Union[NackSuccess, _Mapping]] = ..., error: _Optional[_Union[NackError, _Mapping]] = ...) -> None: ... + +class NackSuccess(_message.Message): __slots__ = () def __init__(self) -> None: ... -class BatchEnqueueRequest(_message.Message): - __slots__ = ("messages",) - MESSAGES_FIELD_NUMBER: _ClassVar[int] - messages: _containers.RepeatedCompositeFieldContainer[EnqueueRequest] - def __init__(self, messages: _Optional[_Iterable[_Union[EnqueueRequest, _Mapping]]] = ...) -> None: ... +class NackError(_message.Message): + __slots__ = ("code", "message") + CODE_FIELD_NUMBER: _ClassVar[int] + MESSAGE_FIELD_NUMBER: _ClassVar[int] + code: NackErrorCode + message: str + def __init__(self, code: _Optional[_Union[NackErrorCode, str]] = ..., message: _Optional[str] = ...) -> None: ... -class BatchEnqueueResponse(_message.Message): +class NackResponse(_message.Message): __slots__ = ("results",) RESULTS_FIELD_NUMBER: _ClassVar[int] - results: _containers.RepeatedCompositeFieldContainer[BatchEnqueueResult] - def __init__(self, results: _Optional[_Iterable[_Union[BatchEnqueueResult, _Mapping]]] = ...) -> None: ... + results: _containers.RepeatedCompositeFieldContainer[NackResult] + def __init__(self, results: _Optional[_Iterable[_Union[NackResult, _Mapping]]] = ...) -> None: ... -class BatchEnqueueResult(_message.Message): - __slots__ = ("success", "error") - SUCCESS_FIELD_NUMBER: _ClassVar[int] - ERROR_FIELD_NUMBER: _ClassVar[int] - success: EnqueueResponse - error: str - def __init__(self, success: _Optional[_Union[EnqueueResponse, _Mapping]] = ..., error: _Optional[str] = ...) -> None: ... +class StreamEnqueueRequest(_message.Message): + __slots__ = ("messages", "sequence_number") + MESSAGES_FIELD_NUMBER: _ClassVar[int] + SEQUENCE_NUMBER_FIELD_NUMBER: _ClassVar[int] + messages: _containers.RepeatedCompositeFieldContainer[EnqueueMessage] + sequence_number: int + def __init__(self, messages: _Optional[_Iterable[_Union[EnqueueMessage, _Mapping]]] = ..., sequence_number: _Optional[int] = ...) -> None: ... + +class StreamEnqueueResponse(_message.Message): + __slots__ = ("sequence_number", "results") + SEQUENCE_NUMBER_FIELD_NUMBER: _ClassVar[int] + RESULTS_FIELD_NUMBER: _ClassVar[int] + sequence_number: int + results: _containers.RepeatedCompositeFieldContainer[EnqueueResult] + def __init__(self, sequence_number: _Optional[int] = ..., results: _Optional[_Iterable[_Union[EnqueueResult, _Mapping]]] = ...) -> None: ... diff --git a/fila/v1/service_pb2_grpc.py b/fila/v1/service_pb2_grpc.py index 0ef11e1..1f2df55 100644 --- a/fila/v1/service_pb2_grpc.py +++ b/fila/v1/service_pb2_grpc.py @@ -40,10 +40,10 @@ def __init__(self, channel): request_serializer=fila_dot_v1_dot_service__pb2.EnqueueRequest.SerializeToString, response_deserializer=fila_dot_v1_dot_service__pb2.EnqueueResponse.FromString, _registered_method=True) - self.BatchEnqueue = channel.unary_unary( - '/fila.v1.FilaService/BatchEnqueue', - request_serializer=fila_dot_v1_dot_service__pb2.BatchEnqueueRequest.SerializeToString, - response_deserializer=fila_dot_v1_dot_service__pb2.BatchEnqueueResponse.FromString, + self.StreamEnqueue = channel.stream_stream( + '/fila.v1.FilaService/StreamEnqueue', + request_serializer=fila_dot_v1_dot_service__pb2.StreamEnqueueRequest.SerializeToString, + response_deserializer=fila_dot_v1_dot_service__pb2.StreamEnqueueResponse.FromString, _registered_method=True) self.Consume = channel.unary_stream( '/fila.v1.FilaService/Consume', @@ -72,7 +72,7 @@ def Enqueue(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def BatchEnqueue(self, request, context): + def StreamEnqueue(self, request_iterator, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') @@ -104,10 +104,10 @@ def add_FilaServiceServicer_to_server(servicer, server): request_deserializer=fila_dot_v1_dot_service__pb2.EnqueueRequest.FromString, response_serializer=fila_dot_v1_dot_service__pb2.EnqueueResponse.SerializeToString, ), - 'BatchEnqueue': grpc.unary_unary_rpc_method_handler( - servicer.BatchEnqueue, - request_deserializer=fila_dot_v1_dot_service__pb2.BatchEnqueueRequest.FromString, - response_serializer=fila_dot_v1_dot_service__pb2.BatchEnqueueResponse.SerializeToString, + 'StreamEnqueue': grpc.stream_stream_rpc_method_handler( + servicer.StreamEnqueue, + request_deserializer=fila_dot_v1_dot_service__pb2.StreamEnqueueRequest.FromString, + response_serializer=fila_dot_v1_dot_service__pb2.StreamEnqueueResponse.SerializeToString, ), 'Consume': grpc.unary_stream_rpc_method_handler( servicer.Consume, @@ -164,7 +164,7 @@ def Enqueue(request, _registered_method=True) @staticmethod - def BatchEnqueue(request, + def StreamEnqueue(request_iterator, target, options=(), channel_credentials=None, @@ -174,12 +174,12 @@ def BatchEnqueue(request, wait_for_ready=None, timeout=None, metadata=None): - return grpc.experimental.unary_unary( - request, + return grpc.experimental.stream_stream( + request_iterator, target, - '/fila.v1.FilaService/BatchEnqueue', - fila_dot_v1_dot_service__pb2.BatchEnqueueRequest.SerializeToString, - fila_dot_v1_dot_service__pb2.BatchEnqueueResponse.FromString, + '/fila.v1.FilaService/StreamEnqueue', + fila_dot_v1_dot_service__pb2.StreamEnqueueRequest.SerializeToString, + fila_dot_v1_dot_service__pb2.StreamEnqueueResponse.FromString, options, channel_credentials, insecure, diff --git a/proto/fila/v1/admin.proto b/proto/fila/v1/admin.proto index 886e58d..9bb8871 100644 --- a/proto/fila/v1/admin.proto +++ b/proto/fila/v1/admin.proto @@ -11,15 +11,6 @@ service FilaAdmin { rpc GetStats(GetStatsRequest) returns (GetStatsResponse); rpc Redrive(RedriveRequest) returns (RedriveResponse); rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse); - - // API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key. - rpc CreateApiKey(CreateApiKeyRequest) returns (CreateApiKeyResponse); - rpc RevokeApiKey(RevokeApiKeyRequest) returns (RevokeApiKeyResponse); - rpc ListApiKeys(ListApiKeysRequest) returns (ListApiKeysResponse); - - // Per-key ACL management. - rpc SetAcl(SetAclRequest) returns (SetAclResponse); - rpc GetAcl(GetAclRequest) returns (GetAclResponse); } message CreateQueueRequest { @@ -126,72 +117,3 @@ message ListQueuesResponse { repeated QueueInfo queues = 1; uint32 cluster_node_count = 2; } - -// --- API Key Management --- - -message CreateApiKeyRequest { - /// Human-readable label for the key. - string name = 1; - /// Optional Unix timestamp (milliseconds) after which the key expires. - /// 0 means no expiration. - uint64 expires_at_ms = 2; - /// When true, the key bypasses all ACL checks (superadmin). - bool is_superadmin = 3; -} - -message CreateApiKeyResponse { - /// Opaque key ID for management operations (revoke, list, set-acl). - string key_id = 1; - /// Plaintext API key. Returned once — store it securely. - string key = 2; - /// Whether this key has superadmin privileges. - bool is_superadmin = 3; -} - -message RevokeApiKeyRequest { - string key_id = 1; -} - -message RevokeApiKeyResponse {} - -message ListApiKeysRequest {} - -message ApiKeyInfo { - string key_id = 1; - string name = 2; - uint64 created_at_ms = 3; - /// 0 means no expiration. - uint64 expires_at_ms = 4; - bool is_superadmin = 5; -} - -message ListApiKeysResponse { - repeated ApiKeyInfo keys = 1; -} - -// --- ACL Management --- - -/// A single permission grant: kind (produce/consume/admin) + queue pattern. -message AclPermission { - /// One of: "produce", "consume", "admin". - string kind = 1; - /// Queue name or wildcard ("*" or "orders.*"). - string pattern = 2; -} - -message SetAclRequest { - string key_id = 1; - repeated AclPermission permissions = 2; -} - -message SetAclResponse {} - -message GetAclRequest { - string key_id = 1; -} - -message GetAclResponse { - string key_id = 1; - repeated AclPermission permissions = 2; - bool is_superadmin = 3; -} diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto index fc0f710..7d1db79 100644 --- a/proto/fila/v1/service.proto +++ b/proto/fila/v1/service.proto @@ -6,20 +6,49 @@ import "fila/v1/messages.proto"; // Hot-path RPCs for producers and consumers. service FilaService { rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); - rpc BatchEnqueue(BatchEnqueueRequest) returns (BatchEnqueueResponse); + rpc StreamEnqueue(stream StreamEnqueueRequest) returns (stream StreamEnqueueResponse); rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); rpc Ack(AckRequest) returns (AckResponse); rpc Nack(NackRequest) returns (NackResponse); } -message EnqueueRequest { +// Individual message to enqueue. +message EnqueueMessage { string queue = 1; map headers = 2; bytes payload = 3; } +// Enqueue one or more messages. +message EnqueueRequest { + repeated EnqueueMessage messages = 1; +} + +// Per-message enqueue result. +message EnqueueResult { + oneof result { + string message_id = 1; + EnqueueError error = 2; + } +} + +// Typed enqueue error with structured error code. +message EnqueueError { + EnqueueErrorCode code = 1; + string message = 2; +} + +enum EnqueueErrorCode { + ENQUEUE_ERROR_CODE_UNSPECIFIED = 0; + ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND = 1; + ENQUEUE_ERROR_CODE_STORAGE = 2; + ENQUEUE_ERROR_CODE_LUA = 3; + ENQUEUE_ERROR_CODE_PERMISSION_DENIED = 4; +} + +// One result per input message. message EnqueueResponse { - string message_id = 1; + repeated EnqueueResult results = 1; } message ConsumeRequest { @@ -27,36 +56,87 @@ message ConsumeRequest { } message ConsumeResponse { - Message message = 1; // Single message (backward compatible, used when batch size is 1) - repeated Message messages = 2; // Batched messages (populated when server sends multiple at once) + repeated Message messages = 1; } -message AckRequest { +// Individual ack item. +message AckMessage { string queue = 1; string message_id = 2; } -message AckResponse {} +message AckRequest { + repeated AckMessage messages = 1; +} + +message AckResult { + oneof result { + AckSuccess success = 1; + AckError error = 2; + } +} -message NackRequest { +message AckSuccess {} + +message AckError { + AckErrorCode code = 1; + string message = 2; +} + +enum AckErrorCode { + ACK_ERROR_CODE_UNSPECIFIED = 0; + ACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; + ACK_ERROR_CODE_STORAGE = 2; + ACK_ERROR_CODE_PERMISSION_DENIED = 3; +} + +message AckResponse { + repeated AckResult results = 1; +} + +// Individual nack item. +message NackMessage { string queue = 1; string message_id = 2; string error = 3; } -message NackResponse {} +message NackRequest { + repeated NackMessage messages = 1; +} + +message NackResult { + oneof result { + NackSuccess success = 1; + NackError error = 2; + } +} -message BatchEnqueueRequest { - repeated EnqueueRequest messages = 1; +message NackSuccess {} + +message NackError { + NackErrorCode code = 1; + string message = 2; } -message BatchEnqueueResponse { - repeated BatchEnqueueResult results = 1; +enum NackErrorCode { + NACK_ERROR_CODE_UNSPECIFIED = 0; + NACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; + NACK_ERROR_CODE_STORAGE = 2; + NACK_ERROR_CODE_PERMISSION_DENIED = 3; } -message BatchEnqueueResult { - oneof result { - EnqueueResponse success = 1; - string error = 2; - } +message NackResponse { + repeated NackResult results = 1; +} + +// Stream enqueue — per-write batch with sequence tracking. +message StreamEnqueueRequest { + repeated EnqueueMessage messages = 1; + uint64 sequence_number = 2; +} + +message StreamEnqueueResponse { + uint64 sequence_number = 1; + repeated EnqueueResult results = 2; } diff --git a/tests/test_batcher.py b/tests/test_batcher.py index ee10e71..3489a42 100644 --- a/tests/test_batcher.py +++ b/tests/test_batcher.py @@ -13,44 +13,38 @@ import pytest from fila.batcher import ( - AutoBatcher, - LingerBatcher, - _EnqueueRequest, - _flush_batch, + AutoAccumulator, + LingerAccumulator, + _EnqueueItem, + _flush_many, _flush_single, ) -from fila.errors import BatchEnqueueError +from fila.errors import EnqueueError from fila.v1 import service_pb2 -class FakeEnqueueResponse: - """Minimal fake for service_pb2.EnqueueResponse.""" - - def __init__(self, message_id: str) -> None: - self.message_id = message_id - - -class FakeBatchResult: - """Minimal fake for service_pb2.BatchEnqueueResult.""" +class FakeEnqueueResult: + """Minimal fake for service_pb2.EnqueueResult.""" - def __init__(self, message_id: str | None = None, error: str | None = None) -> None: + def __init__(self, message_id: str | None = None, error_msg: str | None = None) -> None: self._message_id = message_id - self._error = error - self.success: FakeEnqueueResponse | None = ( - FakeEnqueueResponse(message_id) if message_id is not None else None - ) - self.error = error or "" + self._error_msg = error_msg + self.message_id = message_id or "" + self.error = MagicMock() + self.error.message = error_msg or "" - def HasField(self, name: str) -> bool: # noqa: N802 - if name == "success": - return self._message_id is not None - return False + def WhichOneof(self, name: str) -> str | None: # noqa: N802 + if name == "result": + if self._message_id is not None: + return "message_id" + return "error" + return None -class FakeBatchResponse: - """Minimal fake for service_pb2.BatchEnqueueResponse.""" +class FakeEnqueueResponse: + """Minimal fake for service_pb2.EnqueueResponse.""" - def __init__(self, results: list[FakeBatchResult]) -> None: + def __init__(self, results: list[FakeEnqueueResult]) -> None: self.results = results @@ -59,25 +53,23 @@ class TestFlushSingle: def test_success(self) -> None: stub = MagicMock() - stub.Enqueue.return_value = FakeEnqueueResponse("msg-001") + stub.Enqueue.return_value = FakeEnqueueResponse([ + FakeEnqueueResult(message_id="msg-001"), + ]) - proto = service_pb2.EnqueueRequest(queue="q", payload=b"data") + proto = service_pb2.EnqueueMessage(queue="q", payload=b"data") fut: Future[str] = Future() - req = _EnqueueRequest(proto, fut) + req = _EnqueueItem(proto, fut) _flush_single(stub, req) assert fut.result(timeout=1.0) == "msg-001" - stub.Enqueue.assert_called_once_with(proto) + stub.Enqueue.assert_called_once() def test_rpc_error(self) -> None: import grpc stub = MagicMock() - rpc_error = MagicMock() - rpc_error.code.return_value = grpc.StatusCode.NOT_FOUND - rpc_error.details.return_value = "queue not found" - # Make it pass isinstance(e, grpc.RpcError) check. stub.Enqueue.side_effect = type( "_FakeRpcError", (grpc.RpcError,), { "code": lambda self: grpc.StatusCode.NOT_FOUND, @@ -85,9 +77,9 @@ def test_rpc_error(self) -> None: } )() - proto = service_pb2.EnqueueRequest(queue="missing", payload=b"data") + proto = service_pb2.EnqueueMessage(queue="missing", payload=b"data") fut: Future[str] = Future() - req = _EnqueueRequest(proto, fut) + req = _EnqueueItem(proto, fut) _flush_single(stub, req) @@ -97,164 +89,151 @@ def test_rpc_error(self) -> None: fut.result(timeout=1.0) -class TestFlushBatch: - """Test the _flush_batch function.""" +class TestFlushMany: + """Test the _flush_many function.""" def test_all_success(self) -> None: stub = MagicMock() - stub.BatchEnqueue.return_value = FakeBatchResponse([ - FakeBatchResult(message_id="id-1"), - FakeBatchResult(message_id="id-2"), + stub.Enqueue.return_value = FakeEnqueueResponse([ + FakeEnqueueResult(message_id="id-1"), + FakeEnqueueResult(message_id="id-2"), ]) - reqs = [ - _EnqueueRequest( - service_pb2.EnqueueRequest(queue="q", payload=b"a"), + items = [ + _EnqueueItem( + service_pb2.EnqueueMessage(queue="q", payload=b"a"), Future(), ), - _EnqueueRequest( - service_pb2.EnqueueRequest(queue="q", payload=b"b"), + _EnqueueItem( + service_pb2.EnqueueMessage(queue="q", payload=b"b"), Future(), ), ] - _flush_batch(stub, reqs) + _flush_many(stub, items) - assert reqs[0].future.result(timeout=1.0) == "id-1" - assert reqs[1].future.result(timeout=1.0) == "id-2" + assert items[0].future.result(timeout=1.0) == "id-1" + assert items[1].future.result(timeout=1.0) == "id-2" def test_mixed_results(self) -> None: stub = MagicMock() - stub.BatchEnqueue.return_value = FakeBatchResponse([ - FakeBatchResult(message_id="id-1"), - FakeBatchResult(error="queue 'missing' not found"), + stub.Enqueue.return_value = FakeEnqueueResponse([ + FakeEnqueueResult(message_id="id-1"), + FakeEnqueueResult(error_msg="queue 'missing' not found"), ]) - reqs = [ - _EnqueueRequest( - service_pb2.EnqueueRequest(queue="q", payload=b"a"), + items = [ + _EnqueueItem( + service_pb2.EnqueueMessage(queue="q", payload=b"a"), Future(), ), - _EnqueueRequest( - service_pb2.EnqueueRequest(queue="missing", payload=b"b"), + _EnqueueItem( + service_pb2.EnqueueMessage(queue="missing", payload=b"b"), Future(), ), ] - _flush_batch(stub, reqs) + _flush_many(stub, items) - assert reqs[0].future.result(timeout=1.0) == "id-1" - with pytest.raises(BatchEnqueueError, match="queue 'missing' not found"): - reqs[1].future.result(timeout=1.0) + assert items[0].future.result(timeout=1.0) == "id-1" + with pytest.raises(EnqueueError, match="queue 'missing' not found"): + items[1].future.result(timeout=1.0) def test_rpc_failure_sets_all_futures(self) -> None: import grpc stub = MagicMock() - stub.BatchEnqueue.side_effect = type( + stub.Enqueue.side_effect = type( "_FakeRpcError", (grpc.RpcError,), { "code": lambda self: grpc.StatusCode.UNAVAILABLE, "details": lambda self: "server unavailable", } )() - reqs = [ - _EnqueueRequest( - service_pb2.EnqueueRequest(queue="q", payload=b"a"), + items = [ + _EnqueueItem( + service_pb2.EnqueueMessage(queue="q", payload=b"a"), Future(), ), - _EnqueueRequest( - service_pb2.EnqueueRequest(queue="q", payload=b"b"), + _EnqueueItem( + service_pb2.EnqueueMessage(queue="q", payload=b"b"), Future(), ), ] - _flush_batch(stub, reqs) + _flush_many(stub, items) - for r in reqs: - with pytest.raises(BatchEnqueueError): - r.future.result(timeout=1.0) + for item in items: + with pytest.raises(EnqueueError): + item.future.result(timeout=1.0) -class TestAutoBatcher: - """Test the AutoBatcher end-to-end.""" +class TestAutoAccumulator: + """Test the AutoAccumulator end-to-end.""" def test_single_message_uses_enqueue(self) -> None: - """When only one message is queued, AutoBatcher uses singular Enqueue.""" + """When only one message is queued, AutoAccumulator uses Enqueue with one message.""" stub = MagicMock() - stub.Enqueue.return_value = FakeEnqueueResponse("msg-solo") + stub.Enqueue.return_value = FakeEnqueueResponse([ + FakeEnqueueResult(message_id="msg-solo"), + ]) - batcher = AutoBatcher(stub, max_batch_size=100) + accumulator = AutoAccumulator(stub, max_messages=100) - proto = service_pb2.EnqueueRequest(queue="q", payload=b"solo") - fut = batcher.submit(proto) + proto = service_pb2.EnqueueMessage(queue="q", payload=b"solo") + fut = accumulator.submit(proto) result = fut.result(timeout=5.0) assert result == "msg-solo" stub.Enqueue.assert_called_once() - stub.BatchEnqueue.assert_not_called() - batcher.close() + accumulator.close() - def test_concurrent_messages_batched(self) -> None: - """When multiple messages arrive concurrently, they batch together.""" + def test_concurrent_messages_accumulated(self) -> None: + """When multiple messages arrive concurrently, they accumulate together.""" stub = MagicMock() - # The first message will block Enqueue while more messages queue up. - # We need to make the batcher see all messages at once. - batch_called = threading.Event() - batch_response = FakeBatchResponse([ - FakeBatchResult(message_id=f"id-{i}") for i in range(5) + enqueue_response = FakeEnqueueResponse([ + FakeEnqueueResult(message_id=f"id-{i}") for i in range(5) ]) - def mock_batch_enqueue(request: Any) -> FakeBatchResponse: - batch_called.set() - return batch_response - - # Make single Enqueue block briefly so messages accumulate. - single_barrier = threading.Event() - - def mock_single_enqueue(request: Any) -> FakeEnqueueResponse: - single_barrier.wait(timeout=5.0) - return FakeEnqueueResponse("should-not-be-used") + def mock_enqueue(request: Any) -> FakeEnqueueResponse: + return enqueue_response - stub.Enqueue.side_effect = mock_single_enqueue - stub.BatchEnqueue.side_effect = mock_batch_enqueue + stub.Enqueue.side_effect = mock_enqueue - batcher = AutoBatcher(stub, max_batch_size=100) + accumulator = AutoAccumulator(stub, max_messages=100) - # Submit 5 messages rapidly before the first can process. - # The batcher should drain them all in one batch. + # Submit 5 messages rapidly. protos = [ - service_pb2.EnqueueRequest(queue="q", payload=f"msg-{i}".encode()) + service_pb2.EnqueueMessage(queue="q", payload=f"msg-{i}".encode()) for i in range(5) ] - # We need to submit them in a way that they all arrive before - # the batcher loop drains. Use a barrier approach. futures = [] for p in protos: - futures.append(batcher.submit(p)) + futures.append(accumulator.submit(p)) - # Give the batcher thread time to drain and flush. - # Either BatchEnqueue or multiple Enqueue calls will resolve things. + # All futures should resolve. for _i, f in enumerate(futures): result = f.result(timeout=5.0) assert result is not None - batcher.close() + accumulator.close() def test_close_drains_pending(self) -> None: """close() waits for pending messages to be flushed.""" stub = MagicMock() - stub.Enqueue.return_value = FakeEnqueueResponse("drained") + stub.Enqueue.return_value = FakeEnqueueResponse([ + FakeEnqueueResult(message_id="drained"), + ]) - batcher = AutoBatcher(stub, max_batch_size=100) + accumulator = AutoAccumulator(stub, max_messages=100) - proto = service_pb2.EnqueueRequest(queue="q", payload=b"drain-me") - fut = batcher.submit(proto) + proto = service_pb2.EnqueueMessage(queue="q", payload=b"drain-me") + fut = accumulator.submit(proto) - batcher.close() + accumulator.close() # After close, the future should be resolved. assert fut.result(timeout=1.0) == "drained" @@ -263,71 +242,77 @@ def test_update_stub(self) -> None: """update_stub replaces the gRPC stub used for flushing.""" old_stub = MagicMock() new_stub = MagicMock() - new_stub.Enqueue.return_value = FakeEnqueueResponse("new-stub") + new_stub.Enqueue.return_value = FakeEnqueueResponse([ + FakeEnqueueResult(message_id="new-stub"), + ]) - batcher = AutoBatcher(old_stub, max_batch_size=100) + accumulator = AutoAccumulator(old_stub, max_messages=100) # Update stub before submitting. - batcher.update_stub(new_stub) + accumulator.update_stub(new_stub) - proto = service_pb2.EnqueueRequest(queue="q", payload=b"data") - fut = batcher.submit(proto) + proto = service_pb2.EnqueueMessage(queue="q", payload=b"data") + fut = accumulator.submit(proto) result = fut.result(timeout=5.0) assert result == "new-stub" - batcher.close() + accumulator.close() -class TestLingerBatcher: - """Test the LingerBatcher.""" +class TestLingerAccumulator: + """Test the LingerAccumulator.""" - def test_flushes_at_batch_size(self) -> None: - """Flush triggers when batch_size messages accumulate.""" + def test_flushes_at_max_messages(self) -> None: + """Flush triggers when max_messages messages accumulate.""" stub = MagicMock() - stub.BatchEnqueue.return_value = FakeBatchResponse([ - FakeBatchResult(message_id=f"id-{i}") for i in range(3) + stub.Enqueue.return_value = FakeEnqueueResponse([ + FakeEnqueueResult(message_id=f"id-{i}") for i in range(3) ]) - batcher = LingerBatcher(stub, linger_ms=5000, batch_size=3) + accumulator = LingerAccumulator(stub, linger_ms=5000, max_messages=3) futures = [] for i in range(3): - proto = service_pb2.EnqueueRequest(queue="q", payload=f"m{i}".encode()) - futures.append(batcher.submit(proto)) + proto = service_pb2.EnqueueMessage(queue="q", payload=f"m{i}".encode()) + futures.append(accumulator.submit(proto)) - # Should flush quickly because batch_size=3 was reached. + # Should flush quickly because max_messages=3 was reached. for i, f in enumerate(futures): result = f.result(timeout=5.0) assert result == f"id-{i}" - batcher.close() + accumulator.close() def test_flushes_at_linger_timeout(self) -> None: - """Flush triggers after linger_ms even if batch_size is not reached.""" + """Flush triggers after linger_ms even if max_messages is not reached.""" stub = MagicMock() - stub.Enqueue.return_value = FakeEnqueueResponse("lingered") + stub.Enqueue.return_value = FakeEnqueueResponse([ + FakeEnqueueResult(message_id="lingered"), + ]) - batcher = LingerBatcher(stub, linger_ms=50, batch_size=100) + accumulator = LingerAccumulator(stub, linger_ms=50, max_messages=100) - proto = service_pb2.EnqueueRequest(queue="q", payload=b"linger") - fut = batcher.submit(proto) + proto = service_pb2.EnqueueMessage(queue="q", payload=b"linger") + fut = accumulator.submit(proto) - # Should flush after ~50ms even though batch_size=100 not reached. + # Should flush after ~50ms even though max_messages=100 not reached. result = fut.result(timeout=5.0) assert result == "lingered" - batcher.close() + accumulator.close() def test_close_drains_pending(self) -> None: """close() drains any pending messages.""" stub = MagicMock() - stub.Enqueue.return_value = FakeEnqueueResponse("drained") + stub.Enqueue.return_value = FakeEnqueueResponse([ + FakeEnqueueResult(message_id="drained"), + ]) - batcher = LingerBatcher(stub, linger_ms=10000, batch_size=100) + accumulator = LingerAccumulator(stub, linger_ms=10000, max_messages=100) - proto = service_pb2.EnqueueRequest(queue="q", payload=b"drain") - fut = batcher.submit(proto) + proto = service_pb2.EnqueueMessage(queue="q", payload=b"drain") + fut = accumulator.submit(proto) - batcher.close() + accumulator.close() assert fut.result(timeout=1.0) == "drained" diff --git a/tests/test_batch_integration.py b/tests/test_enqueue_integration.py similarity index 51% rename from tests/test_batch_integration.py rename to tests/test_enqueue_integration.py index 09aefb9..4900d64 100644 --- a/tests/test_batch_integration.py +++ b/tests/test_enqueue_integration.py @@ -1,4 +1,4 @@ -"""Integration tests for batch enqueue and smart batching. +"""Integration tests for enqueue_many and accumulator modes. These tests require a running fila-server binary. They are skipped automatically when the server is not found (local dev). @@ -11,21 +11,23 @@ import fila -class TestBatchEnqueue: - """Integration tests for the explicit batch_enqueue method.""" +class TestEnqueueMany: + """Integration tests for the explicit enqueue_many method.""" - def test_batch_enqueue_multiple_messages(self, server: object) -> None: - """batch_enqueue sends multiple messages in one RPC and returns per-message results.""" + def test_enqueue_many_multiple_messages(self, server: object) -> None: + """enqueue_many sends multiple messages in one RPC and returns per-message results.""" from tests.conftest import TestServer assert isinstance(server, TestServer) - server.create_queue("test-batch") + server.create_queue("test-enqueue-many") - with fila.Client(server.addr, batch_mode=fila.BatchMode.DISABLED) as client: - results = client.batch_enqueue([ - ("test-batch", {"idx": "0"}, b"payload-0"), - ("test-batch", {"idx": "1"}, b"payload-1"), - ("test-batch", {"idx": "2"}, b"payload-2"), + with fila.Client( + server.addr, accumulator_mode=fila.AccumulatorMode.DISABLED + ) as client: + results = client.enqueue_many([ + ("test-enqueue-many", {"idx": "0"}, b"payload-0"), + ("test-enqueue-many", {"idx": "1"}, b"payload-1"), + ("test-enqueue-many", {"idx": "2"}, b"payload-2"), ]) assert len(results) == 3 @@ -38,60 +40,64 @@ def test_batch_enqueue_multiple_messages(self, server: object) -> None: ids = [r.message_id for r in results] assert len(set(ids)) == 3 - def test_batch_enqueue_single_message(self, server: object) -> None: - """batch_enqueue works with a single message.""" + def test_enqueue_many_single_message(self, server: object) -> None: + """enqueue_many works with a single message.""" from tests.conftest import TestServer assert isinstance(server, TestServer) - server.create_queue("test-batch-single") + server.create_queue("test-enqueue-many-single") - with fila.Client(server.addr, batch_mode=fila.BatchMode.DISABLED) as client: - results = client.batch_enqueue([ - ("test-batch-single", None, b"solo"), + with fila.Client( + server.addr, accumulator_mode=fila.AccumulatorMode.DISABLED + ) as client: + results = client.enqueue_many([ + ("test-enqueue-many-single", None, b"solo"), ]) assert len(results) == 1 assert results[0].is_success assert results[0].message_id is not None - def test_batch_enqueue_consume_verify(self, server: object) -> None: - """Messages enqueued via batch_enqueue can be consumed and acked.""" + def test_enqueue_many_consume_verify(self, server: object) -> None: + """Messages enqueued via enqueue_many can be consumed and acked.""" from tests.conftest import TestServer assert isinstance(server, TestServer) - server.create_queue("test-batch-consume") + server.create_queue("test-enqueue-many-consume") - with fila.Client(server.addr, batch_mode=fila.BatchMode.DISABLED) as client: - results = client.batch_enqueue([ - ("test-batch-consume", {"k": "v"}, b"batch-msg"), + with fila.Client( + server.addr, accumulator_mode=fila.AccumulatorMode.DISABLED + ) as client: + results = client.enqueue_many([ + ("test-enqueue-many-consume", {"k": "v"}, b"multi-msg"), ]) assert results[0].is_success - stream = client.consume("test-batch-consume") + stream = client.consume("test-enqueue-many-consume") msg = next(stream) assert msg.id == results[0].message_id assert msg.headers["k"] == "v" - assert msg.payload == b"batch-msg" + assert msg.payload == b"multi-msg" - client.ack("test-batch-consume", msg.id) + client.ack("test-enqueue-many-consume", msg.id) -class TestAsyncBatchEnqueue: - """Integration tests for the async batch_enqueue method.""" +class TestAsyncEnqueueMany: + """Integration tests for the async enqueue_many method.""" @pytest.mark.asyncio - async def test_async_batch_enqueue(self, server: object) -> None: - """Async batch_enqueue sends multiple messages.""" + async def test_async_enqueue_many(self, server: object) -> None: + """Async enqueue_many sends multiple messages.""" from tests.conftest import TestServer assert isinstance(server, TestServer) - server.create_queue("test-async-batch") + server.create_queue("test-async-enqueue-many") async with fila.AsyncClient(server.addr) as client: - results = await client.batch_enqueue([ - ("test-async-batch", None, b"async-0"), - ("test-async-batch", None, b"async-1"), + results = await client.enqueue_many([ + ("test-async-enqueue-many", None, b"async-0"), + ("test-async-enqueue-many", None, b"async-1"), ]) assert len(results) == 2 @@ -100,26 +106,28 @@ async def test_async_batch_enqueue(self, server: object) -> None: assert r.message_id is not None -class TestSmartBatching: - """Integration tests for smart batching (BatchMode.AUTO).""" +class TestAccumulatorModes: + """Integration tests for accumulator modes (AccumulatorMode.AUTO, Linger).""" def test_auto_mode_enqueue(self, server: object) -> None: - """AUTO mode enqueues messages through the batcher.""" + """AUTO mode enqueues messages through the accumulator.""" from tests.conftest import TestServer assert isinstance(server, TestServer) - server.create_queue("test-auto-batch") + server.create_queue("test-auto-accum") - with fila.Client(server.addr, batch_mode=fila.BatchMode.AUTO) as client: - msg_id = client.enqueue("test-auto-batch", None, b"auto-msg") + with fila.Client( + server.addr, accumulator_mode=fila.AccumulatorMode.AUTO + ) as client: + msg_id = client.enqueue("test-auto-accum", None, b"auto-msg") assert msg_id != "" # Verify the message was actually enqueued. - stream = client.consume("test-auto-batch") + stream = client.consume("test-auto-accum") msg = next(stream) assert msg.id == msg_id assert msg.payload == b"auto-msg" - client.ack("test-auto-batch", msg.id) + client.ack("test-auto-accum", msg.id) def test_auto_mode_multiple_messages(self, server: object) -> None: """AUTO mode handles multiple sequential enqueues.""" @@ -128,7 +136,9 @@ def test_auto_mode_multiple_messages(self, server: object) -> None: assert isinstance(server, TestServer) server.create_queue("test-auto-multi") - with fila.Client(server.addr, batch_mode=fila.BatchMode.AUTO) as client: + with fila.Client( + server.addr, accumulator_mode=fila.AccumulatorMode.AUTO + ) as client: ids = [] for i in range(5): msg_id = client.enqueue( @@ -147,7 +157,9 @@ def test_disabled_mode_enqueue(self, server: object) -> None: assert isinstance(server, TestServer) server.create_queue("test-disabled") - with fila.Client(server.addr, batch_mode=fila.BatchMode.DISABLED) as client: + with fila.Client( + server.addr, accumulator_mode=fila.AccumulatorMode.DISABLED + ) as client: msg_id = client.enqueue("test-disabled", None, b"direct") assert msg_id != "" @@ -157,7 +169,7 @@ def test_disabled_mode_enqueue(self, server: object) -> None: client.ack("test-disabled", msg.id) def test_linger_mode_enqueue(self, server: object) -> None: - """LINGER mode enqueues messages through a timer-based batcher.""" + """LINGER mode enqueues messages through a timer-based accumulator.""" from tests.conftest import TestServer assert isinstance(server, TestServer) @@ -165,7 +177,7 @@ def test_linger_mode_enqueue(self, server: object) -> None: with fila.Client( server.addr, - batch_mode=fila.Linger(linger_ms=50, batch_size=10), + accumulator_mode=fila.Linger(linger_ms=50, max_messages=10), ) as client: msg_id = client.enqueue("test-linger", None, b"lingered") assert msg_id != "" @@ -177,44 +189,44 @@ def test_linger_mode_enqueue(self, server: object) -> None: client.ack("test-linger", msg.id) def test_default_mode_is_auto(self, server: object) -> None: - """Client defaults to AUTO batch mode.""" + """Client defaults to AUTO accumulator mode.""" from tests.conftest import TestServer assert isinstance(server, TestServer) server.create_queue("test-default-mode") - # No batch_mode arg = AUTO. + # No accumulator_mode arg = AUTO. with fila.Client(server.addr) as client: msg_id = client.enqueue("test-default-mode", None, b"default") assert msg_id != "" -class TestBatchModeTypes: - """Unit tests for BatchMode and Linger types (no server needed).""" +class TestAccumulatorModeTypes: + """Unit tests for AccumulatorMode and Linger types (no server needed).""" - def test_batch_mode_enum(self) -> None: - """BatchMode has AUTO and DISABLED variants.""" - assert fila.BatchMode.AUTO is not None - assert fila.BatchMode.DISABLED is not None - modes = {fila.BatchMode.AUTO, fila.BatchMode.DISABLED} + def test_accumulator_mode_enum(self) -> None: + """AccumulatorMode has AUTO and DISABLED variants.""" + assert fila.AccumulatorMode.AUTO is not None + assert fila.AccumulatorMode.DISABLED is not None + modes = {fila.AccumulatorMode.AUTO, fila.AccumulatorMode.DISABLED} assert len(modes) == 2 # They are distinct values def test_linger_fields(self) -> None: - """Linger stores linger_ms and batch_size.""" - linger = fila.Linger(linger_ms=100, batch_size=50) + """Linger stores linger_ms and max_messages.""" + linger = fila.Linger(linger_ms=100, max_messages=50) assert linger.linger_ms == 100 - assert linger.batch_size == 50 + assert linger.max_messages == 50 - def test_batch_enqueue_result_success(self) -> None: - """BatchEnqueueResult.is_success returns True when message_id is set.""" - r = fila.BatchEnqueueResult(message_id="abc", error=None) + def test_enqueue_result_success(self) -> None: + """EnqueueResult.is_success returns True when message_id is set.""" + r = fila.EnqueueResult(message_id="abc", error=None) assert r.is_success assert r.message_id == "abc" assert r.error is None - def test_batch_enqueue_result_error(self) -> None: - """BatchEnqueueResult.is_success returns False when error is set.""" - r = fila.BatchEnqueueResult(message_id=None, error="queue not found") + def test_enqueue_result_error(self) -> None: + """EnqueueResult.is_success returns False when error is set.""" + r = fila.EnqueueResult(message_id=None, error="queue not found") assert not r.is_success assert r.message_id is None assert r.error == "queue not found"