From b9ab475ff84b21ec4df6bcf881e96ccde7427c2f Mon Sep 17 00:00:00 2001 From: evgeny Date: Wed, 3 Dec 2025 10:38:39 +0000 Subject: [PATCH] [AIT-96] feat: RealtimeChannel publish over WebSocket implementation Implemented Spec points: ## Message Publishing Specifications (RTL6) ### RTL6c - Messages published on channels in specific states - Messages published when channel is not **ATTACHED** should be published immediately ### RTL6c2 - Message queuing behavior - Messages can be queued when connection/channel is not ready - Relates to processing queued messages when connection becomes ready ### RTL6c3 - Publishing without implicit attach ### RTL6c4 - Behavior when queueMessages client option is false ### RTL6d - Message bundling restrictions #### RTL6d1: Maximum message size limits for bundling - **RTL6d2**: All messages in bundle must have same clientId #### RTL6d3: Can only bundle messages for same channel - **RTL6d4**: Can only bundle messages with same action (MESSAGE or PRESENCE) #### RTL6d7: Cannot bundle idempotent messages with non-idempotent messages --- ## Message Acknowledgment (RTN7) ### RTN7a All **PRESENCE**, **MESSAGE**, **ANNOTATION**, and **OBJECT** ProtocolMessages sent to Ably expect either an **ACK** or **NACK** to confirm successful receipt or failure ### RTN7b Every ProtocolMessage requiring acknowledgment must contain a unique serially incrementing `msgSerial` integer starting at zero ### RTN7c If connection enters **SUSPENDED**, **CLOSED**, or **FAILED** state and ACK/NACK has not been received, client should fail those messages and remove them from retry queues ### RTN7d If `queueMessages` is false, messages entering **DISCONNECTED** state without acknowledgment should be treated as failed immediately ### RTN7e When connection state changes to **SUSPENDED**/**CLOSED**/**FAILED**, pending messages (submitted via RTL6c1 or RTL6c2) awaiting ACK/NACK should be considered failed --- ## Message Resending and Serial Handling (RTN19) ### RTN19a Upon reconnection after disconnection, client library must resend all pending messages awaiting acknowledgment, allowing the realtime system to respond with ACK/NACK ### RTN19a2 In the event of a new `connectionId` (connection not resumed), previous `msgSerials` are meaningless and must be reset. The `msgSerial` counter resets to 0 for the new connection --- ## Channel State and Reattachment (RTL3, RTL4, RTL5) ### RTL3c Channel state implications when connection goes into **SUSPENDED** ### RTL3d When connection enters **CONNECTED** state, channels in **ATTACHING**, **ATTACHED**, or **SUSPENDED** states should transition to **ATTACHING** and initiate attach sequence. Connection should process queued messages immediately without waiting for attach operations to finish ### RTL4c - Attach sequence - **RTL4c1**: ATTACH message includes channel serial to resume from previous message or attachment ### RTL5i If channel is **DETACHING**, re-send **DETACH** and remain in 'detaching' state --- ably/realtime/connectionmanager.py | 253 ++++- ably/realtime/realtime_channel.py | 136 ++- ably/transport/websockettransport.py | 22 + ably/util/helper.py | 29 + .../realtime/realtimechannel_publish_test.py | 976 ++++++++++++++++++ 5 files changed, 1390 insertions(+), 26 deletions(-) create mode 100644 test/ably/realtime/realtimechannel_publish_test.py diff --git a/ably/realtime/connectionmanager.py b/ably/realtime/connectionmanager.py index ef74caaa..e2df3074 100644 --- a/ably/realtime/connectionmanager.py +++ b/ably/realtime/connectionmanager.py @@ -2,8 +2,8 @@ import asyncio import logging +from collections import deque from datetime import datetime -from queue import Queue from typing import TYPE_CHECKING import httpx @@ -24,6 +24,88 @@ log = logging.getLogger(__name__) +class PendingMessage: + """Represents a message awaiting acknowledgment from the server""" + + def __init__(self, message: dict): + self.message = message + self.future: asyncio.Future | None = None + action = message.get('action') + + # Messages that require acknowledgment: MESSAGE, PRESENCE, ANNOTATION, OBJECT + self.ack_required = action in ( + ProtocolMessageAction.MESSAGE, + ProtocolMessageAction.PRESENCE, + ProtocolMessageAction.ANNOTATION, + ProtocolMessageAction.OBJECT, + ) + + if self.ack_required: + self.future = asyncio.Future() + + +class PendingMessageQueue: + """Queue for tracking messages awaiting acknowledgment""" + + def __init__(self): + self.messages: list[PendingMessage] = [] + + def push(self, pending_message: PendingMessage) -> None: + """Add a message to the queue""" + self.messages.append(pending_message) + + def count(self) -> int: + """Return the number of pending messages""" + return len(self.messages) + + def complete_messages(self, serial: int, count: int, err: AblyException | None = None) -> None: + """Complete messages based on serial and count from ACK/NACK + + Args: + serial: The msgSerial of the first message being acknowledged + count: The number of messages being acknowledged + err: Error from NACK, or None for successful ACK + """ + log.debug(f'MessageQueue.complete_messages(): serial={serial}, count={count}, err={err}') + + if not self.messages: + log.warning('MessageQueue.complete_messages(): called on empty queue') + return + + first = self.messages[0] + if first: + start_serial = first.message.get('msgSerial') + if start_serial is None: + log.warning('MessageQueue.complete_messages(): first message has no msgSerial') + return + + end_serial = serial + count + + if end_serial > start_serial: + # Remove and complete the acknowledged messages + num_to_complete = min(end_serial - start_serial, len(self.messages)) + completed_messages = self.messages[:num_to_complete] + self.messages = self.messages[num_to_complete:] + + for msg in completed_messages: + if msg.future and not msg.future.done(): + if err: + msg.future.set_exception(err) + else: + msg.future.set_result(None) + + def complete_all_messages(self, err: AblyException) -> None: + """Complete all pending messages with an error""" + while self.messages: + msg = self.messages.pop(0) + if msg.future and not msg.future.done(): + msg.future.set_exception(err) + + def clear(self) -> None: + """Clear all messages from the queue""" + self.messages.clear() + + class ConnectionManager(EventEmitter): def __init__(self, realtime: AblyRealtime, initial_state): self.options = realtime.options @@ -41,8 +123,10 @@ def __init__(self, realtime: AblyRealtime, initial_state): self.connect_base_task: asyncio.Task | None = None self.disconnect_transport_task: asyncio.Task | None = None self.__fallback_hosts: list[str] = self.options.get_fallback_realtime_hosts() - self.queued_messages: Queue = Queue() + self.queued_messages: deque[PendingMessage] = deque() self.__error_reason: AblyException | None = None + self.msg_serial: int = 0 + self.pending_message_queue: PendingMessageQueue = PendingMessageQueue() super().__init__() def enact_state_change(self, state: ConnectionState, reason: AblyException | None = None) -> None: @@ -88,37 +172,109 @@ async def close_impl(self) -> None: self.notify_state(ConnectionState.CLOSED) async def send_protocol_message(self, protocol_message: dict) -> None: - if self.state in ( - ConnectionState.DISCONNECTED, - ConnectionState.CONNECTING, - ): - self.queued_messages.put(protocol_message) - return - - if self.state == ConnectionState.CONNECTED: - if self.transport: - await self.transport.send(protocol_message) - else: - log.exception( - "ConnectionManager.send_protocol_message(): can not send message with no active transport" + """Send a protocol message and optionally track it for acknowledgment + + Args: + protocol_message: protocol message dict (new message) + Returns: + None + """ + if self.state not in (ConnectionState.DISCONNECTED, ConnectionState.CONNECTING, ConnectionState.CONNECTED): + raise AblyException(f"ConnectionManager.send_protocol_message(): called in {self.state}", 500, 50000) + + pending_message = PendingMessage(protocol_message) + + # Assign msgSerial to messages that need acknowledgment + if pending_message.ack_required: + # New message - assign fresh serial + protocol_message['msgSerial'] = self.msg_serial + self.pending_message_queue.push(pending_message) + self.msg_serial += 1 + + if self.state in (ConnectionState.DISCONNECTED, ConnectionState.CONNECTING): + self.queued_messages.appendleft(pending_message) + if pending_message.ack_required: + await pending_message.future + return None + + return await self._send_protocol_message_on_connected_state(pending_message) + + async def _send_protocol_message_on_connected_state(self, pending_message: PendingMessage) -> None: + if self.state == ConnectionState.CONNECTED and self.transport: + # Add to pending queue before sending (for messages being resent from queue) + if pending_message.ack_required and pending_message not in self.pending_message_queue.messages: + self.pending_message_queue.push(pending_message) + await self.transport.send(pending_message.message) + else: + log.exception( + "ConnectionManager.send_protocol_message(): can not send message with no active transport" + ) + if pending_message.future: + pending_message.future.set_exception( + AblyException("No active transport", 500, 50000) ) + if pending_message.ack_required: + await pending_message.future + return None + + def send_queued_messages(self) -> None: + log.info(f'ConnectionManager.send_queued_messages(): sending {len(self.queued_messages)} message(s)') + while len(self.queued_messages) > 0: + pending_message = self.queued_messages.pop() + asyncio.create_task(self._send_protocol_message_on_connected_state(pending_message)) + + def requeue_pending_messages(self) -> None: + """RTN19a: Requeue messages awaiting ACK/NACK when transport disconnects + + These messages will be resent when connection becomes CONNECTED again. + RTN19a2: msgSerial is preserved for resume, reset for new connection. + """ + pending_count = self.pending_message_queue.count() + if pending_count == 0: return - raise AblyException(f"ConnectionManager.send_protocol_message(): called in {self.state}", 500, 50000) + log.info( + f'ConnectionManager.requeue_pending_messages(): ' + f'requeuing {pending_count} pending message(s) for resend' + ) - def send_queued_messages(self) -> None: - log.info(f'ConnectionManager.send_queued_messages(): sending {self.queued_messages.qsize()} message(s)') - while not self.queued_messages.empty(): - asyncio.create_task(self.send_protocol_message(self.queued_messages.get())) + # Get all pending messages and add them back to the queue + # They'll be sent again when we reconnect + pending_messages = list(self.pending_message_queue.messages) + + # Add back to front of queue (FIFO but priority over new messages) + # Store the entire PendingMessage object to preserve Future + for pending_msg in reversed(pending_messages): + # PendingMessage object retains its Future, msgSerial + self.queued_messages.append(pending_msg) + + # Clear the message queue since we're requeueing them all + # When they're resent, the existing Future will be resolved + self.pending_message_queue.clear() def fail_queued_messages(self, err) -> None: log.info( - f"ConnectionManager.fail_queued_messages(): discarding {self.queued_messages.qsize()} messages;" + + f"ConnectionManager.fail_queued_messages(): discarding {len(self.queued_messages)} messages;" + f" reason = {err}" ) - while not self.queued_messages.empty(): - msg = self.queued_messages.get() - log.exception(f"ConnectionManager.fail_queued_messages(): Failed to send protocol message: {msg}") + error = err or AblyException("Connection failed", 80000, 500) + while len(self.queued_messages) > 0: + pending_msg = self.queued_messages.pop() + log.exception( + f"ConnectionManager.fail_queued_messages(): Failed to send protocol message: " + f"{pending_msg.message}" + ) + # Fail the Future if it exists + if pending_msg.future and not pending_msg.future.done(): + pending_msg.future.set_exception(error) + + # Also fail all pending messages awaiting acknowledgment + if self.pending_message_queue.count() > 0: + count = self.pending_message_queue.count() + log.info( + f"ConnectionManager.fail_queued_messages(): failing {count} pending messages" + ) + self.pending_message_queue.complete_all_messages(error) async def ping(self) -> float: if self.__ping_future: @@ -149,6 +305,16 @@ def on_connected(self, connection_details: ConnectionDetails, connection_id: str reason: AblyException | None = None) -> None: self.__fail_state = ConnectionState.DISCONNECTED + # RTN19a2: Reset msgSerial if connectionId changed (new connection) + prev_connection_id = self.connection_id + connection_id_changed = prev_connection_id is not None and prev_connection_id != connection_id + + if connection_id_changed: + log.info('ConnectionManager.on_connected(): New connectionId; resetting msgSerial') + self.msg_serial = 0 + # Note: In JS they call resetSendAttempted() here, but we don't need it + # because we fail all pending messages on disconnect per RTN7e + self.__connection_details = connection_details self.connection_id = connection_id @@ -244,7 +410,36 @@ def on_heartbeat(self, id: str | None) -> None: self.__ping_future.set_result(None) self.__ping_future = None + def on_ack(self, serial: int, count: int) -> None: + """Handle ACK protocol message from server + + Args: + serial: The msgSerial of the first message being acknowledged + count: The number of messages being acknowledged + """ + log.debug(f'ConnectionManager.on_ack(): serial={serial}, count={count}') + self.pending_message_queue.complete_messages(serial, count) + + def on_nack(self, serial: int, count: int, err: AblyException | None) -> None: + """Handle NACK protocol message from server + + Args: + serial: The msgSerial of the first message being rejected + count: The number of messages being rejected + err: Error information from the server + """ + if not err: + err = AblyException('Unable to send message; channel not responding', 50001, 500) + + log.error(f'ConnectionManager.on_nack(): serial={serial}, count={count}, err={err}') + self.pending_message_queue.complete_messages(serial, count, err) + def deactivate_transport(self, reason: AblyException | None = None): + # RTN19a: Before disconnecting, requeue any pending messages + # so they'll be resent on reconnection + if self.transport: + log.info('ConnectionManager.deactivate_transport(): requeuing pending messages') + self.requeue_pending_messages() self.transport = None self.notify_state(ConnectionState.DISCONNECTED, reason) @@ -383,8 +578,16 @@ def notify_state(self, state: ConnectionState, reason: AblyException | None = No ConnectionState.SUSPENDED, ConnectionState.FAILED, ): + # RTN7e: Fail pending messages on SUSPENDED, CLOSED, FAILED self.fail_queued_messages(reason) self.ably.channels._propagate_connection_interruption(state, reason) + elif state == ConnectionState.DISCONNECTED and not self.options.queue_messages: + # RTN7d: If queueMessages is false, fail pending messages on DISCONNECTED + log.info( + 'ConnectionManager.notify_state(): queueMessages is false; ' + 'failing pending messages on DISCONNECTED' + ) + self.fail_queued_messages(reason) def start_transition_timer(self, state: ConnectionState, fail_state: ConnectionState | None = None) -> None: log.debug(f'ConnectionManager.start_transition_timer(): transition state = {state}') @@ -466,6 +669,8 @@ def cancel_retry_timer(self) -> None: def disconnect_transport(self) -> None: log.info('ConnectionManager.disconnect_transport()') if self.transport: + # RTN19a: Requeue pending messages before disposing transport + self.requeue_pending_messages() self.disconnect_transport_task = asyncio.create_task(self.transport.dispose()) async def on_auth_updated(self, token_details: TokenDetails): diff --git a/ably/realtime/realtime_channel.py b/ably/realtime/realtime_channel.py index a6d42277..51ffc8a1 100644 --- a/ably/realtime/realtime_channel.py +++ b/ably/realtime/realtime_channel.py @@ -13,8 +13,8 @@ from ably.types.message import Message from ably.types.mixins import DecodingContext from ably.util.eventemitter import EventEmitter -from ably.util.exceptions import AblyException -from ably.util.helper import Timer, is_callable_or_coroutine +from ably.util.exceptions import AblyException, IncompatibleClientIdException +from ably.util.helper import Timer, is_callable_or_coroutine, validate_message_size if TYPE_CHECKING: from ably.realtime.realtime import AblyRealtime @@ -384,6 +384,138 @@ def unsubscribe(self, *args) -> None: # RTL8a self.__message_emitter.off(listener) + # RTL6 + async def publish(self, *args, **kwargs) -> None: + """Publish a message or messages on this channel + + Publishes a single message or an array of messages to the channel. + + Parameters + ---------- + *args: name and data, or message object(s) + Either: + - name (str) and data (any): publish a single message + - message (Message or dict): publish a single message object + - messages (list): publish multiple message objects + + Raises + ------ + AblyException + If the channel or connection state prevents publishing, + if clientId validation fails, or if message size exceeds limits + ValueError + If invalid arguments are provided + """ + messages = [] + + # RTL6i: Parse arguments - expect Message object, array of Messages, or name and data + if len(args) == 1: + if isinstance(args[0], Message): + # Single Message object + messages = [args[0]] + elif isinstance(args[0], dict): + # Message as dict + messages = [Message(**args[0])] + elif isinstance(args[0], list): + # RTL6i2: Array of Message objects + messages = [] + for msg in args[0]: + if isinstance(msg, Message): + messages.append(msg) + elif isinstance(msg, dict): + messages.append(Message(**msg)) + else: + raise ValueError("Array must contain Message objects or dicts") + else: + raise ValueError( + "The single-argument form of publish() expects a message object or an array of message objects" + ) + elif len(args) == 2: + # RTL6i1: name and data form + # RTL6i3: Allow name and/or data to be None + name = args[0] + data = args[1] + messages = [Message(name=name, data=data)] + else: + raise ValueError("publish() expects either (name, data) or a message object or array of messages") + + # RTL6g: Validate clientId for identified clients + if self.ably.auth.client_id: + for m in messages: + # RTL6g3: Reject messages with different clientId + if m.client_id == '*': + raise IncompatibleClientIdException( + 'Wildcard client_id is reserved and cannot be used when publishing messages', + 400, 40012) + elif m.client_id is not None and not self.ably.auth.can_assume_client_id(m.client_id): + raise IncompatibleClientIdException( + f'Cannot publish with client_id \'{m.client_id}\' as it is incompatible with the ' + f'current configured client_id \'{self.ably.auth.client_id}\'', + 400, 40012) + + + # Encode messages (RTL6a: same encoding as RestChannel#publish) + encoded_messages = [] + for m in messages: + # Encode the message with encryption if needed + if self.cipher: + m.encrypt(self.cipher) + + # Convert to dict representation + msg_dict = m.as_dict(binary=self.ably.options.use_binary_protocol) + encoded_messages.append(msg_dict) + + # RSL1i: Check message size limit + max_message_size = getattr(self.ably.options, 'max_message_size', 65536) # 64KB default + validate_message_size(encoded_messages, self.ably.options.use_binary_protocol, max_message_size) + + # RTL6c: Check connection and channel state + self._throw_if_unpublishable_state() + + log.info( + f'RealtimeChannel.publish(): sending message; ' + f'channel = {self.name}, state = {self.state}, message count = {len(encoded_messages)}' + ) + + # Send protocol message + protocol_message = { + "action": ProtocolMessageAction.MESSAGE, + "channel": self.name, + "messages": encoded_messages, + } + + # RTL6b: Await acknowledgment from server + await self.__realtime.connection.connection_manager.send_protocol_message(protocol_message) + + def _throw_if_unpublishable_state(self) -> None: + """Check if the channel and connection are in a state that allows publishing + + Raises + ------ + AblyException + If the channel or connection state prevents publishing + """ + # RTL6c4: Check connection state + connection_state = self.__realtime.connection.state + if connection_state not in [ + ConnectionState.CONNECTED, + ConnectionState.CONNECTING, + ConnectionState.DISCONNECTED, + ]: + raise AblyException( + f"Cannot publish message; connection state is {connection_state}", + 400, + 40001, + ) + + # RTL6c4: Check channel state + if self.state in [ChannelState.SUSPENDED, ChannelState.FAILED]: + raise AblyException( + f"Cannot publish message; channel state is {self.state}", + 400, + 90001, + ) + def _on_message(self, proto_msg: dict) -> None: action = proto_msg.get('action') # RTL4c1 diff --git a/ably/transport/websockettransport.py b/ably/transport/websockettransport.py index 140b9d25..e1b93b09 100644 --- a/ably/transport/websockettransport.py +++ b/ably/transport/websockettransport.py @@ -33,7 +33,11 @@ class ProtocolMessageAction(IntEnum): HEARTBEAT = 0 + ACK = 1 + NACK = 2 + CONNECT = 3 CONNECTED = 4 + DISCONNECT = 5 DISCONNECTED = 6 CLOSE = 7 CLOSED = 8 @@ -42,8 +46,14 @@ class ProtocolMessageAction(IntEnum): ATTACHED = 11 DETACH = 12 DETACHED = 13 + PRESENCE = 14 MESSAGE = 15 + SYNC = 16 AUTH = 17 + ACTIVATE = 18 + OBJECT = 19 + OBJECT_SYNC = 20 + ANNOTATION = 21 class WebSocketTransport(EventEmitter): @@ -155,6 +165,18 @@ async def on_protocol_message(self, msg): elif action == ProtocolMessageAction.HEARTBEAT: id = msg.get('id') self.connection_manager.on_heartbeat(id) + elif action == ProtocolMessageAction.ACK: + # Handle acknowledgment of sent messages + msg_serial = msg.get('msgSerial', 0) + count = msg.get('count', 1) + self.connection_manager.on_ack(msg_serial, count) + elif action == ProtocolMessageAction.NACK: + # Handle negative acknowledgment (error sending messages) + msg_serial = msg.get('msgSerial', 0) + count = msg.get('count', 1) + error = msg.get('error') + exception = AblyException.from_dict(error) if error else None + self.connection_manager.on_nack(msg_serial, count, exception) elif action in ( ProtocolMessageAction.ATTACHED, ProtocolMessageAction.DETACHED, diff --git a/ably/util/helper.py b/ably/util/helper.py index f69a0146..53226f27 100644 --- a/ably/util/helper.py +++ b/ably/util/helper.py @@ -1,11 +1,16 @@ import asyncio import inspect +import json import random import string import time from typing import Callable, Dict, Tuple from urllib.parse import parse_qs, urlparse +import msgpack + +from ably.util.exceptions import AblyException + def get_random_id(): # get random string of letters and digits @@ -69,3 +74,27 @@ async def _job(self): def cancel(self): self._task.cancel() + +def validate_message_size(encoded_messages: list, use_binary_protocol: bool, max_message_size: int) -> None: + """Validate that encoded messages don't exceed the maximum size limit. + + Args: + encoded_messages: List of encoded message dictionaries + use_binary_protocol: Whether to use binary (msgpack) or JSON encoding + max_message_size: Maximum allowed size in bytes + + Raises: + AblyException: If the encoded messages exceed the maximum size + """ + if use_binary_protocol: + size = len(msgpack.packb(encoded_messages, use_bin_type=True)) + else: + size = len(json.dumps(encoded_messages, separators=(',', ':')).encode('utf-8')) + + if size > max_message_size: + raise AblyException( + f"Maximum size of messages that can be published at once exceeded " + f"(was {size} bytes; limit is {max_message_size} bytes)", + 400, + 40009, + ) diff --git a/test/ably/realtime/realtimechannel_publish_test.py b/test/ably/realtime/realtimechannel_publish_test.py new file mode 100644 index 00000000..539f55bb --- /dev/null +++ b/test/ably/realtime/realtimechannel_publish_test.py @@ -0,0 +1,976 @@ +import asyncio + +import pytest + +from ably.realtime.connection import ConnectionState +from ably.realtime.realtime_channel import ChannelState +from ably.transport.websockettransport import ProtocolMessageAction +from ably.types.message import Message +from ably.util.exceptions import AblyException, IncompatibleClientIdException +from test.ably.testapp import TestApp +from test.ably.utils import BaseAsyncTestCase, WaitableEvent, assert_waiter + + +class TestRealtimeChannelPublish(BaseAsyncTestCase): + """Tests for RTN7 spec - Message acknowledgment""" + + async def asyncSetUp(self): + self.test_vars = await TestApp.get_test_vars() + + # RTN7a - Basic ACK/NACK functionality + async def test_publish_returns_ack_on_success(self): + """RTN7a: Verify that publish awaits ACK from server""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_ack_channel') + await channel.attach() + + # Publish should complete successfully when ACK is received + await channel.publish('test_event', 'test_data') + + await ably.close() + + async def test_publish_raises_on_nack(self): + """RTN7a: Verify that publish raises exception when NACK is received""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_nack_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + + # Intercept transport send to simulate NACK + original_send = connection_manager.transport.send + + async def send_and_nack(message): + await original_send(message) + # Simulate NACK from server + if message.get('action') == ProtocolMessageAction.MESSAGE: + msg_serial = message.get('msgSerial', 0) + nack_message = { + 'action': ProtocolMessageAction.NACK, + 'msgSerial': msg_serial, + 'count': 1, + 'error': { + 'message': 'Test NACK error', + 'statusCode': 400, + 'code': 40000 + } + } + await connection_manager.transport.on_protocol_message(nack_message) + + connection_manager.transport.send = send_and_nack + + # Publish should raise exception when NACK is received + with pytest.raises(AblyException) as exc_info: + await channel.publish('test_event', 'test_data') + + assert 'Test NACK error' in str(exc_info.value) + assert exc_info.value.code == 40000 + + await ably.close() + + # RTN7b - msgSerial incrementing + async def test_msgserial_increments_sequentially(self): + """RTN7b: Verify that msgSerial increments for each message""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_msgserial_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + sent_serials = [] + + # Intercept messages to capture msgSerial values + original_send = connection_manager.transport.send + + async def capture_serial(message): + if message.get('action') == ProtocolMessageAction.MESSAGE: + sent_serials.append(message.get('msgSerial')) + await original_send(message) + + connection_manager.transport.send = capture_serial + + # Publish multiple messages + await channel.publish('event1', 'data1') + await channel.publish('event2', 'data2') + await channel.publish('event3', 'data3') + + # Verify msgSerial increments: 0, 1, 2 + assert sent_serials == [0, 1, 2], f"Expected [0, 1, 2], got {sent_serials}" + + await ably.close() + + # RTN7e - Fail pending messages on SUSPENDED, CLOSED, FAILED + async def test_pending_messages_fail_on_suspended(self): + """RTN7e: Verify pending messages fail when connection enters SUSPENDED state""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_suspended_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + + # Block ACKs to keep message pending + original_send = connection_manager.transport.send + blocked_messages = [] + + async def block_acks(message): + if message.get('action') == ProtocolMessageAction.MESSAGE: + blocked_messages.append(message) + # Don't actually send - keep it pending + return + await original_send(message) + + connection_manager.transport.send = block_acks + + # Start publish but don't await (it will hang waiting for ACK) + publish_task = asyncio.create_task(channel.publish('test_event', 'test_data')) + + # Wait for message to be pending + async def check_pending(): + return connection_manager.pending_message_queue.count() > 0 + await assert_waiter(check_pending, timeout=2) + + # Force connection to SUSPENDED state + connection_manager.notify_state( + ConnectionState.SUSPENDED, + AblyException('Test suspension', 400, 80002) + ) + + # The publish should now complete with an exception + with pytest.raises(AblyException) as exc_info: + await publish_task + + assert 'Test suspension' in str(exc_info.value) or exc_info.value.code == 80002 + + await ably.close() + + async def test_pending_messages_fail_on_failed(self): + """RTN7e: Verify pending messages fail when connection enters FAILED state""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_failed_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + + # Block ACKs + original_send = connection_manager.transport.send + + async def block_acks(message): + if message.get('action') == ProtocolMessageAction.MESSAGE: + return # Don't send + await original_send(message) + + connection_manager.transport.send = block_acks + + # Start publish + publish_task = asyncio.create_task(channel.publish('test_event', 'test_data')) + + # Wait for message to be pending + async def check_pending(): + return connection_manager.pending_message_queue.count() > 0 + await assert_waiter(check_pending, timeout=2) + + # Force FAILED state + connection_manager.notify_state( + ConnectionState.FAILED, + AblyException('Test failure', 80000, 500) + ) + + # Should raise exception + with pytest.raises(AblyException): + await publish_task + + await ably.close() + + # RTN7d - Fail on DISCONNECTED when queueMessages=false + async def test_fail_on_disconnected_when_queue_messages_false(self): + """RTN7d: Verify pending messages fail on DISCONNECTED if queueMessages is false""" + # Create client with queueMessages=False + ably = await TestApp.get_ably_realtime(queue_messages=False) + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_disconnected_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + + # Block ACKs + original_send = connection_manager.transport.send + + async def block_acks(message): + if message.get('action') == ProtocolMessageAction.MESSAGE: + return + await original_send(message) + + connection_manager.transport.send = block_acks + + # Start publish + publish_task = asyncio.create_task(channel.publish('test_event', 'test_data')) + + # Wait for message to be pending + async def check_pending(): + return connection_manager.pending_message_queue.count() > 0 + await assert_waiter(check_pending, timeout=2) + + # Force DISCONNECTED state + connection_manager.notify_state( + ConnectionState.DISCONNECTED, + AblyException('Test disconnect', 400, 80003) + ) + + # Should raise exception because queueMessages is false + with pytest.raises(AblyException): + await publish_task + + await ably.close() + + async def test_queue_on_disconnected_when_queue_messages_true(self): + """RTN7d: Verify messages are queued (not failed) on DISCONNECTED when queueMessages is true""" + # Create client with queueMessages=True (default) + ably = await TestApp.get_ably_realtime(queue_messages=True) + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_queue_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + + # Block ACKs + original_send = connection_manager.transport.send + + async def block_acks(message): + if message.get('action') == ProtocolMessageAction.MESSAGE: + return + await original_send(message) + + connection_manager.transport.send = block_acks + + # Start publish (will be pending) + publish_task = asyncio.create_task(channel.publish('test_event', 'test_data')) + + # Wait for message to be pending + async def check_pending(): + return connection_manager.pending_message_queue.count() > 0 + await assert_waiter(check_pending, timeout=2) + + # Force DISCONNECTED state + connection_manager.notify_state(ConnectionState.DISCONNECTED, None) + + # Give time for state transition + async def check_disconnected(): + return connection_manager.state != ConnectionState.CONNECTED + await assert_waiter(check_disconnected, timeout=2) + + # Task should still be pending (not failed) because queueMessages=True + assert not publish_task.done(), "Publish should still be pending when queueMessages=True" + + # Message should still be in pending queue OR moved to queued_messages + assert connection_manager.pending_message_queue.count() + len(connection_manager.queued_messages) > 0 + + # Now restore connection would normally complete the publish + # For this test, we'll just cancel it + publish_task.cancel() + + await ably.close() + + # RTN19a2 - Reset msgSerial on new connectionId + async def test_msgserial_resets_on_new_connection_id(self): + """RTN19a2: Verify msgSerial resets to 0 when connectionId changes""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_reset_serial_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + + # Publish a message to increment msgSerial + await channel.publish('event1', 'data1') + + # msgSerial should now be 1 + assert connection_manager.msg_serial == 1, f"Expected msgSerial=1, got {connection_manager.msg_serial}" + + # Simulate new connection with different connectionId + new_connection_id = 'new_connection_id_12345' + + # Simulate server sending CONNECTED with new connectionId + from ably.types.connectiondetails import ConnectionDetails + new_connection_details = ConnectionDetails( + connection_state_ttl=120000, + max_idle_interval=15000, + connection_key='new_key', + client_id=None + ) + + connection_manager.on_connected(new_connection_details, new_connection_id) + + # msgSerial should be reset to 0 + assert connection_manager.msg_serial == 0, ( + f"Expected msgSerial=0 after new connection, got {connection_manager.msg_serial}" + ) + + await ably.close() + + async def test_msgserial_not_reset_on_same_connection_id(self): + """RTN19a2: Verify msgSerial is NOT reset when connectionId stays the same""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_same_connection_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + + # Publish messages to increment msgSerial + await channel.publish('event1', 'data1') + await channel.publish('event2', 'data2') + + # msgSerial should be 2 + assert connection_manager.msg_serial == 2 + + # Simulate reconnection with SAME connectionId (transport change, not new connection) + same_connection_id = connection_manager.connection_id + + from ably.types.connectiondetails import ConnectionDetails + connection_details = ConnectionDetails( + connection_state_ttl=120000, + max_idle_interval=15000, + connection_key='different_key', # Key can change + client_id=None + ) + + connection_manager.on_connected(connection_details, same_connection_id) + + # msgSerial should NOT be reset (stays at 2) + assert connection_manager.msg_serial == 2, ( + f"Expected msgSerial=2 (unchanged), got {connection_manager.msg_serial}" + ) + + await ably.close() + + # Test that multiple messages get correct msgSerial values + async def test_multiple_messages_concurrent(self): + """RTN7b: Test that multiple concurrent publishes get sequential msgSerials""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_concurrent_channel') + await channel.attach() + + # Publish multiple messages concurrently + tasks = [ + channel.publish('event', f'data{i}') + for i in range(5) + ] + + # All should complete successfully + await asyncio.gather(*tasks) + + # msgSerial should have incremented to 5 + assert ably.connection.connection_manager.msg_serial == 5 + + await ably.close() + + # RTN19a - Resend messages awaiting ACK on reconnect + async def test_pending_messages_resent_on_reconnect(self): + """RTN19a: Verify messages awaiting ACK are resent when transport reconnects""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_resend_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + + # Block ACKs from being processed + original_on_ack = connection_manager.on_ack + connection_manager.on_ack = lambda *args: None + + # Publish a message + publish_future = asyncio.create_task(connection_manager.send_protocol_message({ + "action": ProtocolMessageAction.MESSAGE, + "channel": channel.name, + "messages": [{"name": "test", "data": "data"}] + })) + + # Wait for message to be pending + async def check_pending(): + return connection_manager.pending_message_queue.count() == 1 + await assert_waiter(check_pending, timeout=2) + + # Verify msgSerial was assigned + pending_msg = list(connection_manager.pending_message_queue.messages)[0] + assert pending_msg.message.get('msgSerial') == 0 + + # Simulate requeueing (what happens on disconnect) + connection_manager.requeue_pending_messages() + + # Pending queue should now be empty (messages moved to queued_messages) + assert connection_manager.pending_message_queue.count() == 0 + assert len(connection_manager.queued_messages) == 1 + + # Verify the PendingMessage object is in the queue (preserves Future) + queued_msg = connection_manager.queued_messages.pop() + assert queued_msg.message.get('msgSerial') == 0, "msgSerial should be preserved" + + # Add back to pending queue to simulate resend + connection_manager.pending_message_queue.push(queued_msg) + + # Restore on_ack and simulate ACK from server + connection_manager.on_ack = original_on_ack + connection_manager.on_ack(0, 1) + + # Future should be resolved + result = await asyncio.wait_for(publish_future, timeout=1) + assert result is None + + await ably.close() + + async def test_msgserial_preserved_on_resume(self): + """RTN19a2: Verify msgSerial counter is preserved when resuming (same connectionId)""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_preserve_serial_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + original_connection_id = connection_manager.connection_id + + # Block ACKs to keep messages pending + original_on_ack = connection_manager.on_ack + connection_manager.on_ack = lambda *args: None + + # Publish a message (msgSerial will be 0) + asyncio.create_task(connection_manager.send_protocol_message({ + "action": ProtocolMessageAction.MESSAGE, + "channel": channel.name, + "messages": [{"name": "test1", "data": "data1"}] + })) + + # Wait for message to be pending + async def check_pending(): + return connection_manager.pending_message_queue.count() == 1 + await assert_waiter(check_pending, timeout=2) + + # msgSerial counter should be at 1 now + assert connection_manager.msg_serial == 1 + + # Simulate resume with SAME connectionId + from ably.types.connectiondetails import ConnectionDetails + connection_details = ConnectionDetails( + connection_state_ttl=120000, + max_idle_interval=15000, + connection_key='same_key', + client_id=None + ) + connection_manager.on_connected(connection_details, original_connection_id) + + # msgSerial counter should STILL be 1 (preserved on resume) + assert connection_manager.msg_serial == 1, ( + f"Expected msgSerial=1 preserved, got {connection_manager.msg_serial}" + ) + + # Restore on_ack and clean up + connection_manager.on_ack = original_on_ack + connection_manager.pending_message_queue.complete_all_messages(AblyException("cleanup", 0, 0)) + + await ably.close() + + async def test_msgserial_reset_on_failed_resume(self): + """RTN19a2: Verify msgSerial counter is reset when resume fails (new connectionId)""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_reset_serial_resume_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + + # Block ACKs to keep messages pending + original_on_ack = connection_manager.on_ack + connection_manager.on_ack = lambda *args: None + + # Publish a message (msgSerial will be 0) + asyncio.create_task(connection_manager.send_protocol_message({ + "action": ProtocolMessageAction.MESSAGE, + "channel": channel.name, + "messages": [{"name": "test1", "data": "data1"}] + })) + + # Wait for message to be pending + async def check_pending(): + return connection_manager.pending_message_queue.count() == 1 + await assert_waiter(check_pending, timeout=2) + + # msgSerial counter should be at 1 now + assert connection_manager.msg_serial == 1 + + # Simulate NEW connection (different connectionId = failed resume) + from ably.types.connectiondetails import ConnectionDetails + new_connection_details = ConnectionDetails( + connection_state_ttl=120000, + max_idle_interval=15000, + connection_key='new_key', + client_id=None + ) + new_connection_id = 'new_connection_id_67890' + connection_manager.on_connected(new_connection_details, new_connection_id) + + # msgSerial counter should be reset to 0 (new connection) + assert connection_manager.msg_serial == 0, ( + f"Expected msgSerial reset to 0, got {connection_manager.msg_serial}" + ) + + # Restore on_ack and clean up + connection_manager.on_ack = original_on_ack + connection_manager.pending_message_queue.complete_all_messages(AblyException("cleanup", 0, 0)) + + await ably.close() + + # Test ACK with count > 1 + async def test_ack_with_multiple_count(self): + """RTN7a/RTN7b: Test that ACK with count > 1 completes multiple messages""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_multi_ack_channel') + await channel.attach() + + connection_manager = ably.connection.connection_manager + + # Intercept transport to delay ACKs + original_send = connection_manager.transport.send + pending_messages = [] + + async def delay_ack(message): + if message.get('action') == ProtocolMessageAction.MESSAGE: + pending_messages.append(message) + # Don't send yet + return + await original_send(message) + + connection_manager.transport.send = delay_ack + + # Start 3 publishes + task1 = asyncio.create_task(channel.publish('event1', 'data1')) + task2 = asyncio.create_task(channel.publish('event2', 'data2')) + task3 = asyncio.create_task(channel.publish('event3', 'data3')) + + # Wait for message to be pending + async def check_pending(): + return connection_manager.pending_message_queue.count() == 3 + await assert_waiter(check_pending, timeout=2) + + # Send ACK for all 3 messages at once (count=3) + ack_message = { + 'action': ProtocolMessageAction.ACK, + 'msgSerial': 0, # First message serial + 'count': 3 # Acknowledging 3 messages + } + await connection_manager.transport.on_protocol_message(ack_message) + + # All tasks should now complete + await task1 + await task2 + await task3 + + await ably.close() + + async def test_queued_messages_sent_before_channel_reattach(self): + """RTL3d + RTL6c2: Verify queued messages are sent immediately on reconnection, + without waiting for channel reattachment to complete""" + ably = await TestApp.get_ably_realtime(queue_messages=True) + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_rtl3d_rtl6c2_channel') + await channel.attach() + + # Verify channel is ATTACHED + assert channel.state == ChannelState.ATTACHED + + connection_manager = ably.connection.connection_manager + + # Track channel reattachment + channel_attaching_seen = False + + def track_attaching(state_change): + nonlocal channel_attaching_seen + if state_change.current == ChannelState.ATTACHING: + channel_attaching_seen = True + + channel.on('attaching', track_attaching) + + # Force an invalid resume to ensure a new connection + # (like test_attached_channel_reattaches_on_invalid_resume) + assert connection_manager.connection_details + connection_manager.connection_details.connection_key = 'ably-python-fake-key' + + # Queue a message before disconnecting (to ensure it gets queued) + # Block message sending first + original_send = connection_manager.transport.send + + async def block_messages(message): + if message.get('action') == ProtocolMessageAction.MESSAGE: + # Don't send MESSAGE, just queue it + return + await original_send(message) + + connection_manager.transport.send = block_messages + + # Publish a message (will be blocked and moved to pending) + publish_task = asyncio.create_task(channel.publish('test_event', 'test_data')) + + # Wait for message to be pending + async def check_pending(): + return connection_manager.pending_message_queue.count() > 0 + await assert_waiter(check_pending, timeout=2) + + # Now disconnect to move pending messages to queued + assert connection_manager.transport + await connection_manager.transport.dispose() + connection_manager.notify_state(ConnectionState.DISCONNECTED, retry_immediately=False) + + # Give time for state transition and message requeueing + async def check_requeue_happened(): + return len(connection_manager.queued_messages) > 0 + await assert_waiter(check_requeue_happened, timeout=2) + + # Verify message was moved to queued_messages + queued_count_before = len(connection_manager.queued_messages) + assert queued_count_before > 0, "Message should be queued after DISCONNECTED" + assert not publish_task.done(), "Publish task should still be pending" + + # Reconnect (will fail resume due to fake key, creating new connection) + ably.connect() + + # Wait for CONNECTED state (RTL3d + RTL6c2 happens here) + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=10) + + # Give time for send_queued_messages() and channel reattachment to process + async def check_sent_queued_messages(): + return len(connection_manager.queued_messages) == 0 + await assert_waiter(check_sent_queued_messages, timeout=2) + + # Verify queued messages were sent (RTL6c2) + queued_count_after = len(connection_manager.queued_messages) + assert queued_count_after < queued_count_before, \ + "Queued messages should be sent immediately when entering CONNECTED (RTL6c2)" + + # Verify channel transitioned to ATTACHING (RTL3d) + assert channel_attaching_seen, "Channel should have transitioned to ATTACHING (RTL3d)" + + # Wait for channel to reach ATTACHED state + if channel.state != ChannelState.ATTACHED: + await asyncio.wait_for(channel.once_async(ChannelState.ATTACHED), timeout=5) + + # Verify publish completes successfully + await asyncio.wait_for(publish_task, timeout=5) + + await ably.close() + + # RSL1i - Message size limit tests + async def test_publish_message_exceeding_size_limit(self): + """RSL1i: Verify that publishing a message exceeding the size limit raises an exception""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_size_limit_channel') + await channel.attach() + + # Create a message that exceeds the default 65536 byte limit + # 70KB of data should definitely exceed the limit + large_data = 'x' * (70 * 1024) + + # Attempt to publish should raise AblyException with code 40009 + with pytest.raises(AblyException) as exc_info: + await channel.publish('test_event', large_data) + + assert exc_info.value.code == 40009 + assert 'Maximum size of messages' in str(exc_info.value) + + await ably.close() + + async def test_publish_message_within_size_limit(self): + """RSL1i: Verify that publishing a message within the size limit succeeds""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_size_ok_channel') + await channel.attach() + + # Create a message that is well within the 65536 byte limit + # 10KB of data should be safe + medium_data = 'x' * (10 * 1024) + + # Publish should complete successfully + await channel.publish('test_event', medium_data) + + await ably.close() + + # RTL6g - Client ID validation tests + async def test_publish_with_matching_client_id(self): + """RTL6g2: Verify that publishing with explicit matching clientId succeeds""" + ably = await TestApp.get_ably_realtime(client_id='test_client_123') + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_client_id_channel') + await channel.attach() + + # Create message with matching clientId + message = Message(name='test_event', data='test_data', client_id='test_client_123') + + # Publish should succeed with matching clientId + await channel.publish(message) + + await ably.close() + + async def test_publish_with_null_client_id_when_identified(self): + """RTL6g1: Verify that publishing with null clientId gets populated by server when client is identified""" + ably = await TestApp.get_ably_realtime(client_id='test_client_456') + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_null_client_id_channel') + await channel.attach() + + # Publish without explicit clientId (will be populated by server) + await channel.publish('test_event', 'test_data') + + await ably.close() + + async def test_publish_with_mismatched_client_id_fails(self): + """RTL6g3: Verify that publishing with mismatched clientId is rejected""" + ably = await TestApp.get_ably_realtime(client_id='test_client_789') + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_mismatch_client_id_channel') + await channel.attach() + + # Create message with different clientId + message = Message(name='test_event', data='test_data', client_id='different_client') + + # Publish should raise IncompatibleClientIdException + with pytest.raises(IncompatibleClientIdException) as exc_info: + await channel.publish(message) + + assert exc_info.value.code == 40012 + assert 'incompatible' in str(exc_info.value).lower() + + await ably.close() + + async def test_publish_with_wildcard_client_id_fails(self): + """RTL6g3: Verify that publishing with wildcard clientId is rejected""" + ably = await TestApp.get_ably_realtime(client_id='test_client_wildcard') + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_wildcard_client_id_channel') + await channel.attach() + + # Create message with wildcard clientId + message = Message(name='test_event', data='test_data', client_id='*') + + # Publish should raise IncompatibleClientIdException + with pytest.raises(IncompatibleClientIdException) as exc_info: + await channel.publish(message) + + assert exc_info.value.code == 40012 + assert 'wildcard' in str(exc_info.value).lower() + + await ably.close() + + # RTL6i - Data type variation tests + async def test_publish_with_string_data(self): + """RTL6i: Verify that publishing with string data succeeds""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_string_data_channel') + await channel.attach() + + # Publish message with string data + await channel.publish('test_event', 'simple string data') + + await ably.close() + + async def test_publish_with_json_object_data(self): + """RTL6i: Verify that publishing with JSON object data succeeds""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_json_object_channel') + await channel.attach() + + # Publish message with JSON object data + json_data = { + 'key1': 'value1', + 'key2': 42, + 'key3': True, + 'nested': {'inner': 'data'} + } + await channel.publish('test_event', json_data) + + await ably.close() + + async def test_publish_with_json_array_data(self): + """RTL6i: Verify that publishing with JSON array data succeeds""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_json_array_channel') + await channel.attach() + + # Publish message with JSON array data + array_data = ['item1', 'item2', 42, True, {'nested': 'object'}] + await channel.publish('test_event', array_data) + + await ably.close() + + async def test_publish_with_null_data(self): + """RTL6i3: Verify that publishing with null data succeeds""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_null_data_channel') + await channel.attach() + + # Publish message with null data (RTL6i3: null data is permitted) + await channel.publish('test_event', None) + + await ably.close() + + async def test_publish_with_null_name(self): + """RTL6i3: Verify that publishing with null name succeeds""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_null_name_channel') + await channel.attach() + + # Publish message with null name (RTL6i3: null name is permitted) + await channel.publish(None, 'test data') + + await ably.close() + + async def test_publish_message_array(self): + """RTL6i2: Verify that publishing an array of messages succeeds""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_message_array_channel') + await channel.attach() + + # Publish array of messages (RTL6i2) + messages = [ + Message(name='event1', data='data1'), + Message(name='event2', data='data2'), + Message(name='event3', data={'key': 'value'}), + ] + await channel.publish(messages) + + await ably.close() + + # RTL6c4 - Channel state validation tests + async def test_publish_fails_on_suspended_channel(self): + """RTL6c4: Verify that publishing on a SUSPENDED channel fails""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_suspended_channel') + await channel.attach() + + # Force channel to SUSPENDED state + channel._notify_state(ChannelState.SUSPENDED) + + # Verify channel is SUSPENDED + assert channel.state == ChannelState.SUSPENDED + + # Attempt to publish should raise AblyException with code 90001 + with pytest.raises(AblyException) as exc_info: + await channel.publish('test_event', 'test_data') + + assert exc_info.value.code == 90001 + assert 'suspended' in str(exc_info.value).lower() + + await ably.close() + + async def test_publish_fails_on_failed_channel(self): + """RTL6c4: Verify that publishing on a FAILED channel fails""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_failed_channel') + await channel.attach() + + # Force channel to FAILED state + channel._notify_state(ChannelState.FAILED) + + # Verify channel is FAILED + assert channel.state == ChannelState.FAILED + + # Attempt to publish should raise AblyException with code 90001 + with pytest.raises(AblyException) as exc_info: + await channel.publish('test_event', 'test_data') + + assert exc_info.value.code == 90001 + assert 'failed' in str(exc_info.value).lower() + + await ably.close() + + # RSL1k - Idempotent publishing test + async def test_idempotent_realtime_publishing(self): + """RSL1k2, RSL1k5: Verify that messages with explicit IDs can be published for idempotent behavior""" + ably = await TestApp.get_ably_realtime() + await asyncio.wait_for(ably.connection.once_async(ConnectionState.CONNECTED), timeout=5) + + channel = ably.channels.get('test_idempotent_channel') + await channel.attach() + + idempotent_id = 'test-msg-id-12345' + different_id = 'test-msg-id-67890' + + data_received = [] + different_id_received = WaitableEvent() + def on_message(message): + try: + data_received.append(message.data) + + if message.id == different_id: + different_id_received.finish() + except Exception as e: + different_id_received.finish() + raise e + + await channel.subscribe(on_message) + + # RSL1k2: Publish messages with explicit IDs + # Messages with explicit IDs should include those IDs in the published message + message1 = Message(name='idempotent_event', data='first message', id=idempotent_id) + + # Publish should succeed with explicit ID + await channel.publish(message1) + + # Publish another message with the same ID (RSL1k5: idempotent publishing) + # With idempotent publishing enabled on the server, messages with the same ID + # should be deduplicated. Here we verify that publishing with the same ID succeeds. + message2 = Message(name='idempotent_event', data='second message', id=idempotent_id) + await channel.publish(message2) + + # Publish a message with a different ID + message3 = Message(name='unique_event', data='third message', id=different_id) + await channel.publish(message3) + + await different_id_received.wait() + + assert len(data_received) == 2, "Only two messages should have been received" + assert data_received[0] == 'first message' + assert data_received[1] == 'third message' + + await ably.close()