diff --git a/.github/ruff.toml b/.github/ruff.toml index 74f0fbb..e5dd6a9 100644 --- a/.github/ruff.toml +++ b/.github/ruff.toml @@ -8,7 +8,6 @@ exclude = [ "dist/", ".venv/", "__pycache__/", - "uprotocol/cloudevent/cloudevents_pb2.py", "uprotocol/core/*", "uprotocol/v1/*", "uprotocol/uoptions_pb2.py" diff --git a/README.adoc b/README.adoc index f84959f..d9c8f31 100644 --- a/README.adoc +++ b/README.adoc @@ -5,10 +5,10 @@ This library implements the https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/languages.adoc[uProtocol Language Specific Library Requirements] for Python defined in https://github.com/eclipse-uprotocol/uprotocol-spec/tree/main[uProtocol Specifications]. The library is organized into packages that are described in <> below. Each package contains a README.adoc file that describes the purpose of the package and how to use it. -The module contains the factory methods, serializers, and validators for all data types defined in the specifications, and any data models that either haven't or couldn't be defined in uprotocol-core-api yet (ex. UPayload) This library fits into the big picture of the uProtocol SDK as seen in <> below. +The module contains the factory methods, serializers, and validators for all data types defined in the specifications, and any data models that either haven't or couldn't be defined in uprotocol-core-api yet (ex. UPayload) This library fits into the big picture of the uProtocol SDK as seen in the diagram below. .uProtocol SDK -image:https://raw.githubusercontent.com/eclipse-uprotocol/uprotocol-spec/main/uprotocol_sdk.drawio.svg[#uprotocol-sdk,width=100%,align="center"] +image:https://raw.githubusercontent.com/eclipse-uprotocol/up-spec/main/up_libraries.drawio.svg[#uprotocol-sdk,width=100%,align="center"] == Getting Started @@ -21,7 +21,9 @@ Before proceeding with the setup of this project, ensure that the following prer ---- mvn -version ---- -If Maven is properly installed, you should see information about the Maven version and configuration. +If Maven is properly installed, you should see information about the Maven version and configuration. + + +NOTE: Ensure you are using Java 17 with your Maven installation before continuing with the next steps. Other versions of Java may not be supported. === Importing the Library @@ -31,7 +33,7 @@ To set up SDK, follow the steps below: + [source] ---- -git clone https://github.com/eclipse-uprotocol/uprotocol-python.git +git clone https://github.com/eclipse-uprotocol/up-python.git ---- . Execute the `pull_and_compile_protos.py` script using the following commands: @@ -44,7 +46,7 @@ python pull_and_compile_protos.py This script automates the following tasks: 1. **Cloning and Compilation of Protos:** - Clones the `up-core-api` protos from the specified repository URL, compiles them, and generates Python protofiles in the protos folder. + Clones the `up-core-api` protos from the `up-spec` repository, compiles them, and generates Python protofiles in the protos folder. . Install up-python + @@ -53,29 +55,12 @@ This script automates the following tasks: python -m pip install ../ ---- -*This will install the up-python, making its classes and modules available for import in your python code.* +*This will install up-python, making its classes and modules available for import in your python code.* === Using the Library The Library is broken up into different packages that are described in <> below. Each package contains a README.adoc file that describes the purpose of the package and how to use it. Packages are organized into the following directories: -.Package Folders -[#pkg-folders,width=100%,cols="20%,80%",options="header"] -|=== - -| Folder | Purpose - -| `*builder*` or `*factory*` -| Contains factory methods for creating uProtocol data types - -| `*serializer*` -| Contains serializers to convert the objects into byte or string form representation of said object - -| `*validator*` -| Contains validators to validate the data types and report errors if the objects are missing or incorrect - -|=== - .SDK Packages [#sdk-packages,width=100%,cols="20%,80%",options="header"] @@ -83,20 +68,17 @@ The Library is broken up into different packages that are described in < UUri: return self.source @@ -45,99 +45,63 @@ def get_source(self) -> UUri: def __init__(self, source=None): super().__init__() self.source = source if source else UUri(authority_name="Neelam", ue_id=4, ue_version_major=1) - self.listeners: Dict[str, List[UListener]] = {} + self.listeners: List[UListener] = [] self.lock = threading.Lock() - - def build_response(self, request: UMessage): - payload = UPayload.pack_from_data_and_format(request.payload, request.attributes.payload_format) - - return UMessageBuilder.response_for_request(request.attributes).build_from_upayload(payload) - - def close(self): - self.listeners.clear() - - async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: - with self.lock: - if sink_filter is not None: # method uri - topic = UriSerializer().serialize(sink_filter) - else: - topic = UriSerializer().serialize(source_filter) - - if topic not in self.listeners: - self.listeners[topic] = [] - self.listeners[topic].append(listener) - return UStatus(code=UCode.OK) - - async def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: - with self.lock: - if sink is not None: # method uri - topic = UriSerializer().serialize(sink) - else: - topic = UriSerializer().serialize(source) - - if topic in self.listeners and listener in self.listeners[topic]: - self.listeners[topic].remove(listener) - if not self.listeners[topic]: # If the list is empty, remove the key - del self.listeners[topic] - code = UCode.OK + self.executor = ThreadPoolExecutor() + + def build_response(self, request: UMessage) -> UMessage: + if request.attributes.sink.ue_id == 0: + if request.attributes.sink.resource_id == 1: + try: + subscription_request = SubscriptionRequest.parse(request.payload) + sub_response = SubscriptionResponse( + topic=subscription_request.topic, + status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBED), + ) + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( + UPayload.pack(sub_response) + ) + except Exception: + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( + UPayload.pack(UnsubscribeResponse()) + ) else: - code = UCode.INVALID_ARGUMENT - result = UStatus(code=code) - return result + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( + UPayload.pack(UnsubscribeResponse()) + ) + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( + UPayload.pack_from_data_and_format(request.payload, request.attributes.payload_format) + ) async def send(self, message: UMessage) -> UStatus: validator = UAttributesValidator.get_validator(message.attributes) - with self.lock: - if message is None or validator.validate(message.attributes) != ValidationResult.success(): - return UStatus(code=UCode.INVALID_ARGUMENT, message="Invalid message attributes") - # Use a ThreadPoolExecutor with max_workers=1 - executor = ThreadPoolExecutor(max_workers=1) + if message is None or validator.validate(message.attributes) != ValidationResult.success(): + return UStatus(code=UCode.INVALID_ARGUMENT, message="Invalid message attributes") - try: - # Submit _notify_listeners to the executor - future = executor.submit(self._notify_listeners, message) + if message.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: + response = self.build_response(message) + self._notify_listeners(response) - # Await completion of the Future - await asyncio.wrap_future(future) + return UStatus(code=UCode.OK) - finally: - # Clean up the executor - executor.shutdown() + def _notify_listeners(self, response: UMessage): + for listener in self.listeners: + listener.on_receive(response) + async def register_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: + self.listeners.append(listener) return UStatus(code=UCode.OK) - def _notify_listeners(self, umsg): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_PUBLISH: - for key, listeners in self.listeners.items(): - uri = UriSerializer().deserialize(key) - if not (UriValidator.is_rpc_method(uri) or UriValidator.is_rpc_response(uri)): - for listener in listeners: - loop.call_soon_threadsafe(listener.on_receive, umsg) + async def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: + if listener in self.listeners: + self.listeners.remove(listener) + return UStatus(code=UCode.OK) + return UStatus(code=UCode.NOT_FOUND) - else: - if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: - serialized_uri = UriSerializer().serialize(umsg.attributes.sink) - if serialized_uri not in self.listeners: - # no listener registered for method uri, send dummy response. - # This case will only come for request type - # as for response type, there will always be response handler as it is in up client - serialized_uri = UriSerializer().serialize(UriFactory.ANY) - umsg = self.build_response(umsg) - else: - # this is for response type,handle response - serialized_uri = UriSerializer().serialize(UriFactory.ANY) - - if serialized_uri in self.listeners: - for listener in self.listeners[serialized_uri]: - loop.call_soon_threadsafe(listener.on_receive, umsg) - break # as there will be only one listener for method uri - finally: - loop.run_until_complete(loop.shutdown_asyncgens()) - loop.close() + def close(self): + self.listeners.clear() + self.executor.shutdown() class TimeoutUTransport(MockUTransport, ABC): diff --git a/tests/test_communication/test_inmemoryrpcclient.py b/tests/test_communication/test_inmemoryrpcclient.py index a8836df..6703608 100644 --- a/tests/test_communication/test_inmemoryrpcclient.py +++ b/tests/test_communication/test_inmemoryrpcclient.py @@ -19,6 +19,7 @@ from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.upayload import UPayload from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.transport.utransport import UTransport from uprotocol.v1.uattributes_pb2 import UPriority from uprotocol.v1.ucode_pb2 import UCode from uprotocol.v1.uri_pb2 import UUri @@ -30,6 +31,16 @@ class TestInMemoryRpcClient(unittest.IsolatedAsyncioTestCase): def create_method_uri(): return UUri(authority_name="neelam", ue_id=10, ue_version_major=1, resource_id=3) + def test_constructor_transport_none(self): + with self.assertRaises(ValueError) as context: + InMemoryRpcClient(None) + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NULL_ERROR) + + def test_constructor_transport_not_instance(self): + with self.assertRaises(ValueError) as context: + InMemoryRpcClient("Invalid Transport") + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + async def test_invoke_method_with_payload(self): payload = UPayload.pack_to_any(UUri()) rpc_client = InMemoryRpcClient(MockUTransport()) diff --git a/tests/test_communication/test_inmemoryrpcserver.py b/tests/test_communication/test_inmemoryrpcserver.py index 96354fd..4be86e7 100644 --- a/tests/test_communication/test_inmemoryrpcserver.py +++ b/tests/test_communication/test_inmemoryrpcserver.py @@ -14,26 +14,28 @@ import copy import unittest -from unittest.mock import MagicMock +from typing import Dict +from unittest.mock import AsyncMock, MagicMock -from tests.test_communication.mock_utransport import ( - ErrorUTransport, - MockUTransport, -) +from tests.test_communication.mock_utransport import EchoUTransport from uprotocol.communication.inmemoryrpcserver import InMemoryRpcServer from uprotocol.communication.requesthandler import RequestHandler from uprotocol.communication.upayload import UPayload from uprotocol.communication.ustatuserror import UStatusError from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.ulistener import UListener from uprotocol.transport.utransport import UTransport from uprotocol.uri.serializer.uriserializer import UriSerializer from uprotocol.v1.ucode_pb2 import UCode -from uprotocol.v1.umessage_pb2 import UMessage from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus class TestInMemoryRpcServer(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.mock_transport = MagicMock(spec=UTransport) + self.mock_handler = MagicMock(spec=RequestHandler) + @staticmethod def create_method_uri(): return UUri(authority_name="Neelam", ue_id=4, ue_version_major=1, resource_id=3) @@ -49,133 +51,150 @@ def test_constructor_transport_not_instance(self): self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) async def test_register_request_handler_method_uri_none(self): - server = InMemoryRpcServer(MockUTransport()) - handler = MagicMock(return_value=UPayload.EMPTY) + self.mock_handler.handle_request = MagicMock(return_value=UPayload.EMPTY) + server = InMemoryRpcServer(self.mock_transport) + status = await server.register_request_handler(None, self.mock_handler) + self.assertEqual(status.code, UCode.INVALID_ARGUMENT) - with self.assertRaises(ValueError) as context: - await server.register_request_handler(None, handler) - self.assertEqual(str(context.exception), "Method URI missing") + self.mock_handler.handle_request.assert_not_called() async def test_register_request_handler_handler_none(self): - server = InMemoryRpcServer(MockUTransport()) - with self.assertRaises(ValueError) as context: - await server.register_request_handler(self.create_method_uri(), None) - self.assertEqual(str(context.exception), "Request listener missing") + server = InMemoryRpcServer(self.mock_transport) + status = await server.register_request_handler(self.create_method_uri(), None) + self.assertEqual(status.code, UCode.INVALID_ARGUMENT) async def test_unregister_request_handler_method_uri_none(self): - server = InMemoryRpcServer(MockUTransport()) - handler = MagicMock(return_value=UPayload.EMPTY) - - with self.assertRaises(ValueError) as context: - await server.unregister_request_handler(None, handler) - self.assertEqual(str(context.exception), "Method URI missing") + self.mock_handler.handle_request = MagicMock(return_value=UPayload.EMPTY) + server = InMemoryRpcServer(self.mock_transport) + status = await server.unregister_request_handler(None, self.mock_handler) + self.assertEqual(status.code, UCode.INVALID_ARGUMENT) + self.mock_handler.assert_not_called() async def test_unregister_request_handler_handler_none(self): - server = InMemoryRpcServer(MockUTransport()) - with self.assertRaises(ValueError) as context: - await server.unregister_request_handler(self.create_method_uri(), None) - self.assertEqual(str(context.exception), "Request listener missing") + server = InMemoryRpcServer(self.mock_transport) - async def test_registering_request_listener(self): - handler = MagicMock(return_value=UPayload.EMPTY) - method = self.create_method_uri() - server = InMemoryRpcServer(MockUTransport()) - self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK) - self.assertEqual((await server.unregister_request_handler(method, handler)).code, UCode.OK) + status = await server.unregister_request_handler(self.create_method_uri(), None) + self.assertEqual(status.code, UCode.INVALID_ARGUMENT) + + async def test_register_request_handler(self): + self.mock_handler.handle_request = MagicMock(return_value=UPayload.EMPTY) + + # Create instance of InMemoryRpcServer with mocked dependencies + server = InMemoryRpcServer(self.mock_transport) + # Mock the return value of transport.register_listener + self.mock_transport.register_listener = AsyncMock(return_value=UStatus(code=UCode.OK)) + result = await server.register_request_handler(self.create_method_uri(), self.mock_handler) + # Assert the result + self.assertEqual(result.code, UCode.OK) + self.mock_transport.register_listener.assert_called_once() async def test_registering_twice_the_same_request_handler(self): - handler = MagicMock(return_value=UPayload.EMPTY) - server = InMemoryRpcServer(MockUTransport()) - status = await server.register_request_handler(self.create_method_uri(), handler) + self.mock_transport.register_listener = AsyncMock(return_value=UStatus(code=UCode.OK)) + self.mock_handler.handle_request = MagicMock(return_value=UPayload.EMPTY) + + # Create instance of InMemoryRpcServer with mocked dependencies + server = InMemoryRpcServer(self.mock_transport) + status = await server.register_request_handler(self.create_method_uri(), self.mock_handler) self.assertEqual(status.code, UCode.OK) - status = await server.register_request_handler(self.create_method_uri(), handler) + status = await server.register_request_handler(self.create_method_uri(), self.mock_handler) self.assertEqual(status.code, UCode.ALREADY_EXISTS) + self.mock_transport.register_listener.assert_called_once() async def test_unregistering_non_registered_request_handler(self): + self.mock_transport.register_listener = AsyncMock(return_value=UStatus(code=UCode.OK)) handler = MagicMock(side_effect=NotImplementedError("Unimplemented method 'handleRequest'")) - server = InMemoryRpcServer(MockUTransport()) + + # Create instance of InMemoryRpcServer with mocked dependencies + server = InMemoryRpcServer(self.mock_transport) + status = await server.unregister_request_handler(self.create_method_uri(), handler) self.assertEqual(status.code, UCode.NOT_FOUND) async def test_registering_request_listener_with_error_transport(self): - handler = MagicMock(return_value=UPayload.EMPTY) - server = InMemoryRpcServer(ErrorUTransport()) - status = await server.register_request_handler(self.create_method_uri(), handler) + self.mock_transport.register_listener = AsyncMock(return_value=UStatus(code=UCode.FAILED_PRECONDITION)) + self.mock_handler.handle_request = MagicMock(return_value=UPayload.EMPTY) + + # Create instance of InMemoryRpcServer with mocked dependencies + server = InMemoryRpcServer(self.mock_transport) + status = await server.register_request_handler(self.create_method_uri(), self.mock_handler) self.assertEqual(status.code, UCode.FAILED_PRECONDITION) + self.mock_transport.register_listener.assert_called_once() async def test_handle_requests(self): - class CustomTestUTransport(MockUTransport): - async def send(self, message): - serialized_uri = UriSerializer().serialize(message.attributes.sink) - if serialized_uri in self.listeners: - for listener in self.listeners[serialized_uri]: - listener.on_receive(message) - return UStatus(code=UCode.OK) - - transport = CustomTestUTransport() - handler = MagicMock(side_effect=Exception("this should not be called")) - server = InMemoryRpcServer(transport) + listeners: Dict[str, UListener] = {} + + async def custom_register_listener_behavior(source: UUri, listener: UListener, sink: UUri = None) -> UStatus: + topic = UriSerializer().serialize(sink) + + if topic not in listeners: + listeners[topic] = listener + return UStatus(code=UCode.OK) + + self.mock_transport.register_listener = AsyncMock(side_effect=custom_register_listener_behavior) + self.mock_transport.get_source.return_value = UUri(authority_name="Neelam", ue_id=4, ue_version_major=1) + + async def custom_send_behavior(message): + serialized_uri = UriSerializer().serialize(message.attributes.sink) + if serialized_uri in listeners: + listeners[serialized_uri].on_receive(message) + return UStatus(code=UCode.OK) + + self.mock_transport.send = AsyncMock(side_effect=custom_send_behavior) + + mock_handler = MagicMock(spec=RequestHandler) + mock_handler.handle_request = MagicMock(return_value=UPayload.EMPTY) + # Create instance of InMemoryRpcServer with mocked dependencies + server = InMemoryRpcServer(self.mock_transport) method = self.create_method_uri() method2 = copy.deepcopy(method) # Update the resource_id method2.resource_id = 69 - self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK) - - request = UMessageBuilder.request(transport.get_source(), method2, 1000).build() + self.assertEqual((await server.register_request_handler(method, mock_handler)).code, UCode.OK) + request = UMessageBuilder.request(self.mock_transport.get_source(), method2, 1000).build() # fake sending a request message that will trigger the handler to be called but since it is # not for the same method as the one registered, it should be ignored and the handler not called - self.assertEqual((await transport.send(request)).code, UCode.OK) + self.assertEqual((await self.mock_transport.send(request)).code, UCode.OK) + self.mock_transport.register_listener.assert_called_once() + self.mock_transport.send.assert_called_once() - async def test_handle_requests_exception(self): - # test transport that will trigger the handleRequest() - class CustomTestUTransport(MockUTransport): - async def send(self, message): - serialized_uri = UriSerializer().serialize(message.attributes.sink) - if serialized_uri in self.listeners: - for listener in self.listeners[serialized_uri]: - listener.on_receive(message) - return UStatus(code=UCode.OK) - - transport = CustomTestUTransport() - - class MyRequestHandler(RequestHandler): - def handle_request(self, message: UMessage) -> UPayload: - raise UStatusError(UStatus(code=UCode.FAILED_PRECONDITION, message="Neelam it failed!")) - - handler = MyRequestHandler() - server = InMemoryRpcServer(transport) - method = self.create_method_uri() + request = UMessageBuilder.request(self.mock_transport.get_source(), method, 1000).build() - self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK) + # fake sending a request message that will trigger the handler to be called. + self.assertEqual((await self.mock_transport.send(request)).code, UCode.OK) + mock_handler.handle_request.assert_called_once() - request = UMessageBuilder.request(transport.get_source(), method, 1000).build() - self.assertEqual((await transport.send(request)).code, UCode.OK) + def test_rpcserver_constructor_transport_none(self): + with self.assertRaises(ValueError) as context: + InMemoryRpcServer(None) + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NULL_ERROR) - async def test_handle_requests_unknown_exception(self): - class CustomTestUTransport(MockUTransport): - async def send(self, message): - serialized_uri = UriSerializer().serialize(message.attributes.sink) - if serialized_uri in self.listeners: - for listener in self.listeners[serialized_uri]: - listener.on_receive(message) - return UStatus(code=UCode.OK) + def test_rpcserver_constructor_transport_not_instance(self): + with self.assertRaises(ValueError) as context: + InMemoryRpcServer("InvalidTransport") + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) - transport = CustomTestUTransport() + async def test_register_request_handler_null_parameters(self): + server = InMemoryRpcServer(self.mock_transport) + status = await server.register_request_handler(None, None) + self.assertEqual(status.code, UCode.INVALID_ARGUMENT) - class MyRequestHandler(RequestHandler): - def handle_request(self, message: UMessage) -> UPayload: - raise Exception("Neelam it failed!") + async def test_handle_requests_exception(self): + transport = EchoUTransport() + exception = UStatusError.from_code_message(UCode.FAILED_PRECONDITION, "Not permitted") - handler = MyRequestHandler() + self.mock_handler.handle_request.side_effect = MagicMock(side_effect=exception) + # Create instance of InMemoryRpcServer with mocked dependencies server = InMemoryRpcServer(transport) method = self.create_method_uri() - self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK) - + self.assertEqual((await server.register_request_handler(method, self.mock_handler)).code, UCode.OK) request = UMessageBuilder.request(transport.get_source(), method, 1000).build() - self.assertEqual((await transport.send(request)).code, UCode.OK) + + # fake sending a request message that will trigger the handler to be called + status = await transport.send(request) + self.assertEqual(status.code, UCode.OK) if __name__ == '__main__': diff --git a/tests/test_communication/test_inmemorysubscriber.py b/tests/test_communication/test_inmemorysubscriber.py index 6907777..edbe1b4 100644 --- a/tests/test_communication/test_inmemorysubscriber.py +++ b/tests/test_communication/test_inmemorysubscriber.py @@ -12,23 +12,30 @@ SPDX-License-Identifier: Apache-2.0 """ +import asyncio import unittest +from unittest.mock import AsyncMock, MagicMock -from tests.test_communication.mock_utransport import CommStatusTransport, MockUTransport, TimeoutUTransport from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.inmemorysubscriber import InMemorySubscriber +from uprotocol.communication.simplenotifier import SimpleNotifier +from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler from uprotocol.communication.upayload import UPayload +from uprotocol.communication.ustatuserror import UStatusError from uprotocol.core.usubscription.v3.usubscription_pb2 import ( SubscriptionResponse, SubscriptionStatus, UnsubscribeResponse, + Update, ) from uprotocol.transport.builder.umessagebuilder import UMessageBuilder from uprotocol.transport.ulistener import UListener +from uprotocol.transport.utransport import UTransport from uprotocol.v1.ucode_pb2 import UCode from uprotocol.v1.umessage_pb2 import UMessage from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus class MyListener(UListener): @@ -37,125 +44,359 @@ def on_receive(self, umsg: UMessage) -> None: class TestInMemorySubscriber(unittest.IsolatedAsyncioTestCase): - @classmethod - def setUpClass(cls): - cls.listener = MyListener() + def setUp(self): + self.transport = MagicMock(spec=UTransport) + self.rpc_client = MagicMock(spec=InMemoryRpcClient) + self.notifier = MagicMock(spec=SimpleNotifier) - def create_topic(self): - return UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=0x8000) + self.topic = UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=0x8000) + self.source = UUri(authority_name="source_auth", ue_id=4, ue_version_major=1) - async def test_subscribe_happy_path(self): - topic = self.create_topic() - transport = HappySubscribeUTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + self.listener = MyListener() - subscription_response = await subscriber.subscribe(topic, self.listener, None) - self.assertFalse(subscription_response is None) + async def test_simple_mock_of_rpc_client_and_notifier(self): + response = SubscriptionResponse( + topic=self.topic, status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBED) + ) - async def test_unsubscribe_happy_path(self): - topic = self.create_topic() - transport = HappyUnSubscribeUTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + self.transport.get_source.return_value = self.source + self.transport.register_listener.return_value = UStatus(code=UCode.OK) - response = await subscriber.unsubscribe(topic, self.listener, None) - self.assertEqual(response.message, "") - self.assertEqual(response.code, UCode.OK) + self.rpc_client.invoke_method.return_value = UPayload.pack(response) + + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + result = await subscriber.subscribe(self.topic, self.listener) + self.assertEqual(result.status.state, SubscriptionStatus.State.SUBSCRIBED) + + self.rpc_client.invoke_method.assert_called_once() + self.notifier.register_notification_listener.assert_called_once() + self.transport.register_listener.assert_called_once() + self.transport.get_source.assert_called_once() + + async def test_simple_mock_of_rpc_client_and_notifier_returned_subscribe_pending(self): + response = SubscriptionResponse( + topic=self.topic, status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBE_PENDING) + ) + + self.transport.get_source.return_value = self.source + self.transport.register_listener.return_value = UStatus(code=UCode.OK) + + self.rpc_client.invoke_method.return_value = UPayload.pack(response) + + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + result = await subscriber.subscribe(self.topic, self.listener) + self.assertEqual(result.status.state, SubscriptionStatus.State.SUBSCRIBE_PENDING) + + self.rpc_client.invoke_method.assert_called_once() + self.notifier.register_notification_listener.assert_called_once() + self.transport.register_listener.assert_called_once() + self.transport.get_source.assert_called_once() + + async def test_simple_mock_of_rpc_client_and_notifier_returned_unsubscribed(self): + response = SubscriptionResponse( + topic=self.topic, status=SubscriptionStatus(state=SubscriptionStatus.State.UNSUBSCRIBED) + ) + + self.transport.get_source.return_value = self.source + self.transport.register_listener.return_value = UStatus(code=UCode.OK) + + self.rpc_client.invoke_method.return_value = UPayload.pack(response) + + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + result = await subscriber.subscribe(self.topic, self.listener) + self.assertEqual(result.status.state, SubscriptionStatus.State.UNSUBSCRIBED) + + self.rpc_client.invoke_method.assert_called_once() + self.notifier.register_notification_listener.assert_called_once() + self.transport.register_listener.assert_not_called() + self.transport.get_source.assert_called_once() + + async def test_subscribe_using_mock_rpc_client_and_simplernotifier_when_invokemethod_return_an_exception(self): + self.transport.get_source.return_value = self.source + + exception = Exception("Dummy exception") + self.rpc_client.invoke_method.return_value = exception + + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + with self.assertRaises(Exception) as context: + await subscriber.subscribe(self.topic, self.listener) + self.assertEqual("Dummy exception", str(context.exception)) + + self.rpc_client.invoke_method.assert_called_once() + self.notifier.register_notification_listener.assert_called_once() + self.transport.register_listener.assert_not_called() + self.transport.get_source.assert_called_once() + + async def test_subscribe_using_mock_rpc_client_and_simplernotifier_when_invokemethod_return_an_ustatuserror(self): + self.transport.get_source.return_value = self.source + + exception = UStatusError.from_code_message(UCode.FAILED_PRECONDITION, "Not permitted") + self.rpc_client.invoke_method.return_value = exception + + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + with self.assertRaises(UStatusError) as context: + await subscriber.subscribe(self.topic, self.listener) + self.assertEqual("Not permitted", context.exception.status.message) + self.assertEqual(UCode.FAILED_PRECONDITION, context.exception.status.code) - async def test_unregister_listener(self): - topic = self.create_topic() + self.rpc_client.invoke_method.assert_called_once() + self.notifier.register_notification_listener.assert_called_once() + self.transport.register_listener.assert_not_called() + self.transport.get_source.assert_called_once() - transport = HappySubscribeUTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + async def test_subscribe_when_we_pass_a_subscription_change_notification_handler(self): + self.transport.get_source.return_value = self.source + self.transport.register_listener.return_value = UStatus(code=UCode.OK) - subscription_response = await subscriber.subscribe(topic, self.listener, CallOptions()) - self.assertFalse(subscription_response is None) + self.rpc_client.invoke_method.return_value = UPayload.pack( + SubscriptionResponse(topic=self.topic, status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBED)) + ) + + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + + result = await subscriber.subscribe(self.topic, self.listener, CallOptions.DEFAULT, handler) + + self.assertEqual(result.status.state, SubscriptionStatus.State.SUBSCRIBED) + + self.rpc_client.invoke_method.assert_called_once() + self.notifier.register_notification_listener.assert_called_once() + self.transport.register_listener.assert_called_once() + self.transport.get_source.assert_called_once() + + async def test_subscribe_when_we_try_to_subscribe_to_the_same_topic_twice_with_same_notification_handler(self): + self.transport.get_source.return_value = self.source + + self.rpc_client.invoke_method.return_value = UPayload.pack( + SubscriptionResponse(topic=self.topic, status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBED)) + ) + + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + # First subscription attempt + result = await subscriber.subscribe(self.topic, self.listener, CallOptions.DEFAULT, handler) + self.assertEqual(result.status.state, SubscriptionStatus.State.SUBSCRIBED) + + # Second subscription attempt + result = await subscriber.subscribe(self.topic, self.listener, CallOptions.DEFAULT, handler) + self.assertEqual(result.status.state, SubscriptionStatus.State.SUBSCRIBED) + + self.assertEqual(self.rpc_client.invoke_method.call_count, 2) + self.notifier.register_notification_listener.assert_called_once() + self.assertEqual(self.transport.get_source.call_count, 2) + + async def test_subscribe_when_we_try_to_subscribe_to_the_same_topic_twice_with_different_notification_handler(self): + self.transport.get_source.return_value = self.source - status = await subscriber.unregister_listener(topic, self.listener) - self.assertEqual(status.code, UCode.OK) + self.rpc_client.invoke_method.return_value = UPayload.pack( + SubscriptionResponse(topic=self.topic, status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBED)) + ) + + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + handler = MagicMock(spec=SubscriptionChangeHandler) + + handler1 = MagicMock(spec=SubscriptionChangeHandler) - async def test_unsubscribe_with_commstatus_error(self): - topic = UUri(authority_name="neelam", ue_id=4, ue_version_major=1, resource_id=0x8000) - transport = CommStatusTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + # First subscription attempt + result = await subscriber.subscribe(self.topic, self.listener, CallOptions.DEFAULT, handler) + self.assertEqual(result.status.state, SubscriptionStatus.State.SUBSCRIBED) + # Second subscription attempt should raise an exception + with self.assertRaises(UStatusError) as context: + await subscriber.subscribe(self.topic, self.listener, CallOptions.DEFAULT, handler1) + self.assertEqual("Handler already registered", context.exception.status.message) + self.assertEqual(UCode.ALREADY_EXISTS, context.exception.status.code) - response = await subscriber.unsubscribe(topic, self.listener, None) - self.assertEqual(response.message, "Communication error [FAILED_PRECONDITION]") - self.assertEqual(response.code, UCode.FAILED_PRECONDITION) + self.assertEqual(2, self.rpc_client.invoke_method.call_count) + self.notifier.register_notification_listener.assert_called_once() + self.assertEqual(self.transport.get_source.call_count, 2) - async def test_unsubscribe_with_exception(self): - topic = self.create_topic() - transport = TimeoutUTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + async def test_unsubscribe_using_mock_rpcclient_and_simplernotifier(self): + self.transport.register_listener.return_value = UStatus(code=UCode.OK) + self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) + self.rpc_client.invoke_method.return_value = UPayload.pack(UnsubscribeResponse()) + + self.notifier.unregister_notification_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + response = await subscriber.unsubscribe(self.topic, self.listener) + self.assertEqual(response.message, "") + self.assertEqual(response.code, UCode.OK) + subscriber.close() + self.rpc_client.invoke_method.assert_called_once() + self.notifier.unregister_notification_listener.assert_called_once() + self.transport.unregister_listener.assert_called_once() + + async def test_unsubscribe_when_invokemethod_return_an_exception(self): + self.transport.register_listener.return_value = UStatus(code=UCode.OK) + self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) + self.rpc_client.invoke_method.return_value = UStatusError.from_code_message( + UCode.CANCELLED, "Operation cancelled" + ) + self.notifier.unregister_notification_listener.return_value = UStatus(code=UCode.OK) + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + response = await subscriber.unsubscribe(self.topic, self.listener) + self.assertEqual(response.message, "Operation cancelled") + self.assertEqual(response.code, UCode.CANCELLED) + subscriber.close() + self.rpc_client.invoke_method.assert_called_once() + self.notifier.unregister_notification_listener.assert_called_once() + self.transport.unregister_listener.assert_not_called() + + async def test_unsubscribe_when_invokemethod_returned_ok_but_we_failed_to_unregister_the_listener(self): + self.transport.register_listener.return_value = UStatus(code=UCode.OK) + self.transport.unregister_listener.return_value = UStatusError.from_code_message(UCode.ABORTED, "aborted") + self.rpc_client.invoke_method.return_value = UPayload.pack( + SubscriptionResponse(status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBE_PENDING)) + ) + self.notifier.unregister_notification_listener.return_value = UStatus(code=UCode.OK) + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + response = await subscriber.unsubscribe(self.topic, self.listener) + self.assertEqual(response.status.code, UCode.ABORTED) + self.assertEqual(response.status.message, "aborted") + subscriber.close() + self.rpc_client.invoke_method.assert_called_once() + self.notifier.unregister_notification_listener.assert_called_once() + self.transport.unregister_listener.assert_called_once() + + async def test_handling_going_from_subscribe_pending_to_subscribed_state(self): + barrier = asyncio.Event() + self.transport.get_source.return_value = self.source + self.transport.register_listener.return_value = UStatus(code=UCode.OK) + self.rpc_client.invoke_method.return_value = UPayload.pack( + SubscriptionResponse(status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBE_PENDING)) + ) - response = await subscriber.unsubscribe(topic, self.listener, CallOptions(1)) - self.assertEqual(response.message, "Request timed out") - self.assertEqual(response.code, UCode.DEADLINE_EXCEEDED) + async def register_notification_listener(uri, listener): + barrier.set() # Release the barrier + await barrier.wait() # Wait for the barrier again + update = Update(topic=self.topic, status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBED)) + message = UMessageBuilder.notification(self.topic, self.source).build_from_upayload(UPayload.pack(update)) + listener.on_receive(message) + return UStatus(code=UCode.OK) + + self.notifier.register_notification_listener = AsyncMock(side_effect=register_notification_listener) + subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + result = await subscriber.subscribe(self.topic, self.listener, CallOptions.DEFAULT) + self.assertEqual(result.status.state, SubscriptionStatus.State.SUBSCRIBE_PENDING) + # Release the barrier + barrier.set() + # Wait for the barrier again to ensure notification handling + await barrier.wait() + + self.rpc_client.invoke_method.assert_called_once() + self.notifier.register_notification_listener.assert_called_once() + self.transport.register_listener.assert_called_once() + self.transport.get_source.assert_called_once() async def test_unregister_listener_missing_topic(self): - transport = TimeoutUTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unregister_listener(None, self.listener) self.assertEqual(str(context.exception), "Unsubscribe topic missing") async def test_unregister_listener_missing_listener(self): - topic = self.create_topic() - transport = TimeoutUTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: - await subscriber.unregister_listener(topic, None) + await subscriber.unregister_listener(self.topic, None) self.assertEqual(str(context.exception), "Request listener missing") async def test_unsubscribe_missing_topic(self): - transport = TimeoutUTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unsubscribe(None, self.listener, CallOptions()) self.assertEqual(str(context.exception), "Unsubscribe topic missing") async def test_unsubscribe_missing_listener(self): - topic = self.create_topic() - transport = TimeoutUTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: - await subscriber.unsubscribe(topic, None, CallOptions()) + await subscriber.unsubscribe(self.topic, None, CallOptions()) self.assertEqual(str(context.exception), "Listener missing") async def test_subscribe_missing_topic(self): - transport = TimeoutUTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.subscribe(None, self.listener, CallOptions()) self.assertEqual(str(context.exception), "Subscribe topic missing") async def test_subscribe_missing_listener(self): - topic = self.create_topic() - transport = TimeoutUTransport() - subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: - await subscriber.subscribe(topic, None, CallOptions()) + await subscriber.subscribe(self.topic, None, CallOptions()) self.assertEqual(str(context.exception), "Request listener missing") + def test_subscriber_constructor_transport_none(self): + with self.assertRaises(ValueError) as context: + InMemorySubscriber(None, None, None) + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NULL_ERROR) -class HappySubscribeUTransport(MockUTransport): - def build_response(self, request): - return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( - UPayload.pack( - SubscriptionResponse( - status=SubscriptionStatus( - state=SubscriptionStatus.State.SUBSCRIBED, message="Successfully Subscribed" - ), - topic=TestInMemorySubscriber().create_topic(), - ) - ) - ) + def test_subscriber_constructor_transport_not_instance(self): + with self.assertRaises(ValueError) as context: + InMemorySubscriber("InvalidTransport", None, None) + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + def test_subscriber_constructor_rpcclient_none(self): + with self.assertRaises(ValueError) as context: + InMemorySubscriber(self.transport, None, None) + self.assertEqual(str(context.exception), "RpcClient missing") -class HappyUnSubscribeUTransport(MockUTransport): - def build_response(self, request): - return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( - UPayload.pack(UnsubscribeResponse()) - ) + def test_subscriber_constructor_notiifier_none(self): + with self.assertRaises(ValueError) as context: + InMemorySubscriber(self.transport, self.rpc_client, None) + self.assertEqual(str(context.exception), "Notifier missing") if __name__ == '__main__': diff --git a/tests/test_communication/test_rpcmapper.py b/tests/test_communication/test_rpcmapper.py index 12efddd..1202229 100644 --- a/tests/test_communication/test_rpcmapper.py +++ b/tests/test_communication/test_rpcmapper.py @@ -12,11 +12,8 @@ SPDX-License-Identifier: Apache-2.0 """ -import asyncio import unittest -import pytest - from tests.test_communication.mock_utransport import MockUTransport from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.rpcmapper import RpcMapper @@ -33,13 +30,13 @@ async def test_map_response(self): payload = UPayload.pack(uri) rpc_client = InMemoryRpcClient(MockUTransport()) - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), payload, None)) + future_result = rpc_client.invoke_method(self.create_method_uri(), payload) result = await RpcMapper.map_response(future_result, UUri) assert result == uri async def test_map_response_to_result_with_empty_request(self): rpc_client = InMemoryRpcClient(MockUTransport()) - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), None, None)) + future_result = rpc_client.invoke_method(self.create_method_uri(), None, None) result = await RpcMapper.map_response_to_result(future_result, UUri) assert result.is_success() assert result.success_value() == UUri() @@ -50,9 +47,8 @@ async def invoke_method(self, uri, payload, options): raise RuntimeError("Error") rpc_client = RpcClientWithException() - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), None, None)) - - with pytest.raises(RuntimeError): + future_result = rpc_client.invoke_method(self.create_method_uri(), None, None) + with self.assertRaises(RuntimeError): await RpcMapper.map_response(future_result, UUri) async def test_map_response_with_empty_payload(self): @@ -61,7 +57,7 @@ async def invoke_method(self, uri, payload, options): return UPayload.EMPTY rpc_client = RpcClientWithEmptyPayload() - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + future_result = rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None) result = await RpcMapper.map_response(future_result, UUri) assert result == UUri() @@ -71,11 +67,11 @@ async def invoke_method(self, uri, payload, options): return None rpc_client = RpcClientWithNullPayload() - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + future_result = rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None) - with pytest.raises(Exception) as exc_info: + with self.assertRaises(Exception) as exc_info: await RpcMapper.map_response(future_result, UUri) - assert str(exc_info.value) == f"Unknown payload. Expected [{UUri.__name__}]" + assert str(exc_info.exception) == f"Unknown payload. Expected [{UUri.__name__}]" async def test_map_response_to_result_with_non_empty_payload(self): uri = UUri(authority_name="Neelam") @@ -86,7 +82,7 @@ async def invoke_method(self, uri, payload, options): return payload rpc_client = RpcClientWithNonEmptyPayload() - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), payload, None)) + future_result = rpc_client.invoke_method(self.create_method_uri(), payload, None) result = await RpcMapper.map_response_to_result(future_result, UUri) assert result.is_success() assert result.success_value() == uri @@ -97,7 +93,7 @@ async def invoke_method(self, uri, payload, options): return None rpc_client = RpcClientWithNullPayload() - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + future_result = rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None) result = await RpcMapper.map_response_to_result(future_result, UUri) assert result.is_failure() @@ -107,7 +103,7 @@ async def invoke_method(self, uri, payload, options): return UPayload.EMPTY rpc_client = RpcClientWithEmptyPayload() - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + future_result = rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None) result = await RpcMapper.map_response_to_result(future_result, UUri) assert result.is_success() assert result.success_value() == UUri() @@ -119,7 +115,7 @@ async def invoke_method(self, uri, payload, options): raise UStatusError(status) rpc_client = RpcClientWithException() - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + future_result = rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None) result = await RpcMapper.map_response_to_result(future_result, UUri) assert result.is_failure() assert result.failure_value().code == UCode.FAILED_PRECONDITION @@ -128,10 +124,10 @@ async def invoke_method(self, uri, payload, options): async def test_map_response_to_result_with_timeout_exception(self): class RpcClientWithTimeoutException: async def invoke_method(self, uri, payload, options): - raise asyncio.TimeoutError() + raise UStatusError.from_code_message(code=UCode.DEADLINE_EXCEEDED, message="Request timed out") rpc_client = RpcClientWithTimeoutException() - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + future_result = rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None) result = await RpcMapper.map_response_to_result(future_result, UUri) assert result.is_failure() assert result.failure_value().code == UCode.DEADLINE_EXCEEDED @@ -143,7 +139,7 @@ async def invoke_method(self, uri, payload, options): raise ValueError() rpc_client = RpcClientWithInvalidArgumentsException() - future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + future_result = rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None) result = await RpcMapper.map_response_to_result(future_result, UUri) assert result.is_failure() assert result.failure_value().code == UCode.INVALID_ARGUMENT diff --git a/tests/test_communication/test_rpcresult.py b/tests/test_communication/test_rpcresult.py index d182faf..9030aae 100644 --- a/tests/test_communication/test_rpcresult.py +++ b/tests/test_communication/test_rpcresult.py @@ -16,6 +16,7 @@ from uprotocol.communication.rpcresult import RpcResult from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.ustatus_pb2 import UStatus class TestRpcResult(unittest.TestCase): @@ -34,6 +35,7 @@ def test_is_failure_on_success(self): def test_is_failure_on_failure(self): result = RpcResult.failure(code=UCode.INVALID_ARGUMENT, message="boom") self.assertTrue(result.is_failure()) + self.assertEqual(str(result), 'Failure(code: INVALID_ARGUMENT\nmessage: "boom"\n)') def test_to_string_success(self): result = RpcResult.success(2) @@ -45,6 +47,24 @@ def test_to_string_failure(self): self.assertEqual(result.failure_value().code, UCode.INVALID_ARGUMENT) self.assertEqual(result.failure_value().message, "boom") + def test_success_value_onsuccess(self): + result = RpcResult.success(2) + assert result.success_value() == 2 + + def test_success_value_onfailure(self): + result = RpcResult.failure(code=UCode.INVALID_ARGUMENT, message="boom") + with self.assertRaises(ValueError): + result.success_value() + + def test_failure_value_onsuccess(self): + result = RpcResult.success(2) + with self.assertRaises(ValueError): + result.failure_value() + + def test_failure_value_onfailure(self): + result = RpcResult.failure(code=UCode.INVALID_ARGUMENT, message="boom") + assert result.failure_value() == UStatus(code=UCode.INVALID_ARGUMENT, message="boom") + if __name__ == '__main__': unittest.main() diff --git a/tests/test_communication/test_simplenotifier.py b/tests/test_communication/test_simplenotifier.py index d9043d9..12483cd 100644 --- a/tests/test_communication/test_simplenotifier.py +++ b/tests/test_communication/test_simplenotifier.py @@ -13,66 +13,117 @@ """ import unittest +from unittest.mock import MagicMock -from tests.test_communication.mock_utransport import MockUTransport +from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.simplenotifier import SimpleNotifier from uprotocol.communication.upayload import UPayload from uprotocol.transport.ulistener import UListener +from uprotocol.transport.utransport import UTransport from uprotocol.v1.ucode_pb2 import UCode from uprotocol.v1.umessage_pb2 import UMessage from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus class TestSimpleNotifier(unittest.IsolatedAsyncioTestCase): - def create_topic(self): - return UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=0x8000) - - def create_destination_uri(self): - return UUri(ue_id=4, ue_version_major=1) + def setUp(self): + self.transport = MagicMock(spec=UTransport) + self.source = UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=0x8000) + self.sink = UUri(ue_id=4, ue_version_major=1) async def test_send_notification(self): - notifier = SimpleNotifier(MockUTransport()) - status = await notifier.notify(self.create_topic(), self.create_destination_uri()) + self.transport.send.return_value = UStatus(code=UCode.OK) + notifier = SimpleNotifier(self.transport) + status = await notifier.notify(self.source, self.sink) self.assertEqual(status.code, UCode.OK) + self.transport.send.assert_called_once() async def test_send_notification_with_payload(self): + self.transport.send.return_value = UStatus(code=UCode.OK) + uri = UUri(authority_name="Neelam") - notifier = SimpleNotifier(MockUTransport()) - status = await notifier.notify(self.create_topic(), self.create_destination_uri(), payload=UPayload.pack(uri)) + notifier = SimpleNotifier(self.transport) + status = await notifier.notify(self.source, self.sink, payload=UPayload.pack(uri)) self.assertEqual(status.code, UCode.OK) + self.transport.send.assert_called_once() + + async def test_send_notification_with_payload_and_calloptions(self): + self.transport.send.return_value = UStatus(code=UCode.OK) + + uri = UUri(authority_name="Neelam") + notifier = SimpleNotifier(self.transport) + status = await notifier.notify(self.source, self.sink, CallOptions.DEFAULT, UPayload.pack(uri)) + self.assertEqual(status.code, UCode.OK) + self.transport.send.assert_called_once() async def test_register_listener(self): + self.transport.register_listener.return_value = UStatus(code=UCode.OK) + self.transport.get_source.return_value = self.source + class TestListener(UListener): def on_receive(self, message: UMessage): pass listener = TestListener() - notifier = SimpleNotifier(MockUTransport()) - status = await notifier.register_notification_listener(self.create_topic(), listener) + notifier = SimpleNotifier(self.transport) + status = await notifier.register_notification_listener(self.source, listener) self.assertEqual(status.code, UCode.OK) + self.transport.register_listener.assert_called_once() + self.transport.get_source.assert_called_once() async def test_unregister_notification_listener(self): + self.transport.register_listener.return_value = UStatus(code=UCode.OK) + self.transport.get_source.return_value = self.source + self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) + class TestListener(UListener): def on_receive(self, message: UMessage): pass listener = TestListener() - notifier = SimpleNotifier(MockUTransport()) - status = await notifier.register_notification_listener(self.create_topic(), listener) + notifier = SimpleNotifier(self.transport) + status = await notifier.register_notification_listener(self.source, listener) self.assertEqual(status.code, UCode.OK) - status = await notifier.unregister_notification_listener(self.create_topic(), listener) + status = await notifier.unregister_notification_listener(self.source, listener) self.assertEqual(status.code, UCode.OK) + self.transport.register_listener.assert_called_once() + self.assertEqual(self.transport.get_source.call_count, 2) + self.transport.unregister_listener.assert_called_once() async def test_unregister_listener_not_registered(self): + self.transport.get_source.return_value = self.source + self.transport.unregister_listener.return_value = UStatus(code=UCode.NOT_FOUND) + class TestListener(UListener): def on_receive(self, message: UMessage): pass listener = TestListener() - notifier = SimpleNotifier(MockUTransport()) - status = await notifier.unregister_notification_listener(self.create_topic(), listener) - self.assertEqual(status.code, UCode.INVALID_ARGUMENT) + notifier = SimpleNotifier(self.transport) + status = await notifier.unregister_notification_listener(self.source, listener) + self.assertEqual(status.code, UCode.NOT_FOUND) + self.transport.register_listener.assert_not_called() + self.transport.get_source.assert_called_once() + self.transport.unregister_listener.assert_called_once() + + def test_simplenotifier_constructor_transport_none(self): + with self.assertRaises(ValueError) as context: + SimpleNotifier(None) + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NULL_ERROR) + + def test_simplenotifier_constructor_transport_not_instance(self): + with self.assertRaises(ValueError) as context: + SimpleNotifier("InvalidTransport") + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + + async def test_send_notification_with_options(self): + self.transport.send.return_value = UStatus(code=UCode.OK) + self.notifier = SimpleNotifier(self.transport) + result = await self.notifier.notify(self.source, self.sink, CallOptions.DEFAULT) + + self.assertEqual(result.code, UCode.OK) if __name__ == '__main__': diff --git a/tests/test_communication/test_simplepublisher.py b/tests/test_communication/test_simplepublisher.py index 863e194..f1039b8 100644 --- a/tests/test_communication/test_simplepublisher.py +++ b/tests/test_communication/test_simplepublisher.py @@ -13,29 +13,35 @@ """ import unittest +from unittest.mock import MagicMock -from tests.test_communication.mock_utransport import MockUTransport from uprotocol.communication.simplepublisher import SimplePublisher from uprotocol.communication.upayload import UPayload from uprotocol.transport.utransport import UTransport from uprotocol.v1.ucode_pb2 import UCode from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus class TestSimplePublisher(unittest.IsolatedAsyncioTestCase): - def create_topic(self): - return UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=0x8000) + def setUp(self): + self.transport = MagicMock(spec=UTransport) + self.topic = UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=2) async def test_send_publish(self): - publisher = SimplePublisher(MockUTransport()) - status = await publisher.publish(self.create_topic()) + self.transport.send.return_value = UStatus(code=UCode.OK) + publisher = SimplePublisher(self.transport) + status = await publisher.publish(self.topic) self.assertEqual(status.code, UCode.OK) + self.transport.send.assert_called_once() async def test_send_publish_with_stuffed_payload(self): + self.transport.send.return_value = UStatus(code=UCode.OK) uri = UUri(authority_name="Neelam") - publisher = SimplePublisher(MockUTransport()) - status = await publisher.publish(self.create_topic(), payload=UPayload.pack_to_any(uri)) + publisher = SimplePublisher(self.transport) + status = await publisher.publish(self.topic, payload=UPayload.pack_to_any(uri)) self.assertEqual(status.code, UCode.OK) + self.transport.send.assert_called_once() def test_constructor_transport_none(self): with self.assertRaises(ValueError) as context: @@ -48,9 +54,8 @@ def test_constructor_transport_not_instance(self): self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) async def test_publish_topic_none(self): - publisher = SimplePublisher(MockUTransport()) + publisher = SimplePublisher(self.transport) uri = UUri(authority_name="Neelam") - with self.assertRaises(ValueError) as context: await publisher.publish(None, payload=UPayload.pack_to_any(uri)) self.assertEqual(str(context.exception), "Publish topic missing") diff --git a/tests/test_communication/test_subscriber.py b/tests/test_communication/test_subscriber.py deleted file mode 100644 index 4d92c59..0000000 --- a/tests/test_communication/test_subscriber.py +++ /dev/null @@ -1,111 +0,0 @@ -""" -SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation - -See the NOTICE file(s) distributed with this work for additional -information regarding copyright ownership. - -This program and the accompanying materials are made available under the -terms of the Apache License Version 2.0 which is available at - - http://www.apache.org/licenses/LICENSE-2.0 - -SPDX-License-Identifier: Apache-2.0 -""" - -import asyncio -import unittest -from unittest.mock import MagicMock - -from tests.test_communication.mock_utransport import MockUTransport -from uprotocol.communication.calloptions import CallOptions -from uprotocol.communication.uclient import UClient -from uprotocol.communication.upayload import UPayload -from uprotocol.core.usubscription.v3.usubscription_pb2 import ( - SubscriptionResponse, - SubscriptionStatus, - UnsubscribeResponse, -) -from uprotocol.transport.builder.umessagebuilder import UMessageBuilder -from uprotocol.transport.ulistener import UListener -from uprotocol.v1.ucode_pb2 import UCode -from uprotocol.v1.umessage_pb2 import UMessage -from uprotocol.v1.uri_pb2 import UUri - - -class MyListener(UListener): - def on_receive(self, umsg: UMessage) -> None: - # Handle receiving subscriptions here - assert umsg is not None - - -class TestSubscriber(unittest.IsolatedAsyncioTestCase): - @classmethod - def setUpClass(cls): - cls.listener = MyListener() - - async def test_subscribe(self): - topic = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) - transport = HappySubscribeUTransport() - upclient = UClient(transport) - subscription_response = await upclient.subscribe(topic, self.listener, CallOptions(timeout=5000)) - # check for successfully subscribed - self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) - - async def test_publish_notify_subscribe_listener(self): - topic = UUri(ue_id=5, ue_version_major=1, resource_id=0x8000) - transport = HappySubscribeUTransport() - upclient = UClient(transport) - subscription_response = await upclient.subscribe(topic, self.listener, CallOptions(timeout=5000)) - self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) - - # Create a mock for MyListener's on_receive method - self.listener.on_receive = MagicMock(side_effect=self.listener.on_receive) - status = await upclient.publish(topic, None) - self.assertEqual(status.code, UCode.OK) - # Wait for a short time to ensure on_receive can be called - await asyncio.sleep(1) - # Verify that on_receive was called - self.listener.on_receive.assert_called_once() - - async def test_unsubscribe(self): - topic = UUri(ue_id=6, ue_version_major=1, resource_id=0x8000) - transport = HappyUnSubscribeUTransport() - upclient = UClient(transport) - status = await upclient.unsubscribe(topic, self.listener, None) - # check for successfully unsubscribed - self.assertEqual(status.code, UCode.OK) - - async def test_subscribe_unsubscribe(self): - transport = HappySubscribeUTransport() - upclient = UClient(transport) - topic = UUri(ue_id=7, ue_version_major=1, resource_id=0x8000) - subscription_response = await upclient.subscribe(topic, self.listener, None) - self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) - - status2 = await upclient.unsubscribe(topic, self.listener, None) - # check for successfully unsubscribed - self.assertEqual(status2.code, UCode.OK) - - -class HappySubscribeUTransport(MockUTransport): - def build_response(self, request): - return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( - UPayload.pack( - SubscriptionResponse( - status=SubscriptionStatus( - state=SubscriptionStatus.State.SUBSCRIBED, message="Successfully Subscribed" - ) - ) - ) - ) - - -class HappyUnSubscribeUTransport(MockUTransport): - def build_response(self, request): - return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( - UPayload.pack(UnsubscribeResponse()) - ) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_communication/test_uclient.py b/tests/test_communication/test_uclient.py index cce62aa..d179e4f 100644 --- a/tests/test_communication/test_uclient.py +++ b/tests/test_communication/test_uclient.py @@ -41,10 +41,19 @@ def on_receive(self, umsg: UMessage) -> None: assert umsg is not None -class UPClientTest(unittest.IsolatedAsyncioTestCase): - @classmethod - def setUpClass(cls): - cls.listener = MyListener() +async def register_and_unregister_request_handler(client, handler): + await client.register_request_handler(create_method_uri(), handler) + await client.unregister_request_handler(create_method_uri(), handler) + + +async def unregister_listener_not_registered(client, listener): + result = await client.unregister_listener(create_topic(), listener) + assert result.code == UCode.NOT_FOUND + + +class UClientTest(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.listener = MyListener() def test_create_upclient_with_null_transport(self): with self.assertRaises(ValueError): @@ -84,7 +93,7 @@ async def test_unregister_listener_not_registered(self): listener.on_receive = MagicMock() status = await UClient(MockUTransport()).unregister_notification_listener(create_topic(), listener) - self.assertEqual(status.code, UCode.INVALID_ARGUMENT) + self.assertEqual(status.code, UCode.NOT_FOUND) async def test_send_publish(self): status = await UClient(MockUTransport()).publish(create_topic()) @@ -158,14 +167,6 @@ async def test_subscribe_happy_path(self): # check for successfully subscribed self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) - async def test_unsubscribe(self): - topic = UUri(ue_id=6, ue_version_major=1, resource_id=0x8000) - transport = HappyUnSubscribeUTransport() - upclient = UClient(transport) - status = await upclient.unsubscribe(topic, self.listener, None) - # check for successfully unsubscribed - self.assertEqual(status.code, UCode.OK) - async def test_unregister_listener(self): topic = create_topic() my_listener = create_autospec(UListener, instance=True) @@ -204,6 +205,39 @@ async def test_request_handler_for_notification(self): await client.register_request_handler(create_method_uri(), handler) self.assertEqual(await client.notify(create_topic(), transport.get_source()), UStatus(code=UCode.OK)) + async def test_happy_path_for_all_apis_async(self): + client = UClient(MockUTransport()) + + class MyUListener(UListener): + def on_receive(self, umsg: UMessage) -> None: + pass + + class MyRequestHandler(RequestHandler): + def handle_request(self, message: UMessage) -> UPayload: + pass + + async def run_tests(): + listener = MyUListener() + handler = MyRequestHandler() + listener.on_receive = MagicMock() + + tasks = [ + client.notify(create_topic(), create_destination_uri()), + client.publish(create_topic()), + client.invoke_method(create_method_uri(), UPayload.pack(None), CallOptions.DEFAULT), + client.subscribe(create_topic(), listener), + client.unsubscribe(create_topic(), listener), + unregister_listener_not_registered(client, listener), + client.register_notification_listener(create_topic(), listener), + client.unregister_notification_listener(create_topic(), listener), + register_and_unregister_request_handler(client, handler), + ] + + await asyncio.gather(*tasks) + client.close() + + await run_tests() + def create_topic(): return UUri(authority_name="neelam", ue_id=4, ue_version_major=1, resource_id=0x8000) diff --git a/tests/test_uri/test_validator/test_urivalidator.py b/tests/test_uri/test_validator/test_urivalidator.py index 7e96294..1ee5624 100644 --- a/tests/test_uri/test_validator/test_urivalidator.py +++ b/tests/test_uri/test_validator/test_urivalidator.py @@ -127,6 +127,36 @@ def test_matches_fail_for_different_resource(self): candidate = UriSerializer.deserialize("//authority/A410/3/1003") self.assertFalse(UriValidator.matches(pattern, candidate)) + def test_has_wildcard_for_null_uuri(self): + self.assertFalse(UriValidator.has_wildcard(None)) + + def test_has_wildcard_for_empty_uuri(self): + self.assertFalse(UriValidator.has_wildcard(UUri())) + + def test_has_wildcard_for_uuri_with_wildcard_authority(self): + uri = UriSerializer.deserialize("//*/A410/3/1003") + self.assertTrue(UriValidator.has_wildcard(uri)) + + def test_has_wildcard_for_uuri_with_wildcard_entity_id(self): + uri = UriSerializer.deserialize("//authority/FFFF/3/1003") + self.assertTrue(UriValidator.has_wildcard(uri)) + + def test_has_wildcard_for_uuri_with_wildcard_entity_instance(self): + uri = UriSerializer.deserialize("//authority/1FFFF/3/1003") + self.assertTrue(UriValidator.has_wildcard(uri)) + + def test_has_wildcard_for_uuri_with_wildcard_version(self): + uri = UriSerializer.deserialize("//authority/A410/FF/1003") + self.assertTrue(UriValidator.has_wildcard(uri)) + + def test_has_wildcard_for_uuri_with_wildcard_resource(self): + uri = UriSerializer.deserialize("//authority/A410/3/FFFF") + self.assertTrue(UriValidator.has_wildcard(uri)) + + def test_has_wildcard_for_uuri_with_no_wildcards(self): + uri = UriSerializer.deserialize("//authority/A410/3/1003") + self.assertFalse(UriValidator.has_wildcard(uri)) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_uuid/test_factory/test_uuidfactory.py b/tests/test_uuid/test_factory/test_uuidfactory.py index 9dd5cfa..94a255e 100644 --- a/tests/test_uuid/test_factory/test_uuidfactory.py +++ b/tests/test_uuid/test_factory/test_uuidfactory.py @@ -163,6 +163,8 @@ def test_uuidutils_fromstring_with_invalid_string(self): self.assertEqual(uuid, UUID()) uuid1 = UuidSerializer.deserialize("") self.assertEqual(uuid1, UUID()) + uuid2 = UuidSerializer.deserialize("jshkh") + self.assertEqual(uuid2, UUID()) def test_create_uprotocol_uuid_in_the_past(self): now = datetime.now() diff --git a/tests/test_validation/test_validationresult.py b/tests/test_validation/test_validationresult.py new file mode 100644 index 0000000..d867541 --- /dev/null +++ b/tests/test_validation/test_validationresult.py @@ -0,0 +1,54 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.ustatus_pb2 import UStatus +from uprotocol.validation.validationresult import ValidationResult + + +class TestValidationResultTest(unittest.TestCase): + def test_success_validation_result_to_string(self): + success = ValidationResult.success() + self.assertEqual("ValidationResult.Success()", str(success)) + + def test_failure_validation_result_to_string(self): + failure = ValidationResult.failure("boom") + self.assertEqual("ValidationResult.Failure(message='boom')", str(failure)) + + def test_success_validation_result_is_success(self): + success = ValidationResult.success() + self.assertTrue(success.is_success()) + + def test_failure_validation_result_is_success(self): + failure = ValidationResult.failure("boom") + self.assertFalse(failure.is_success()) + + def test_success_validation_result_get_message(self): + success = ValidationResult.success() + self.assertTrue(success.get_message() == '') + + def test_failure_validation_result_get_message(self): + failure = ValidationResult.failure("boom") + self.assertEqual("boom", failure.get_message()) + + def test_success_validation_result_to_status(self): + success = ValidationResult.success() + self.assertEqual(ValidationResult.STATUS_SUCCESS, success.to_status()) + + def test_failure_validation_result_to_status(self): + failure = ValidationResult.failure("boom") + status = UStatus(code=UCode.INVALID_ARGUMENT, message="boom") + self.assertEqual(status, failure.to_status()) diff --git a/uprotocol/communication/README.adoc b/uprotocol/communication/README.adoc new file mode 100644 index 0000000..a5c9e93 --- /dev/null +++ b/uprotocol/communication/README.adoc @@ -0,0 +1,157 @@ += uProtocol Communication Layer +:toc: +:sectnums: + + +== Overview + +The following folder contains implementations of the L2 Communication Layer APIs, as per the https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l2/api.adoc[uProtocol L2 API Specifications]. + + + +## uP-L2 Interfaces + +.Interfaces (uP-L2 Interface) +[cols="1,1,3",options="header"] +|=== +| Interface | Implementation(s) | Description + +| xref:publisher.py[*Publisher*] | xref:simplepublisher.py[SimplePublisher] | Producers API to send publish or notification messages +| xref:subscriber.py[*Subscriber*] | xref:inmemorysubscriber.py[InMemorySubscriber] | Consumers API to subscribe or unsubscribe to topics +| xref:rpcclient.py[*RpcClient*] | xref:inmemoryrpcclient.py[InMemoryRpcClient] | Client interface to invoke a method +| xref:rpcserver.py[*RpcServer*] | xref:inmemoryrpcserver.py[InMemoryRpcServer]| Server interface to register a listener for incoming RPC requests and automatically send a response +| xref:notifier.py[*Notifier*] | xref:simplenotifier.py[SimpleNotifier] | Notification communication pattern APIs to notify and register a listener to receive the notifications +| All the above | xref:uclient.py[UClient] | Single class that Implements all the interfaces above using the various implementations also from above +|=== + + +== Examples +The uP-L2 interfaces are designed to be used by uEs (applications and services) that rely on the communication patterns to talk with other uEs. The interfaces only require an implementation of uTransport passed by reference to the various APIs to perform the various communication patterns. + +NOTE: Examples below will be using the `UClient` implementation. + + +=== Publish a Message +[,python] +---- + +transport = # your UTransport instance +#topic to publish +topic : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) +publisher : Publisher = UClient(transport) +#send the publish message +publisher.publish(topic) +---- + +=== Subscribe and Unsubscribe +[,python] +---- +transport = # your UTransport instance + +#subscription topic +topic : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) + +#Listener to process incoming events on the topic +class MySubscriptionListener(UListener): + def on_receive(self, umsg: UMessage) -> None: + # Handle receiving published message + pass +listener = MySubscriptionListener() +# Optional handler that is called whenever the SubscriptionState changes for the subscriber +class MySubscriptionChangeHandler(SubscriptionChangeHandler) + def handle_subscription_change(self, topic: UUri, status: SubscriptionStatus) -> None: + # Handle subscription status changes if you're interested like when + # the subscription state changes from SUBSCRIBE_PENDING to SUBSCRIBED + pass + +subscriber : Subscriber = UClient(transport) +subscriber.subscribe(topic, listener, CallOptions.DEFAULT, MySubscriptionChangeHandler()) + +#UnSubscribe from the topic +subscriber.unsubscribe(topic, listener) +---- +=== Invoke a method using RPCClient +[,python] +---- +transport = # your UTransport instance + +#URI of the method to be invoked +method_uri= UUri(ue_id=10, ue_version_major=1, resource_id=3) +payload = UPayload.pack_to_any(UUri()) +options = CallOptions(2000, UPriority.UPRIORITY_CS5) + +rpc_client : RpcClient = UClient(transport) +#Returns the asyncio Future with the response payload or raises an exception +with the failure reason as UStatus +await rpc_client.invoke_method(method_uri, payload, options) + +---- + +=== Register and handle rpc request +[,python] +---- +transport = # your UTransport instance + +#URI of the method to be invoked +uri= UUri(ue_id=10, ue_version_major=1, resource_id=3) + +#Handler for processing requests +class MyRequestHandler(RequestHandler): + def handle_request(self, message: UMessage) -> UPayload: + # If your processing of the request was successful, you return the response message like + # return UPayload.EMPTY; + # If your processing of the request failed, you can raise a UStatusException passing the + # appropriate UCode and message such as: + # raise UStatusException(UCode.FAILED_PRECONDITION, "Failed to process the request") + # For this example, we will return an empty response + return UPayload.EMPTY + + +rpc_server: RpcServer = UClient(transport) +#Returns the asyncio Future with the response payload or raises an exception +#with the failure reason as UStatus +await rpc_server.register_request_handler(uri, MyRequestHandler()) + +---- + + +=== Send a notification +[,python] +---- +transport = # your UTransport instance + +#Notification topic +uri : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) + +#Destination for the notification +destination_uri : UUri = UUri( ue_id=3, ue_version_major=1) + + +notifier: Notifier = UClient(transport) +# Send the notification (without payload) +await notifier.notify(uri, destination_uri) + +---- + + +=== Registering to receive notifications +[,python] +---- +transport = # your UTransport instance + +#Notification topic +uri : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) + +#Listener to process incoming events on the topic +class MyListener(UListener): + def on_receive(self, umsg: UMessage) -> None: + # Handle receiving notifications here + pass +listener = MyListener() + + +notifier: Notifier = UClient(transport) +# Register listener to recieve notifications +await notifier.registerNotificationListener(uri, listener) + +---- \ No newline at end of file diff --git a/uprotocol/communication/inmemoryrpcclient.py b/uprotocol/communication/inmemoryrpcclient.py index e92b7af..b7720b8 100644 --- a/uprotocol/communication/inmemoryrpcclient.py +++ b/uprotocol/communication/inmemoryrpcclient.py @@ -110,7 +110,9 @@ async def invoke_method( """ if not self.is_listener_registered: # Ensure listener is registered before proceeding - status = await self.transport.register_listener(UriFactory.ANY, self.response_handler, None) + status = await self.transport.register_listener( + UriFactory.ANY, self.response_handler, self.transport.get_source() + ) if status.code != UCode.OK: raise UStatusError.from_code_message(status.code, "Failed to register listener for rpc client") self.is_listener_registered = True @@ -118,44 +120,43 @@ async def invoke_method( builder = UMessageBuilder.request(self.transport.get_source(), method_uri, options.timeout) request = None response_future = asyncio.Future() - try: - if options.token: - builder.with_token(options.token) - - request = builder.build_from_upayload(request_payload) - response_future.add_done_callback(lambda fut: self.cleanup_request(request.attributes.id)) + if options.token: + builder.with_token(options.token) - if UuidSerializer.serialize(request.attributes.id) in self.requests: - raise UStatusError.from_code_message(code=UCode.ALREADY_EXISTS, message="Duplicated request found") - self.requests[UuidSerializer.serialize(request.attributes.id)] = response_future + request = builder.build_from_upayload(request_payload) - async def wait_for_response(): - try: - response_message = await asyncio.wait_for(response_future, timeout=request.attributes.ttl / 1000) - return UPayload.pack_from_data_and_format( - response_message.payload, response_message.attributes.payload_format - ) - except asyncio.TimeoutError: - raise UStatusError.from_code_message(code=UCode.DEADLINE_EXCEEDED, message="Request timed out") - except UStatusError as e: - raise e - except Exception as e: - raise UStatusError.from_code_message(code=UCode.UNKNOWN, message=str(e)) + response_future.add_done_callback(lambda fut: self.cleanup_request(request.attributes.id)) - # Start the task for waiting for the response before sending the request - response_task = asyncio.create_task(wait_for_response()) + if UuidSerializer.serialize(request.attributes.id) in self.requests: + raise UStatusError.from_code_message(code=UCode.ALREADY_EXISTS, message="Duplicated request found") + self.requests[UuidSerializer.serialize(request.attributes.id)] = response_future + ttl = request.attributes.ttl / 1000 # Convert TTL from milliseconds to seconds + try: + # Start sending the request asynchronously status = await self.transport.send(request) - if status.code != UCode.OK: raise UStatusError(status) - # Wait for the response task to complete - return await response_task - except Exception as e: + # Wait for the response within the specified timeout + response_message = await asyncio.wait_for(response_future, timeout=ttl) + return UPayload.pack_from_data_and_format( + response_message.payload, response_message.attributes.payload_format + ) + + except asyncio.TimeoutError: + # If timeout occurs while waiting for response + raise UStatusError.from_code_message(code=UCode.DEADLINE_EXCEEDED, message="Request timed out") + + except UStatusError as e: + # Propagate UStatusError exceptions raise e + finally: + # Clean up request from self.requests + self.cleanup_request(request.attributes.id) + def close(self): """ Close the InMemoryRpcClient by clearing stored requests and unregistering the listener. diff --git a/uprotocol/communication/inmemoryrpcserver.py b/uprotocol/communication/inmemoryrpcserver.py index 8e43366..4e1c229 100644 --- a/uprotocol/communication/inmemoryrpcserver.py +++ b/uprotocol/communication/inmemoryrpcserver.py @@ -85,10 +85,9 @@ async def register_request_handler(self, method_uri: UUri, handler: RequestHandl :param handler: The handler that will process the request for the client. :return: Returns the status of registering the RpcListener. """ - if method_uri is None: - raise ValueError("Method URI missing") - if handler is None: - raise ValueError("Request listener missing") + + if method_uri is None or handler is None: + return UStatus(code=UCode.INVALID_ARGUMENT, message="Method URI or handler missing") try: method_uri_str = UriSerializer().serialize(method_uri) @@ -106,8 +105,6 @@ async def register_request_handler(self, method_uri: UUri, handler: RequestHandl except UStatusError as e: return UStatus(code=e.get_code(), message=e.get_message()) - except Exception as e: - return UStatus(code=UCode.INTERNAL, message=str(e)) async def unregister_request_handler(self, method_uri: UUri, handler: RequestHandler) -> UStatus: """ @@ -117,10 +114,9 @@ async def unregister_request_handler(self, method_uri: UUri, handler: RequestHan :param handler: The handler for processing requests. :return: Returns the status of unregistering the RpcListener. """ - if method_uri is None: - raise ValueError("Method URI missing") - if handler is None: - raise ValueError("Request listener missing") + if method_uri is None or handler is None: + return UStatus(code=UCode.INVALID_ARGUMENT, message="Method URI or handler missing") + method_uri_str = UriSerializer().serialize(method_uri) if self.request_handlers.get(method_uri_str) == handler: diff --git a/uprotocol/communication/inmemorysubscriber.py b/uprotocol/communication/inmemorysubscriber.py index e7b3ffd..3533d74 100644 --- a/uprotocol/communication/inmemorysubscriber.py +++ b/uprotocol/communication/inmemorysubscriber.py @@ -12,51 +12,81 @@ SPDX-License-Identifier: Apache-2.0 """ -import asyncio +from typing import Dict, Optional from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.notifier import Notifier from uprotocol.communication.rpcclient import RpcClient from uprotocol.communication.rpcmapper import RpcMapper from uprotocol.communication.subscriber import Subscriber +from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler from uprotocol.communication.upayload import UPayload +from uprotocol.communication.ustatuserror import UStatusError from uprotocol.core.usubscription.v3 import usubscription_pb2 from uprotocol.core.usubscription.v3.usubscription_pb2 import ( SubscriberInfo, SubscriptionRequest, SubscriptionResponse, + SubscriptionStatus, UnsubscribeRequest, UnsubscribeResponse, + Update, ) from uprotocol.transport.ulistener import UListener from uprotocol.transport.utransport import UTransport from uprotocol.uri.factory.uri_factory import UriFactory +from uprotocol.uri.serializer.uriserializer import UriSerializer +from uprotocol.v1.uattributes_pb2 import UMessageType from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus +class MyNotificationListener(UListener): + def __init__(self, handlers): + self.handlers = handlers + + def on_receive(self, message: UMessage) -> None: + """ + Handles incoming notifications from the USubscription service. + + :param message: The notification message from the USubscription service. + """ + if message.attributes.type != UMessageType.UMESSAGE_TYPE_NOTIFICATION: + return + + subscription_update = UPayload.unpack_data_format(message.payload, message.attributes.payload_format, Update) + + if subscription_update: + handler = self.handlers.get(UriSerializer.serialize(subscription_update.topic)) + # Check if we have a handler registered for the subscription change notification + # for the specific topic that triggered the subscription change notification. + # It is possible that the client did not register one initially (i.e., they don't care to receive it). + if handler: + try: + handler.handle_subscription_change(subscription_update.topic, subscription_update.status) + except Exception: + pass + + class InMemorySubscriber(Subscriber): """ - The following is an example implementation of the Subscriber interface that + The following is an in-memory implementation of the Subscriber interface that wraps the UTransport for implementing the Subscriber-side of the pub/sub messaging pattern to allow developers to subscribe and unsubscribe to topics. - This implementation uses the InMemoryRpcClient to send the subscription request - to the uSubscription service. - - NOTE: Developers are not required to use these APIs, they can implement their own - or directly use the UTransport to communicate with the uSubscription - services and register their publish message listener. + This implementation uses the InMemoryRpcClient and SimpleNotifier interfaces + to invoke the subscription request message to the usubscription service, and register + to receive notifications for changes from the uSubscription service. """ - METHOD_SUBSCRIBE = 1 # TODO: Fetch this from proto generated code - METHOD_UNSUBSCRIBE = 2 # TODO: Fetch this from proto generated code - - def __init__(self, transport: UTransport, rpc_client: RpcClient): + def __init__(self, transport: UTransport, rpc_client: RpcClient, notifier: Notifier): """ - Constructor for the DefaultSubscriber. + Creates a new subscriber for existing Communication Layer client implementations. :param transport: The transport to use for sending the notifications :param rpc_client: The RPC client to use for sending the RPC requests + :param notifier: The notifier to register notification listeners """ if not transport: raise ValueError(UTransport.TRANSPORT_NULL_ERROR) @@ -64,83 +94,135 @@ def __init__(self, transport: UTransport, rpc_client: RpcClient): raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) elif not rpc_client: raise ValueError("RpcClient missing") + elif not notifier: + raise ValueError("Notifier missing") self.transport = transport self.rpc_client = rpc_client - - async def subscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> SubscriptionResponse: + self.notifier = notifier + self.handlers: Dict[str, SubscriptionChangeHandler] = {} + self.notification_handler: UListener = MyNotificationListener(self.handlers) + self.is_listener_registered = False + service_descriptor = usubscription_pb2.DESCRIPTOR.services_by_name["uSubscription"] + self.notification_uri = UriFactory.from_proto(service_descriptor, 0x8000) + self.subscribe_uri = UriFactory.from_proto(service_descriptor, 1) + self.unsubscribe_uri = UriFactory.from_proto(service_descriptor, 2) + + async def subscribe( + self, + topic: UUri, + listener: UListener, + options: CallOptions = None, + handler: Optional[SubscriptionChangeHandler] = None, + ) -> SubscriptionResponse: """ - Subscribe to a given topic. + Subscribes to a given topic. - The API will return a future with the response SubscriptionResponse or exception - with the failure if the subscription was not successful. The API will also register the listener to be - called when messages are received. + This method subscribes to the specified topic and returns an async operation (typically a coroutine) that + yields a SubscriptionResponse upon successful subscription or raises an exception if the subscription fails. + The optional handler parameter, if provided, handles notifications of changes in subscription states, + such as from SubscriptionStatus.State.SUBSCRIBE_PENDING to SubscriptionStatus.State.SUBSCRIBED, which occurs + when we subscribe to remote topics that the device we are on has not yet a subscriber that has subscribed + to said topic. + + NOTE: Calling this method multiple times with different handlers will result in UCode.ALREADY_EXISTS being + returned. :param topic: The topic to subscribe to. - :param listener: The listener to be called when a message is received on the topic. - :param options: The call options for the subscription. - :return: Returns the future with the response SubscriptionResponse or - exception with the failure reason as UStatus. + :param listener: The listener function to be called when a message is received on the topic. + :param options: Optional call options for the subscription. + :param handler: Optional handler function for handling subscription state changes. + :return: An async operation that yields a SubscriptionResponse upon success or raises an exception with + the failure reason as UStatus. UCode.ALREADY_EXISTS will be returned if called multiple times + with different handlers. """ if not topic: raise ValueError("Subscribe topic missing") if not listener: raise ValueError("Request listener missing") - service_descriptor = usubscription_pb2.DESCRIPTOR.services_by_name["uSubscription"] - - subscribe_uri = UriFactory.from_proto(service_descriptor, self.METHOD_SUBSCRIBE, None) - request = SubscriptionRequest(topic=topic, subscriber=SubscriberInfo(uri=self.transport.get_source())) - future_result = asyncio.ensure_future( - self.rpc_client.invoke_method(subscribe_uri, UPayload.pack(request), options) - ) - response_future = RpcMapper.map_response(future_result, SubscriptionResponse) + if not self.is_listener_registered: + # Ensure listener is registered before proceeding + status = await self.notifier.register_notification_listener( + self.notification_uri, self.notification_handler + ) + if status.code != UCode.OK: + raise UStatusError.from_code_message(status.code, "Failed to register listener for rpc client") + self.is_listener_registered = True - response = await response_future - await self.transport.register_listener(topic, listener) + request = SubscriptionRequest(topic=topic, subscriber=SubscriberInfo(uri=self.transport.get_source())) + # Send the subscription request and handle the response + + future_result = self.rpc_client.invoke_method(self.subscribe_uri, UPayload.pack(request), options) + + response = await RpcMapper.map_response(future_result, SubscriptionResponse) + if ( + response.status.state == SubscriptionStatus.State.SUBSCRIBED + or response.status.state == SubscriptionStatus.State.SUBSCRIBE_PENDING + ): + # If registering the listener fails, we end up in a situation where we have + # successfully (logically) subscribed to the topic via the USubscription service, + # but we have not been able to register the listener with the local transport. + # This means that events might start getting forwarded to the local authority + # but are not being consumed. Apart from this inefficiency, this does not pose + # a real problem. Since we return a failed future, the client might be inclined + # to try again and (eventually) succeed in registering the listener as well. + await self.transport.register_listener(topic, listener) + + if handler: + topic_str = UriSerializer.serialize(topic) + if topic_str in self.handlers and self.handlers[topic_str] != handler: + raise UStatusError.from_code_message(UCode.ALREADY_EXISTS, "Handler already registered") + self.handlers[topic_str] = handler return response - async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> UStatus: + async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions = None) -> UStatus: """ - Unsubscribe to a given topic. + Unsubscribes from a given topic. - The subscriber no longer wishes to be subscribed to said topic so we issue an unsubscribe - request to the USubscription service. + This method unsubscribes from the specified topic. It sends an unsubscribe request to the USubscription service + and returns an async operation (typically a coroutine) that yields a UStatus indicating the result of the + unsubscribe operation. If the unsubscribe operation fails with the USubscription service, the listener and + handler (if any) will remain registered. - :param topic: The topic to unsubscribe to. - :param listener: The listener to be called when a message is received on the topic. - :param options: The call options for the subscription. - :return: Returns UStatus with the result from the unsubscribe request. + :param topic: The topic to unsubscribe from. + :param listener: The listener function associated with the topic. + :param options: Optional call options for the subscription. + :return: An async operation that yields a UStatus indicating the result of the unsubscribe request. """ if not topic: raise ValueError("Unsubscribe topic missing") if not listener: raise ValueError("Listener missing") - service_descriptor = usubscription_pb2.DESCRIPTOR.services_by_name["uSubscription"] - unsubscribe_uri = UriFactory.from_proto(service_descriptor, self.METHOD_UNSUBSCRIBE, None) unsubscribe_request = UnsubscribeRequest(topic=topic) - future_result = asyncio.ensure_future( - self.rpc_client.invoke_method(unsubscribe_uri, UPayload.pack(unsubscribe_request), options) - ) - response_future = RpcMapper.map_response_to_result(future_result, UnsubscribeResponse) - response = await response_future + future_result = self.rpc_client.invoke_method(self.unsubscribe_uri, UPayload.pack(unsubscribe_request), options) + + response = await RpcMapper.map_response_to_result(future_result, UnsubscribeResponse) if response.is_success(): - await self.transport.unregister_listener(topic, listener) - return UStatus(code=UCode.OK) + self.handlers.pop(UriSerializer.serialize(topic), None) + return await self.transport.unregister_listener(topic, listener) return response.failure_value() async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: """ - Unregister a listener from a topic. + Unregisters a listener and removes any registered SubscriptionChangeHandler for the topic. - This method will only unregister the listener for a given subscription thus allowing a uE to stay - subscribed even if the listener is removed. + This method removes the specified listener and any associated SubscriptionChangeHandler without + notifying the uSubscription service. This allows persistent subscription even when the uE (micro E) + is not running. - :param topic: The topic to subscribe to. - :param listener: The listener to be called when a message is received on the topic. - :return: Returns UStatus with the status of the listener unregister request. + :param topic: The topic to unregister from. + :param listener: The listener function associated with the topic. + :return: An async operation that yields a UStatus indicating the status of the listener unregister request. """ if not topic: raise ValueError("Unsubscribe topic missing") if not listener: raise ValueError("Request listener missing") return await self.transport.unregister_listener(topic, listener) + + def close(self): + """ + Close the InMemoryRpcClient by clearing stored requests and unregistering the listener. + """ + self.handlers.clear() + self.notifier.unregister_notification_listener(self.notification_uri, self.notification_handler) diff --git a/uprotocol/communication/rpcmapper.py b/uprotocol/communication/rpcmapper.py index 281d840..2896197 100644 --- a/uprotocol/communication/rpcmapper.py +++ b/uprotocol/communication/rpcmapper.py @@ -12,7 +12,7 @@ SPDX-License-Identifier: Apache-2.0 """ -import asyncio +from typing import Coroutine from uprotocol.communication.rpcresult import RpcResult from uprotocol.communication.upayload import UPayload @@ -29,7 +29,7 @@ class RpcMapper: """ @staticmethod - async def map_response(response_coro: asyncio.Future, expected_cls): + async def map_response(response_coro: Coroutine, expected_cls): """ Map a response from invoking a method on a uTransport service into a result containing the declared expected return type of the RPC method. @@ -39,22 +39,25 @@ async def map_response(response_coro: asyncio.Future, expected_cls): :return: Returns the declared expected return type of the RPC method or raises an exception. """ try: - payload = await response_coro + response = await response_coro except Exception as e: raise RuntimeError(f"Unexpected exception: {str(e)}") from e - - if payload is not None: - if not payload.data: + if isinstance(response, UStatusError): + raise response + if isinstance(response, Exception): + raise response + if response is not None: + if not response.data: return expected_cls() else: - result = UPayload.unpack(payload, expected_cls) + result = UPayload.unpack(response, expected_cls) if result: return result raise RuntimeError(f"Unknown payload. Expected [{expected_cls.__name__}]") @staticmethod - async def map_response_to_result(response_coro: asyncio.Future, expected_cls) -> RpcResult: + async def map_response_to_result(response_coro: Coroutine, expected_cls) -> RpcResult: """ Map a response from method invocation to an RpcResult containing the declared expected return type of the RPC method. @@ -70,20 +73,19 @@ async def map_response_to_result(response_coro: asyncio.Future, expected_cls) -> :raises: Raises appropriate exceptions if there is an error during response handling. """ try: - payload = await response_coro + response = await response_coro except Exception as e: if isinstance(e, UStatusError): return RpcResult.failure(value=e.status) - elif isinstance(e, asyncio.TimeoutError): - return RpcResult.failure(code=UCode.DEADLINE_EXCEEDED, message="Request timed out") else: return RpcResult.failure(code=UCode.INVALID_ARGUMENT, message=str(e)) - - if payload is not None: - if not payload.data: + if isinstance(response, UStatusError): + return RpcResult.failure(value=response.status) + if response is not None: + if not response.data: return RpcResult.success(expected_cls()) else: - result = UPayload.unpack(payload, expected_cls) + result = UPayload.unpack(response, expected_cls) return RpcResult.success(result) exception = RuntimeError(f"Unknown or null payload type. Expected [{expected_cls.__name__}]") diff --git a/uprotocol/communication/rpcresult.py b/uprotocol/communication/rpcresult.py index 2746d46..adb9d36 100644 --- a/uprotocol/communication/rpcresult.py +++ b/uprotocol/communication/rpcresult.py @@ -13,7 +13,7 @@ """ from abc import ABC, abstractmethod -from typing import TypeVar, Union +from typing import TypeVar from uprotocol.v1.ucode_pb2 import UCode from uprotocol.v1.ustatus_pb2 import UStatus @@ -49,11 +49,7 @@ def success(value: T) -> "RpcResult": @staticmethod def failure( - value: Union[ - UStatus, - "Failure", - Exception, - ] = None, + value: UStatus = None, code: UCode = UCode.UNKNOWN, message: str = "", ) -> "RpcResult": @@ -83,16 +79,12 @@ def __str__(self) -> str: class Failure(RpcResult): def __init__( self, - value: Union[UStatus, "Failure", Exception, None] = None, + value: UStatus = None, code: UCode = UCode.UNKNOWN, message: str = "", ): if isinstance(value, UStatus): self.value = value - elif isinstance(value, Exception): - self.value = UStatus(code=code, message=str(value)) - elif isinstance(value, Failure): - self.value = value.failure_value() else: self.value = UStatus(code=code, message=message) diff --git a/uprotocol/communication/subscriber.py b/uprotocol/communication/subscriber.py index 5f0040d..e1e19f4 100644 --- a/uprotocol/communication/subscriber.py +++ b/uprotocol/communication/subscriber.py @@ -13,8 +13,10 @@ """ from abc import ABC, abstractmethod +from typing import Optional from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler from uprotocol.core.usubscription.v3.usubscription_pb2 import ( SubscriptionResponse, ) @@ -31,21 +33,39 @@ class Subscriber(ABC): """ @abstractmethod - async def subscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> SubscriptionResponse: + async def subscribe( + self, + topic: UUri, + listener: UListener, + options: Optional[CallOptions] = None, + handler: Optional[SubscriptionChangeHandler] = None, + ) -> SubscriptionResponse: """ - Subscribe to a given topic asynchronously. + Subscribes to a given topic asynchronously. + + The API will return a SubscriptionResponse or raise an exception if the subscription fails. + It registers the listener to be called when messages are received and allows the caller to register + a SubscriptionChangeHandler that is called whenever the subscription state changes + (e.g., SubscriptionStatus.State.PENDING_SUBSCRIBED to SubscriptionStatus.State.SUBSCRIBED, + SubscriptionStatus.State.SUBSCRIBED to SubscriptionStatus.State.UNSUBSCRIBED, etc.). :param topic: The topic to subscribe to. - :param listener: The listener to be called when a message is received on the topic. - :param options: The call options for the subscription. - :return: Returns the SubscriptionResponse upon successful subscription + :param listener: The UListener that is called when published messages are received. + :param options: The CallOptions to provide additional information (timeout, token, etc.). + :param handler: SubscriptionChangeHandler to handle changes to subscription states. + :return: Returns the SubscriptionResponse or raises an exception with the failure reason as UStatus. """ pass @abstractmethod - async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> UStatus: + async def unsubscribe( + self, topic: UUri, listener: UListener, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> UStatus: """ - Unsubscribe to a given topic asynchronously. + Unsubscribes from a given topic. + + The subscriber no longer wishes to be subscribed to the specified topic, trigger an unsubscribe + request to the USubscription service. :param topic: The topic to unsubscribe to. :param listener: The listener to be called when a message is received on the topic. @@ -57,7 +77,10 @@ async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptio @abstractmethod async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: """ - Unregister a listener from a topic asynchronously. + Unregisters a listener from a topic asynchronously. + + This method will only unregister the listener for a given subscription, allowing the uE to remain subscribed + even if the listener is removed. :param topic: The topic to subscribe to. :param listener: The listener to be called when a message is received on the topic. diff --git a/uprotocol/communication/subscriptionchangehandler.py b/uprotocol/communication/subscriptionchangehandler.py new file mode 100644 index 0000000..0e4d6f8 --- /dev/null +++ b/uprotocol/communication/subscriptionchangehandler.py @@ -0,0 +1,38 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from abc import ABC, abstractmethod + +from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + SubscriptionStatus, +) +from uprotocol.v1.uri_pb2 import UUri + + +class SubscriptionChangeHandler(ABC): + """ + Communication Layer (uP-L2) Subscription Change Handler interface. + + This interface provides APIs to handle subscription state changes for a given topic. + """ + + @abstractmethod + def handle_subscription_change(self, topic: UUri, status: SubscriptionStatus) -> None: + """ + Method called to handle/process subscription state changes for a given topic. + + :param topic: The topic that the subscription state changed for. + :param status: The new status of the subscription. + """ + pass diff --git a/uprotocol/communication/uclient.py b/uprotocol/communication/uclient.py index 7e322dc..7bbc7d3 100644 --- a/uprotocol/communication/uclient.py +++ b/uprotocol/communication/uclient.py @@ -25,6 +25,7 @@ from uprotocol.communication.simplenotifier import SimpleNotifier from uprotocol.communication.simplepublisher import SimplePublisher from uprotocol.communication.subscriber import Subscriber +from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler from uprotocol.communication.upayload import UPayload from uprotocol.core.usubscription.v3.usubscription_pb2 import ( SubscriptionResponse, @@ -45,10 +46,10 @@ class UClient(RpcServer, Subscriber, Notifier, Publisher, RpcClient): Attributes: transport (UTransport): The underlying transport mechanism. - rpcServer (InMemoryRpcServer): Handles incoming RPC requests. + rpc_server (InMemoryRpcServer): Handles incoming RPC requests. publisher (SimplePublisher): Sends messages to topics. notifier (SimpleNotifier): Sends notifications to destinations. - rpcClient (InMemoryRpcClient): Invokes remote methods. + rpc_client (InMemoryRpcClient): Invokes remote methods. subscriber (InMemorySubscriber): Manages topic subscriptions. """ @@ -59,33 +60,42 @@ def __init__(self, transport: UTransport): elif not isinstance(transport, UTransport): raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) - self.rpcServer = InMemoryRpcServer(self.transport) + self.rpc_server = InMemoryRpcServer(self.transport) self.publisher = SimplePublisher(self.transport) self.notifier = SimpleNotifier(self.transport) - self.rpcClient = InMemoryRpcClient(self.transport) - self.subscriber = InMemorySubscriber(self.transport, self.rpcClient) + self.rpc_client = InMemoryRpcClient(self.transport) + self.subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) - async def subscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> SubscriptionResponse: + async def subscribe( + self, + topic: UUri, + listener: UListener, + options: Optional[CallOptions] = None, + handler: Optional[SubscriptionChangeHandler] = None, + ) -> SubscriptionResponse: """ - Subscribe to a given topic. + Subscribe to a given topic asynchronously. - The API will return a future with the response SubscriptionResponse or exception - with the failure if the subscription was not successful. The API will also register the listener to be - called when messages are received. + The API will return a SubscriptionResponse or raise an exception if the subscription fails. + It registers the listener to be called when messages are received and allows the caller to register + a SubscriptionChangeHandler that is called whenever the subscription state changes (e.g., PENDING_SUBSCRIBED to + SUBSCRIBED, SUBSCRIBED to UNSUBSCRIBED, etc.). :param topic: The topic to subscribe to. - :param listener: The listener to be called when a message is received on the topic. - :param options: The call options for the subscription. - :return: Returns the future with the response SubscriptionResponse or - exception with the failure reason as UStatus. + :param listener: The UListener that is called when published messages are received. + :param options: The CallOptions to provide additional information (timeout, token, etc.). + :param handler: SubscriptionChangeHandler to handle changes to subscription states. + :return: Returns the SubscriptionResponse or raises an exception with the failure reason as UStatus. """ - return await self.subscriber.subscribe(topic, listener, options) + return await self.subscriber.subscribe(topic, listener, options, handler) - async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> UStatus: + async def unsubscribe( + self, topic: UUri, listener: UListener, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> UStatus: """ - Unsubscribe to a given topic. + Unsubscribe to a given topic asynchronously. - The subscriber no longer wishes to be subscribed to said topic so we issue an unsubscribe + The subscriber no longer wishes to be subscribed to the specified topic, trigger an unsubscribe request to the USubscription service. :param topic: The topic to unsubscribe to. @@ -155,7 +165,7 @@ async def publish( """ return await self.publisher.publish(topic, options, payload) - async def register_request_handler(self, method: UUri, handler): + async def register_request_handler(self, method_uri: UUri, handler): """ Register a handler that will be invoked when requests come in from clients for the given method. @@ -165,9 +175,9 @@ async def register_request_handler(self, method: UUri, handler): :param handler: The handler that will process the request for the client. :return: Returns the status of registering the RpcListener. """ - return await self.rpcServer.register_request_handler(method, handler) + return await self.rpc_server.register_request_handler(method_uri, handler) - async def unregister_request_handler(self, method: UUri, handler): + async def unregister_request_handler(self, method_uri: UUri, handler): """ Unregister a handler that will be invoked when requests come in from clients for the given method. @@ -175,7 +185,7 @@ async def unregister_request_handler(self, method: UUri, handler): :param handler: The handler for processing requests. :return: Returns the status of unregistering the RpcListener. """ - return await self.rpcServer.unregister_request_handler(method, handler) + return await self.rpc_server.unregister_request_handler(method_uri, handler) async def invoke_method( self, method_uri: UUri, request_payload: UPayload, options: Optional[CallOptions] = None @@ -192,4 +202,10 @@ async def invoke_method( :return: Returns the asyncio Future with the response payload or raises an exception with the failure reason as UStatus. """ - return await self.rpcClient.invoke_method(method_uri, request_payload, options) + return await self.rpc_client.invoke_method(method_uri, request_payload, options) + + def close(self): + if self.rpc_client: + self.rpc_client.close() + if self.subscriber: + self.subscriber.close() diff --git a/uprotocol/communication/upayload.py b/uprotocol/communication/upayload.py index fef5d15..45feba0 100644 --- a/uprotocol/communication/upayload.py +++ b/uprotocol/communication/upayload.py @@ -86,14 +86,3 @@ def unpack_data_format( # Initialize EMPTY outside the class definition UPayload.EMPTY = UPayload(data=bytes(), format=UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED) - -# Example usage: -if __name__ == "__main__": - from google.protobuf.wrappers_pb2 import Int32Value # Import Int32Value from Google protobuf wrappers - - # Create an instance of Int32Value - int_value = Int32Value(value=42) - - packed_int = UPayload.pack(int_value) - unpacked_int = UPayload.unpack(packed_int, Int32Value) - print("Unpacked Int32Value:", unpacked_int) diff --git a/uprotocol/transport/README.adoc b/uprotocol/transport/README.adoc index 3f2c834..1c7716e 100644 --- a/uprotocol/transport/README.adoc +++ b/uprotocol/transport/README.adoc @@ -4,4 +4,101 @@ :source-highlighter: prettify == Overview -The purpose of this module is to provide the Python implementation of https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/up-l1/README.adoc[uTransport API & Data Model]. The transport API is used by all uE developers to send and receive messages across any transport. The interface is to be implemented by communication transport developers (i.e. developing a uTransport for SOME/IP, DDS, Zenoh, MQTT, etc...). +The following section implements https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/up-l1/README.adoc[uP-L1 Transport Layer Specifications]. The purpose of the transport layer is to wrap communication middlewares into a common interface that allows us to send and receive any kind of uProtocol messages (publish, request, response, notification) over any kind of transport (HTTP, MQTT, WebSockets, etc). + +the datamodel is declared in the uProtocol specifications project in the https://github.com/eclipse-uprotocol/up-spec/tree/main/up-core-api[up-core-api] folder and this project declares the language specific transport interface (UListener & UTransport) and the builders, serializers, and validators, for the up-core-api data model. . + +Below are the list of the classes and interfaces that are part of the uProtocol Transport Interface & Data Model: + +.Transport Interfaces +[table-transport-interfaces, cols="1,3"] +|=== +| Class/Interface | Description + +| xref:utransport.py[*`UTransport`*] +| Interface that defines the methods that a transport middleware must implement in order to be used by the uProtocol library. + +| xref:ulistener.py[*`UListener`*] +| Callback/listener interface to be able to receive messages from a transport. + +| xref:builder/umessagebuilder.py[*`UMessageBuilder`*] +| Interface that simply builds request, response, publish, and defines the methods that a message builder must implement in order to be used by the uProtocol library. + +| xref:validator/uattributesvalidator.py[*`UAttributesValidator`*] +| uProtocol Attributes validator that ensures that the publish, notification, request, and response messages are built with the correct information. + +|=== + +== Examples + +=== Create Transport + +[,python] +---- +class HappyUTransport(UTransport): + async def close(self) -> None: + pass + + async def send(self, message): + return UStatus(code=UCode.INVALID_ARGUMENT if message is None else UCode.OK) + + async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + listener.on_receive(UMessage()) + return UStatus(code=UCode.OK) + + async def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): + return UStatus(code=UCode.OK) + + def get_source(self): + return UUri() +---- + +=== Build Messages using UMessageBuilder + +==== Build Publish Message +[,python] +---- +topic : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) +UMessageBuilder.publish(topic).build() + +# to add payload, pack your proto message in UPayload +# to pack UUri() +UMessageBuilder.publish(topic).build_from_upayload(UPayload.pack(UUri())) + +---- +==== Build Notification Message +[,python] +---- +topic : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) +destination_uri : UUri = UUri( ue_id=3, ue_version_major=1) + +UMessageBuilder.notification(topic, destination).build() + +---- +==== Build Request Message +[,python] +---- +source : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) +sink : UUri = UUri( ue_id=3, ue_version_major=1) +#1000 is ttl here +UMessageBuilder.request(source, sink, 1000).build() + +---- + +==== Build Response Message +[,python] +---- +source : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) +sink : UUri = UUri( ue_id=3, ue_version_major=1) +reqid : UUID = Factories.UPROTOCOL.create() +UMessageBuilder.response(source, sink, reqid).build() + +---- + +==== Build Response Message from Request +[,python] +---- + +UMessageBuilder.response_for_request(request.attributes).build() + +---- \ No newline at end of file diff --git a/uprotocol/uri/README.adoc b/uprotocol/uri/README.adoc index 191c9cf..b23d434 100644 --- a/uprotocol/uri/README.adoc +++ b/uprotocol/uri/README.adoc @@ -5,29 +5,41 @@ == Overview -The following folder contains everything but the data model for UUri (builders, serializers, validators, etc...) per https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/basics/uri.adoc[uProtocol URI Specifications]. -The data model is defined in https://github.com/eclipse-uprotocol/uprotocol-core-api/blob/main/src/main/proto/uri.proto[uri.proto] and included as a dependency for this project. +The following folder contains the data model for UUri (factory, serializer, validator, etc...) as per the https://github.com/eclipse-uprotocol/up-spec/blob/main/basics/uri.adoc[uProtocol URI Specifications]. +The data model is defined in https://github.com/eclipse-uprotocol/up-spec/blob/main/up-core-api/uprotocol/v1/uri.proto[uri.proto] and included as a dependency for this project. -IMPORTANT: For more details about the data model, various formats (object, long, micro) and their uses, please refer to https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/basics/uri.adoc[uProtocol URI Specifications]. -== Using the SDK - -When building UUri, you can choose to populate it with only names, only numbers, or both (resolved). When you should use each is described the best practice section of https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/basics/uri.adoc[uProtocol URI Specifications]. +== Create uri from proto +[,python] +---- +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.uri.factory.uri_factory import UriFactory +from uprotocol.core.usubscription.v3 import usubscription_pb2 -=== Building an RPC Method +service_descriptor = usubscription_pb2.DESCRIPTOR.services_by_name["uSubscription"] +uri = UriFactory.from_proto(service_descriptor, 0) +---- +== Serialize and Deserialize uri [,python] ---- +from uprotocol.v1.uri_pb2 import UUri uri: UUri = UUri( - authority=UAuthority(name="MyDevice", ip=bytes(socket.inet_pton(socket.AF_INET, "192.168.1.100")), - id=bytes.fromhex("3GTU2NEC8HG403825")), - entity=UEntity(name="HartleyService", id=10203, version_major=1), - resource=UResourceBuilder.for_rpc_request("Raise", 10)) ----- + authority_name = "MyDevice", + ue_id = "0x1234", + ue_version_major = "1", + resource_id = "0x5010" + ) +serialized_uri : str = UriSerializer.serialize(uri); +deserialized_uri : UUri = UriSerializer.deserialize(serialized_uri); +self.assertEqual(uri, deserialized_uri) -=== Validating -[,python] ---- -status : ValidationResult = UriValidator.validate_rpc_method(uuri) -assertTrue(status.is_success()); +== Validating a UUri +[,python] ---- +from uprotocol.uri.validator.urivalidator import UriValidator +uri = UUri(resource_id=0x7FFF) +status: bool = UriValidator.is_rpc_method(uri) +assertTrue(status) +---- \ No newline at end of file diff --git a/uprotocol/uri/factory/uri_factory.py b/uprotocol/uri/factory/uri_factory.py index 51aee4e..6f53368 100644 --- a/uprotocol/uri/factory/uri_factory.py +++ b/uprotocol/uri/factory/uri_factory.py @@ -41,7 +41,7 @@ class UriFactory: @staticmethod def from_proto( - service_descriptor: Optional[ServiceDescriptor], resource_id: int, authority_name: Optional[str] + service_descriptor: Optional[ServiceDescriptor], resource_id: int, authority_name: Optional[str] = None ) -> UUri: """ Builds a URI for a protobuf generated code Service Descriptor. diff --git a/uprotocol/uri/validator/urivalidator.py b/uprotocol/uri/validator/urivalidator.py index 9d38333..3edf75b 100644 --- a/uprotocol/uri/validator/urivalidator.py +++ b/uprotocol/uri/validator/urivalidator.py @@ -199,3 +199,21 @@ def matches(uri_to_match: UUri, candidate_uri: UUri) -> bool: and UriValidator.matches_entity(uri_to_match, candidate_uri) and UriValidator.matches_resource(uri_to_match, candidate_uri) ) + + @staticmethod + def has_wildcard(uri): + """ + Checks if the URI has a wildcard in any of its fields. + + :param uri: The URI to check for wildcards. + :type uri: UUri + + :return: True if the URI has a wildcard, False otherwise. + :rtype: bool + """ + return uri and ( + uri.authority_name == UriFactory.WILDCARD_AUTHORITY + or (uri.ue_id & UriFactory.WILDCARD_ENTITY_ID) == UriFactory.WILDCARD_ENTITY_ID + or uri.ue_version_major == UriFactory.WILDCARD_ENTITY_VERSION + or uri.resource_id == UriFactory.WILDCARD_RESOURCE_ID + ) diff --git a/uprotocol/uuid/README.adoc b/uprotocol/uuid/README.adoc index 66cbf7c..90cc670 100644 --- a/uprotocol/uuid/README.adoc +++ b/uprotocol/uuid/README.adoc @@ -4,30 +4,25 @@ == Overview -Implementation of https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/basics/uuid.adoc[uProtocol UUID specifications]. +Implementation of https://github.com/eclipse-uprotocol/up-spec/blob/main/basics/uuid.adoc[uProtocol UUID specifications]. == Examples [source,python] ---- uuid = Factories.UPROTOCOL.create() - version = UUIDUtils.getVersion(uuid) - time = UUIDUtils.getTime(uuid) - bytes_uuid = MicroUuidSerializer.instance().serialize(uuid) + version = UUIDUtils.get_version(uuid) + time = UUIDUtils.get_time(uuid) str_uuid = UuidSerializer.serialize(uuid) - assertTrue(UUIDUtils.isUProtocol(uuid)) - assertTrue(UUIDUtils.isuuid(uuid)) - assertFalse(UUIDUtils.isUuidv6(uuid)) + assertTrue(UUIDUtils.is_uprotocol(uuid)) + assertTrue(UUIDUtils.is_uuid(uuid)) + assertFalse(UUIDUtils.is_uuidv6(uuid)) assertTrue(version) assertTrue(time) - assertGreater(len(bytes_uuid), 0) assertFalse(str_uuid.isspace()) - uuid1 = MicroUuidSerializer.instance().deserialize(bytes_data) - uuid2 = UuidSerializer.deserialize(uuid_string) + uuid1 = UuidSerializer.deserialize(uuid_string) assertNotEqual(uuid1, UUID()) - assertNotEqual(uuid2, UUID()) assertEqual(uuid, uuid1) - assertEqual(uuid, uuid2) ---- \ No newline at end of file