diff --git a/tests/test_communication/mock_utransport.py b/tests/test_communication/mock_utransport.py index 70d9d4e..ac48a35 100644 --- a/tests/test_communication/mock_utransport.py +++ b/tests/test_communication/mock_utransport.py @@ -12,6 +12,7 @@ SPDX-License-Identifier: Apache-2.0 """ +import asyncio import threading from abc import ABC from concurrent.futures import ThreadPoolExecutor @@ -35,6 +36,8 @@ from uprotocol.validation.validationresult import ValidationResult +# ToDo Change the implementation of transport APIs to use the URI match pattern and save listeners +# against the source and sink filter tuple. class MockUTransport(UTransport): def get_source(self) -> UUri: return self.source @@ -53,7 +56,7 @@ def build_response(self, request: UMessage): def close(self): self.listeners.clear() - def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: with self.lock: if sink_filter is not None: # method uri topic = UriSerializer().serialize(sink_filter) @@ -65,7 +68,7 @@ def register_listener(self, source_filter: UUri, listener: UListener, sink_filte self.listeners[topic].append(listener) return UStatus(code=UCode.OK) - def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: + async def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: with self.lock: if sink is not None: # method uri topic = UriSerializer().serialize(sink) @@ -82,57 +85,74 @@ def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = No result = UStatus(code=code) return result - def send(self, message: UMessage) -> UStatus: + async def send(self, message: UMessage) -> UStatus: validator = UAttributesValidator.get_validator(message.attributes) with self.lock: if message is None or validator.validate(message.attributes) != ValidationResult.success(): return UStatus(code=UCode.INVALID_ARGUMENT, message="Invalid message attributes") - executor = ThreadPoolExecutor(max_workers=5) - executor.submit(self._notify_listeners, message) + # Use a ThreadPoolExecutor with max_workers=1 + executor = ThreadPoolExecutor(max_workers=1) + + try: + # Submit _notify_listeners to the executor + future = executor.submit(self._notify_listeners, message) + + # Await completion of the Future + await asyncio.wrap_future(future) + + finally: + # Clean up the executor + executor.shutdown() return UStatus(code=UCode.OK) def _notify_listeners(self, umsg): - if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_PUBLISH: - for key, listeners in self.listeners.items(): - uri = UriSerializer().deserialize(key) - if not (UriValidator.is_rpc_method(uri) or UriValidator.is_rpc_response(uri)): - for listener in listeners: - listener.on_receive(umsg) - - else: - if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: - serialized_uri = UriSerializer().serialize(umsg.attributes.sink) - if serialized_uri not in self.listeners: - # no listener registered for method uri, send dummy response. - # This case will only come for request type - # as for response type, there will always be response handler as it is in up client - serialized_uri = UriSerializer().serialize(UriFactory.ANY) - umsg = self.build_response(umsg) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_PUBLISH: + for key, listeners in self.listeners.items(): + uri = UriSerializer().deserialize(key) + if not (UriValidator.is_rpc_method(uri) or UriValidator.is_rpc_response(uri)): + for listener in listeners: + loop.call_soon_threadsafe(listener.on_receive, umsg) + else: - # this is for response type,handle response - serialized_uri = UriSerializer().serialize(UriFactory.ANY) + if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: + serialized_uri = UriSerializer().serialize(umsg.attributes.sink) + if serialized_uri not in self.listeners: + # no listener registered for method uri, send dummy response. + # This case will only come for request type + # as for response type, there will always be response handler as it is in up client + serialized_uri = UriSerializer().serialize(UriFactory.ANY) + umsg = self.build_response(umsg) + else: + # this is for response type,handle response + serialized_uri = UriSerializer().serialize(UriFactory.ANY) - if serialized_uri in self.listeners: - for listener in self.listeners[serialized_uri]: - listener.on_receive(umsg) - break # as there will be only one listener for method uri + if serialized_uri in self.listeners: + for listener in self.listeners[serialized_uri]: + loop.call_soon_threadsafe(listener.on_receive, umsg) + break # as there will be only one listener for method uri + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() class TimeoutUTransport(MockUTransport, ABC): - def send(self, message): + async def send(self, message): return UStatus(code=UCode.OK) class ErrorUTransport(MockUTransport, ABC): - def send(self, message): + async def send(self, message): return UStatus(code=UCode.FAILED_PRECONDITION) - def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: return UStatus(code=UCode.FAILED_PRECONDITION) - def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: + async def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: return UStatus(code=UCode.FAILED_PRECONDITION) @@ -150,7 +170,7 @@ class EchoUTransport(MockUTransport): def build_response(self, request): return request - def send(self, message): + async def send(self, message): response = self.build_response(message) executor = ThreadPoolExecutor(max_workers=1) executor.submit(self._notify_listeners, response) diff --git a/tests/test_communication/test_inmemoryrpcclient.py b/tests/test_communication/test_inmemoryrpcclient.py index 7716e58..a8836df 100644 --- a/tests/test_communication/test_inmemoryrpcclient.py +++ b/tests/test_communication/test_inmemoryrpcclient.py @@ -35,6 +35,7 @@ async def test_invoke_method_with_payload(self): rpc_client = InMemoryRpcClient(MockUTransport()) response = await rpc_client.invoke_method(self.create_method_uri(), payload, None) self.assertIsNotNone(response) + self.assertEqual(response, payload) async def test_invoke_method_with_payload_and_call_options(self): payload = UPayload.pack_to_any(UUri()) @@ -42,11 +43,13 @@ async def test_invoke_method_with_payload_and_call_options(self): rpc_client = InMemoryRpcClient(MockUTransport()) response = await rpc_client.invoke_method(self.create_method_uri(), payload, options) self.assertIsNotNone(response) + self.assertEqual(response, payload) async def test_invoke_method_with_null_payload(self): rpc_client = InMemoryRpcClient(MockUTransport()) response = await rpc_client.invoke_method(self.create_method_uri(), None, CallOptions.DEFAULT) self.assertIsNotNone(response) + self.assertEqual(response, UPayload.EMPTY) async def test_invoke_method_with_timeout_transport(self): payload = UPayload.pack_to_any(UUri()) @@ -64,7 +67,8 @@ async def test_invoke_method_with_multi_invoke_transport(self): response2 = await rpc_client.invoke_method(self.create_method_uri(), payload, None) self.assertIsNotNone(response1) self.assertIsNotNone(response2) - self.assertEqual(response1, response2) + self.assertEqual(payload, response1) + self.assertEqual(payload, response2) async def test_close_with_multiple_listeners(self): rpc_client = InMemoryRpcClient(MockUTransport()) @@ -74,7 +78,8 @@ async def test_close_with_multiple_listeners(self): response2 = await rpc_client.invoke_method(self.create_method_uri(), payload, None) self.assertIsNotNone(response1) self.assertIsNotNone(response2) - self.assertEqual(response1, response2) + self.assertEqual(payload, response1) + self.assertEqual(payload, response2) rpc_client.close() async def test_invoke_method_with_comm_status_transport(self): @@ -87,7 +92,7 @@ async def test_invoke_method_with_comm_status_transport(self): async def test_invoke_method_with_error_transport(self): class ErrorUTransport(MockUTransport): - def send(self, message): + async def send(self, message): return UStatus(code=UCode.FAILED_PRECONDITION) rpc_client = InMemoryRpcClient(ErrorUTransport()) diff --git a/tests/test_communication/test_inmemoryrpcserver.py b/tests/test_communication/test_inmemoryrpcserver.py index 7a6c4e1..96354fd 100644 --- a/tests/test_communication/test_inmemoryrpcserver.py +++ b/tests/test_communication/test_inmemoryrpcserver.py @@ -33,7 +33,7 @@ from uprotocol.v1.ustatus_pb2 import UStatus -class TestInMemoryRpcServer(unittest.TestCase): +class TestInMemoryRpcServer(unittest.IsolatedAsyncioTestCase): @staticmethod def create_method_uri(): return UUri(authority_name="Neelam", ue_id=4, ue_version_major=1, resource_id=3) @@ -48,64 +48,64 @@ def test_constructor_transport_not_instance(self): InMemoryRpcServer("Invalid Transport") self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) - def test_register_request_handler_method_uri_none(self): + async def test_register_request_handler_method_uri_none(self): server = InMemoryRpcServer(MockUTransport()) handler = MagicMock(return_value=UPayload.EMPTY) with self.assertRaises(ValueError) as context: - server.register_request_handler(None, handler) + await server.register_request_handler(None, handler) self.assertEqual(str(context.exception), "Method URI missing") - def test_register_request_handler_handler_none(self): + async def test_register_request_handler_handler_none(self): server = InMemoryRpcServer(MockUTransport()) with self.assertRaises(ValueError) as context: - server.register_request_handler(self.create_method_uri(), None) + await server.register_request_handler(self.create_method_uri(), None) self.assertEqual(str(context.exception), "Request listener missing") - def test_unregister_request_handler_method_uri_none(self): + async def test_unregister_request_handler_method_uri_none(self): server = InMemoryRpcServer(MockUTransport()) handler = MagicMock(return_value=UPayload.EMPTY) with self.assertRaises(ValueError) as context: - server.unregister_request_handler(None, handler) + await server.unregister_request_handler(None, handler) self.assertEqual(str(context.exception), "Method URI missing") - def test_unregister_request_handler_handler_none(self): + async def test_unregister_request_handler_handler_none(self): server = InMemoryRpcServer(MockUTransport()) with self.assertRaises(ValueError) as context: - server.unregister_request_handler(self.create_method_uri(), None) + await server.unregister_request_handler(self.create_method_uri(), None) self.assertEqual(str(context.exception), "Request listener missing") - def test_registering_request_listener(self): + async def test_registering_request_listener(self): handler = MagicMock(return_value=UPayload.EMPTY) method = self.create_method_uri() server = InMemoryRpcServer(MockUTransport()) - self.assertEqual(server.register_request_handler(method, handler).code, UCode.OK) - self.assertEqual(server.unregister_request_handler(method, handler).code, UCode.OK) + self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK) + self.assertEqual((await server.unregister_request_handler(method, handler)).code, UCode.OK) - def test_registering_twice_the_same_request_handler(self): + async def test_registering_twice_the_same_request_handler(self): handler = MagicMock(return_value=UPayload.EMPTY) server = InMemoryRpcServer(MockUTransport()) - status = server.register_request_handler(self.create_method_uri(), handler) + status = await server.register_request_handler(self.create_method_uri(), handler) self.assertEqual(status.code, UCode.OK) - status = server.register_request_handler(self.create_method_uri(), handler) + status = await server.register_request_handler(self.create_method_uri(), handler) self.assertEqual(status.code, UCode.ALREADY_EXISTS) - def test_unregistering_non_registered_request_handler(self): + async def test_unregistering_non_registered_request_handler(self): handler = MagicMock(side_effect=NotImplementedError("Unimplemented method 'handleRequest'")) server = InMemoryRpcServer(MockUTransport()) - status = server.unregister_request_handler(self.create_method_uri(), handler) + status = await server.unregister_request_handler(self.create_method_uri(), handler) self.assertEqual(status.code, UCode.NOT_FOUND) - def test_registering_request_listener_with_error_transport(self): + async def test_registering_request_listener_with_error_transport(self): handler = MagicMock(return_value=UPayload.EMPTY) server = InMemoryRpcServer(ErrorUTransport()) - status = server.register_request_handler(self.create_method_uri(), handler) + status = await server.register_request_handler(self.create_method_uri(), handler) self.assertEqual(status.code, UCode.FAILED_PRECONDITION) - def test_handle_requests(self): + async def test_handle_requests(self): class CustomTestUTransport(MockUTransport): - def send(self, message): + async def send(self, message): serialized_uri = UriSerializer().serialize(message.attributes.sink) if serialized_uri in self.listeners: for listener in self.listeners[serialized_uri]: @@ -120,18 +120,18 @@ def send(self, message): # Update the resource_id method2.resource_id = 69 - self.assertEqual(server.register_request_handler(method, handler).code, UCode.OK) + self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK) request = UMessageBuilder.request(transport.get_source(), method2, 1000).build() # fake sending a request message that will trigger the handler to be called but since it is # not for the same method as the one registered, it should be ignored and the handler not called - self.assertEqual(transport.send(request).code, UCode.OK) + self.assertEqual((await transport.send(request)).code, UCode.OK) - def test_handle_requests_exception(self): + async def test_handle_requests_exception(self): # test transport that will trigger the handleRequest() class CustomTestUTransport(MockUTransport): - def send(self, message): + async def send(self, message): serialized_uri = UriSerializer().serialize(message.attributes.sink) if serialized_uri in self.listeners: for listener in self.listeners[serialized_uri]: @@ -148,14 +148,14 @@ def handle_request(self, message: UMessage) -> UPayload: server = InMemoryRpcServer(transport) method = self.create_method_uri() - self.assertEqual(server.register_request_handler(method, handler).code, UCode.OK) + self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK) request = UMessageBuilder.request(transport.get_source(), method, 1000).build() - self.assertEqual(transport.send(request).code, UCode.OK) + self.assertEqual((await transport.send(request)).code, UCode.OK) - def test_handle_requests_unknown_exception(self): + async def test_handle_requests_unknown_exception(self): class CustomTestUTransport(MockUTransport): - def send(self, message): + async def send(self, message): serialized_uri = UriSerializer().serialize(message.attributes.sink) if serialized_uri in self.listeners: for listener in self.listeners[serialized_uri]: @@ -172,10 +172,10 @@ def handle_request(self, message: UMessage) -> UPayload: server = InMemoryRpcServer(transport) method = self.create_method_uri() - self.assertEqual(server.register_request_handler(method, handler).code, UCode.OK) + self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK) request = UMessageBuilder.request(transport.get_source(), method, 1000).build() - self.assertEqual(transport.send(request).code, UCode.OK) + self.assertEqual((await transport.send(request)).code, UCode.OK) if __name__ == '__main__': diff --git a/tests/test_communication/test_inmemorysubscriber.py b/tests/test_communication/test_inmemorysubscriber.py index 76fdf01..6907777 100644 --- a/tests/test_communication/test_inmemorysubscriber.py +++ b/tests/test_communication/test_inmemorysubscriber.py @@ -70,7 +70,7 @@ async def test_unregister_listener(self): subscription_response = await subscriber.subscribe(topic, self.listener, CallOptions()) self.assertFalse(subscription_response is None) - status = subscriber.unregister_listener(topic, self.listener) + status = await subscriber.unregister_listener(topic, self.listener) self.assertEqual(status.code, UCode.OK) async def test_unsubscribe_with_commstatus_error(self): @@ -91,19 +91,19 @@ async def test_unsubscribe_with_exception(self): self.assertEqual(response.message, "Request timed out") self.assertEqual(response.code, UCode.DEADLINE_EXCEEDED) - def test_unregister_listener_missing_topic(self): + async def test_unregister_listener_missing_topic(self): transport = TimeoutUTransport() subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) with self.assertRaises(ValueError) as context: - subscriber.unregister_listener(None, self.listener) + await subscriber.unregister_listener(None, self.listener) self.assertEqual(str(context.exception), "Unsubscribe topic missing") - def test_unregister_listener_missing_listener(self): + async def test_unregister_listener_missing_listener(self): topic = self.create_topic() transport = TimeoutUTransport() subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) with self.assertRaises(ValueError) as context: - subscriber.unregister_listener(topic, None) + await subscriber.unregister_listener(topic, None) self.assertEqual(str(context.exception), "Request listener missing") async def test_unsubscribe_missing_topic(self): diff --git a/tests/test_communication/test_publisher.py b/tests/test_communication/test_publisher.py index d2234e2..87e7ede 100644 --- a/tests/test_communication/test_publisher.py +++ b/tests/test_communication/test_publisher.py @@ -20,8 +20,8 @@ from uprotocol.v1.uri_pb2 import UUri -class TestPublisher(unittest.TestCase): - def test_send_publish(self): +class TestPublisher(unittest.IsolatedAsyncioTestCase): + async def test_send_publish(self): # Topic to publish topic = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) @@ -32,7 +32,7 @@ def test_send_publish(self): publisher = UClient(transport) # Send the publish message - status = publisher.publish(topic, None) + status = await publisher.publish(topic, None) # Assert that the status code is OK self.assertEqual(status.code, UCode.OK) diff --git a/tests/test_communication/test_simplenotifier.py b/tests/test_communication/test_simplenotifier.py index 70dfb32..a0429fc 100644 --- a/tests/test_communication/test_simplenotifier.py +++ b/tests/test_communication/test_simplenotifier.py @@ -23,55 +23,55 @@ from uprotocol.v1.uri_pb2 import UUri -class TestSimpleNotifier(unittest.TestCase): +class TestSimpleNotifier(unittest.IsolatedAsyncioTestCase): def create_topic(self): return UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=0x8000) def create_destination_uri(self): return UUri(ue_id=4, ue_version_major=1) - def test_send_notification(self): + async def test_send_notification(self): notifier = SimpleNotifier(MockUTransport()) - status = notifier.notify(self.create_topic(), self.create_destination_uri(), None) + status = await notifier.notify(self.create_topic(), self.create_destination_uri(), None) self.assertEqual(status.code, UCode.OK) - def test_send_notification_with_payload(self): + async def test_send_notification_with_payload(self): uri = UUri(authority_name="Neelam") notifier = SimpleNotifier(MockUTransport()) - status = notifier.notify(self.create_topic(), self.create_destination_uri(), UPayload.pack(uri)) + status = await notifier.notify(self.create_topic(), self.create_destination_uri(), UPayload.pack(uri)) self.assertEqual(status.code, UCode.OK) - def test_register_listener(self): + async def test_register_listener(self): class TestListener(UListener): def on_receive(self, message: UMessage): pass listener = TestListener() notifier = SimpleNotifier(MockUTransport()) - status = notifier.register_notification_listener(self.create_topic(), listener) + status = await notifier.register_notification_listener(self.create_topic(), listener) self.assertEqual(status.code, UCode.OK) - def test_unregister_notification_listener(self): + async def test_unregister_notification_listener(self): class TestListener(UListener): def on_receive(self, message: UMessage): pass listener = TestListener() notifier = SimpleNotifier(MockUTransport()) - status = notifier.register_notification_listener(self.create_topic(), listener) + status = await notifier.register_notification_listener(self.create_topic(), listener) self.assertEqual(status.code, UCode.OK) - status = notifier.unregister_notification_listener(self.create_topic(), listener) + status = await notifier.unregister_notification_listener(self.create_topic(), listener) self.assertEqual(status.code, UCode.OK) - def test_unregister_listener_not_registered(self): + async def test_unregister_listener_not_registered(self): class TestListener(UListener): def on_receive(self, message: UMessage): pass listener = TestListener() notifier = SimpleNotifier(MockUTransport()) - status = notifier.unregister_notification_listener(self.create_topic(), listener) + status = await notifier.unregister_notification_listener(self.create_topic(), listener) self.assertEqual(status.code, UCode.INVALID_ARGUMENT) diff --git a/tests/test_communication/test_simplepublisher.py b/tests/test_communication/test_simplepublisher.py index 53f3093..6762bda 100644 --- a/tests/test_communication/test_simplepublisher.py +++ b/tests/test_communication/test_simplepublisher.py @@ -22,19 +22,19 @@ from uprotocol.v1.uri_pb2 import UUri -class TestSimplePublisher(unittest.TestCase): +class TestSimplePublisher(unittest.IsolatedAsyncioTestCase): def create_topic(self): return UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=0x8000) - def test_send_publish(self): + async def test_send_publish(self): publisher = SimplePublisher(MockUTransport()) - status = publisher.publish(self.create_topic(), None) + status = await publisher.publish(self.create_topic(), None) self.assertEqual(status.code, UCode.OK) - def test_send_publish_with_stuffed_payload(self): + async def test_send_publish_with_stuffed_payload(self): uri = UUri(authority_name="Neelam") publisher = SimplePublisher(MockUTransport()) - status = publisher.publish(self.create_topic(), UPayload.pack_to_any(uri)) + status = await publisher.publish(self.create_topic(), UPayload.pack_to_any(uri)) self.assertEqual(status.code, UCode.OK) def test_constructor_transport_none(self): @@ -47,12 +47,12 @@ def test_constructor_transport_not_instance(self): SimplePublisher("InvalidTransport") self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) - def test_publish_topic_none(self): + async def test_publish_topic_none(self): publisher = SimplePublisher(MockUTransport()) uri = UUri(authority_name="Neelam") with self.assertRaises(ValueError) as context: - publisher.publish(None, UPayload.pack_to_any(uri)) + await publisher.publish(None, UPayload.pack_to_any(uri)) self.assertEqual(str(context.exception), "Publish topic missing") diff --git a/tests/test_communication/test_subscriber.py b/tests/test_communication/test_subscriber.py index 66ecb94..4d92c59 100644 --- a/tests/test_communication/test_subscriber.py +++ b/tests/test_communication/test_subscriber.py @@ -60,7 +60,7 @@ async def test_publish_notify_subscribe_listener(self): # Create a mock for MyListener's on_receive method self.listener.on_receive = MagicMock(side_effect=self.listener.on_receive) - status = upclient.publish(topic, None) + status = await upclient.publish(topic, None) self.assertEqual(status.code, UCode.OK) # Wait for a short time to ensure on_receive can be called await asyncio.sleep(1) diff --git a/tests/test_communication/test_uclient.py b/tests/test_communication/test_uclient.py index 1490ed2..4512308 100644 --- a/tests/test_communication/test_uclient.py +++ b/tests/test_communication/test_uclient.py @@ -16,7 +16,7 @@ import unittest from unittest.mock import MagicMock, create_autospec -from tests.test_communication.mock_utransport import EchoUTransport, ErrorUTransport, MockUTransport, TimeoutUTransport +from tests.test_communication.mock_utransport import EchoUTransport, MockUTransport, TimeoutUTransport from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.requesthandler import RequestHandler from uprotocol.communication.uclient import UClient @@ -50,51 +50,47 @@ def test_create_upclient_with_null_transport(self): with self.assertRaises(ValueError): UClient(None) - def test_create_upclient_with_error_transport(self): - with self.assertRaises(UStatusError): - UClient(ErrorUTransport()) - - def test_send_notification(self): - status = UClient(MockUTransport()).notify(create_topic(), create_destination_uri(), None) + async def test_send_notification(self): + status = await UClient(MockUTransport()).notify(create_topic(), create_destination_uri(), None) self.assertEqual(status.code, UCode.OK) - def test_send_notification_with_payload(self): + async def test_send_notification_with_payload(self): uri = UUri(authority_name="neelam") - status = UClient(MockUTransport()).notify(create_topic(), create_destination_uri(), UPayload.pack(uri)) + status = await UClient(MockUTransport()).notify(create_topic(), create_destination_uri(), UPayload.pack(uri)) self.assertEqual(status.code, UCode.OK) - def test_register_listener(self): + async def test_register_listener(self): listener = create_autospec(UListener, instance=True) listener.on_receive = MagicMock() - status = UClient(MockUTransport()).register_notification_listener(create_topic(), listener) + status = await UClient(MockUTransport()).register_notification_listener(create_topic(), listener) self.assertEqual(status.code, UCode.OK) - def test_unregister_notification_listener(self): + async def test_unregister_notification_listener(self): listener = create_autospec(UListener, instance=True) listener.on_receive = MagicMock() notifier = UClient(MockUTransport()) - status = notifier.register_notification_listener(create_topic(), listener) + status = await notifier.register_notification_listener(create_topic(), listener) self.assertEqual(status.code, UCode.OK) - status = notifier.unregister_notification_listener(create_topic(), listener) + status = await notifier.unregister_notification_listener(create_topic(), listener) self.assertEqual(status.code, UCode.OK) - def test_unregister_listener_not_registered(self): + async def test_unregister_listener_not_registered(self): listener = create_autospec(UListener, instance=True) listener.on_receive = MagicMock() - status = UClient(MockUTransport()).unregister_notification_listener(create_topic(), listener) + status = await UClient(MockUTransport()).unregister_notification_listener(create_topic(), listener) self.assertEqual(status.code, UCode.INVALID_ARGUMENT) - def test_send_publish(self): - status = UClient(MockUTransport()).publish(create_topic(), None) + async def test_send_publish(self): + status = await UClient(MockUTransport()).publish(create_topic(), None) self.assertEqual(status.code, UCode.OK) - def test_send_publish_with_stuffed_payload(self): + async def test_send_publish_with_stuffed_payload(self): uri = UUri(authority_name="neelam") - status = UClient(MockUTransport()).publish(create_topic(), UPayload.pack_to_any(uri)) + status = await UClient(MockUTransport()).publish(create_topic(), UPayload.pack_to_any(uri)) self.assertEqual(status.code, UCode.OK) async def test_invoke_method_with_payload(self): @@ -170,36 +166,36 @@ async def test_unregister_listener(self): subscriber = UClient(HappySubscribeUTransport()) subscription_response = await subscriber.subscribe(topic, my_listener, CallOptions.DEFAULT) self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) - status = subscriber.unregister_listener(topic, my_listener) + status = await subscriber.unregister_listener(topic, my_listener) self.assertEqual(status.code, UCode.OK) - def test_registering_request_listener(self): + async def test_registering_request_listener(self): handler = create_autospec(RequestHandler, instance=True) server = UClient(MockUTransport()) - status = server.register_request_handler(create_method_uri(), handler) + status = await server.register_request_handler(create_method_uri(), handler) self.assertEqual(status.code, UCode.OK) - def test_registering_twice_the_same_request_handler(self): + async def test_registering_twice_the_same_request_handler(self): handler = create_autospec(RequestHandler, instance=True) server = UClient(MockUTransport()) - status = server.register_request_handler(create_method_uri(), handler) + status = await server.register_request_handler(create_method_uri(), handler) self.assertEqual(status.code, UCode.OK) - status = server.register_request_handler(create_method_uri(), handler) + status = await server.register_request_handler(create_method_uri(), handler) self.assertEqual(status.code, UCode.ALREADY_EXISTS) - def test_unregistering_non_registered_request_handler(self): + async def test_unregistering_non_registered_request_handler(self): handler = create_autospec(RequestHandler, instance=True) server = UClient(MockUTransport()) - status = server.unregister_request_handler(create_method_uri(), handler) + status = await server.unregister_request_handler(create_method_uri(), handler) self.assertEqual(status.code, UCode.NOT_FOUND) - def test_request_handler_for_notification(self): + async def test_request_handler_for_notification(self): transport = EchoUTransport() client = UClient(transport) handler = create_autospec(RequestHandler, instance=True) - client.register_request_handler(create_method_uri(), handler) - self.assertEqual(client.notify(create_topic(), transport.get_source(), None), UStatus(code=UCode.OK)) + await client.register_request_handler(create_method_uri(), handler) + self.assertEqual(await client.notify(create_topic(), transport.get_source(), None), UStatus(code=UCode.OK)) def create_topic(): diff --git a/tests/test_transport/test_utransport.py b/tests/test_transport/test_utransport.py index e75e623..4615d34 100644 --- a/tests/test_transport/test_utransport.py +++ b/tests/test_transport/test_utransport.py @@ -29,14 +29,17 @@ def on_receive(self, message): class HappyUTransport(UTransport): - def send(self, message): + async def close(self) -> None: + pass + + async def send(self, message): return UStatus(code=UCode.INVALID_ARGUMENT if message is None else UCode.OK) - def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: listener.on_receive(UMessage()) return UStatus(code=UCode.OK) - def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): + async def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): return UStatus(code=UCode.OK) def get_source(self): @@ -44,14 +47,17 @@ def get_source(self): class SadUTransport(UTransport): - def send(self, message): + async def close(self) -> None: + pass + + async def send(self, message): return UStatus(code=UCode.INTERNAL) - def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: listener.on_receive(None) return UStatus(code=UCode.INTERNAL) - def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): + async def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): return UStatus(code=UCode.INTERNAL) def get_source(self): @@ -59,39 +65,39 @@ def get_source(self): class UTransportTest(unittest.IsolatedAsyncioTestCase): - def test_happy_send_message_parts(self): + async def test_happy_send_message_parts(self): transport = HappyUTransport() - status = transport.send(UMessage()) + status = await transport.send(UMessage()) self.assertEqual(status.code, UCode.OK) - def test_happy_register_listener(self): + async def test_happy_register_listener(self): transport = HappyUTransport() - status = transport.register_listener(UUri(), MyListener(), None) + status = await transport.register_listener(UUri(), MyListener(), None) self.assertEqual(status.code, UCode.OK) - def test_happy_register_unlistener(self): + async def test_happy_register_unlistener(self): transport = HappyUTransport() - status = transport.unregister_listener(UUri(), MyListener(), None) + status = await transport.unregister_listener(UUri(), MyListener(), None) self.assertEqual(status.code, UCode.OK) - def test_sending_null_message(self): + async def test_sending_null_message(self): transport = HappyUTransport() - status = transport.send(None) + status = await transport.send(None) self.assertEqual(status.code, UCode.INVALID_ARGUMENT) - def test_unhappy_send_message_parts(self): + async def test_unhappy_send_message_parts(self): transport = SadUTransport() - status = transport.send(UMessage()) + status = await transport.send(UMessage()) self.assertEqual(status.code, UCode.INTERNAL) - def test_unhappy_register_listener(self): + async def test_unhappy_register_listener(self): transport = SadUTransport() - status = transport.register_listener(UUri(), MyListener(), None) + status = await transport.register_listener(UUri(), MyListener(), None) self.assertEqual(status.code, UCode.INTERNAL) - def test_unhappy_register_unlistener(self): + async def test_unhappy_register_unlistener(self): transport = SadUTransport() - status = transport.unregister_listener(UUri(), MyListener(), None) + status = await transport.unregister_listener(UUri(), MyListener(), None) self.assertEqual(status.code, UCode.INTERNAL) diff --git a/tests/test_transport/test_validator/test_uattributesvalidator.py b/tests/test_transport/test_validator/test_uattributesvalidator.py index 19bbe77..c23348b 100644 --- a/tests/test_transport/test_validator/test_uattributesvalidator.py +++ b/tests/test_transport/test_validator/test_uattributesvalidator.py @@ -323,3 +323,14 @@ def test_uattribute_validator_validate_reqid_invalid(self): self.assertTrue(result.is_failure()) self.assertEqual(str(validator), "UAttributesValidator.Response") self.assertEqual(result.message, "Invalid correlation UUID") + + def test_validate_priority_is_cs0(self): + message = UMessageBuilder.publish(build_default_uuri()).build() + message.attributes.priority = UPriority.UPRIORITY_CS0 + + validator = UAttributesValidator.get_validator(message.attributes) + result = validator.validate(message.attributes) + + self.assertTrue(result.is_failure()) + self.assertEqual(str(validator), "UAttributesValidator.Publish") + self.assertEqual(result.get_message(), "Invalid UPriority [UPRIORITY_CS0]") diff --git a/uprotocol/communication/inmemoryrpcclient.py b/uprotocol/communication/inmemoryrpcclient.py index eba8cd2..e92b7af 100644 --- a/uprotocol/communication/inmemoryrpcclient.py +++ b/uprotocol/communication/inmemoryrpcclient.py @@ -86,10 +86,7 @@ def __init__(self, transport: UTransport): self.transport = transport self.requests: Dict[str, asyncio.Future] = {} self.response_handler: UListener = HandleResponsesListener(self.requests) - - status = self.transport.register_listener(UriFactory.ANY, self.response_handler, None) - if status.code != UCode.OK: - raise UStatusError.from_code_message(status.code, "Failed to register listener") + self.is_listener_registered = False def cleanup_request(self, request_id): request_id = UuidSerializer.serialize(request_id) @@ -101,6 +98,9 @@ async def invoke_method( ) -> UPayload: """ Invoke a method (send an RPC request) and receive the response asynchronously. + Ensures that the listener is registered before proceeding with the method invocation. + If the listener is not registered, it attempts to register it and raises an exception + if the registration fails. :param method_uri: The method URI to be invoked. :param request_payload: The request message to be sent to the server. @@ -108,6 +108,12 @@ async def invoke_method( :return: Returns the asyncio Future with the response payload or raises an exception with the failure reason as UStatus. """ + if not self.is_listener_registered: + # Ensure listener is registered before proceeding + status = await self.transport.register_listener(UriFactory.ANY, self.response_handler, None) + if status.code != UCode.OK: + raise UStatusError.from_code_message(status.code, "Failed to register listener for rpc client") + self.is_listener_registered = True options = options or CallOptions.DEFAULT builder = UMessageBuilder.request(self.transport.get_source(), method_uri, options.timeout) request = None @@ -140,7 +146,7 @@ async def wait_for_response(): # Start the task for waiting for the response before sending the request response_task = asyncio.create_task(wait_for_response()) - status = self.transport.send(request) + status = await self.transport.send(request) if status.code != UCode.OK: raise UStatusError(status) @@ -155,4 +161,6 @@ def close(self): Close the InMemoryRpcClient by clearing stored requests and unregistering the listener. """ self.requests.clear() - self.transport.unregister_listener(UriFactory.ANY, self.response_handler, self.transport.get_source()) + asyncio.ensure_future( + self.transport.unregister_listener(UriFactory.ANY, self.response_handler, self.transport.get_source()) + ) diff --git a/uprotocol/communication/inmemoryrpcserver.py b/uprotocol/communication/inmemoryrpcserver.py index bf36296..8e43366 100644 --- a/uprotocol/communication/inmemoryrpcserver.py +++ b/uprotocol/communication/inmemoryrpcserver.py @@ -75,7 +75,7 @@ def __init__(self, transport): self.request_handlers = {} self.request_handler = HandleRequestListener(self.transport, self.request_handlers) - def register_request_handler(self, method_uri: UUri, handler: RequestHandler) -> UStatus: + async def register_request_handler(self, method_uri: UUri, handler: RequestHandler) -> UStatus: """ Register a handler that will be invoked when requests come in from clients for the given method. @@ -97,7 +97,7 @@ def register_request_handler(self, method_uri: UUri, handler: RequestHandler) -> if current_handler is not None: raise UStatusError.from_code_message(UCode.ALREADY_EXISTS, "Handler already registered") - result = self.transport.register_listener(UriFactory.ANY, self.request_handler, method_uri) + result = await self.transport.register_listener(UriFactory.ANY, self.request_handler, method_uri) if result.code != UCode.OK: raise UStatusError.from_code_message(result.code, result.message) @@ -109,7 +109,7 @@ def register_request_handler(self, method_uri: UUri, handler: RequestHandler) -> except Exception as e: return UStatus(code=UCode.INTERNAL, message=str(e)) - def unregister_request_handler(self, method_uri: UUri, handler: RequestHandler) -> UStatus: + async def unregister_request_handler(self, method_uri: UUri, handler: RequestHandler) -> UStatus: """ Unregister a handler that will be invoked when requests come in from clients for the given method. @@ -125,6 +125,6 @@ def unregister_request_handler(self, method_uri: UUri, handler: RequestHandler) if self.request_handlers.get(method_uri_str) == handler: del self.request_handlers[method_uri_str] - return self.transport.unregister_listener(UriFactory.ANY, self.request_handler, method_uri) + return await self.transport.unregister_listener(UriFactory.ANY, self.request_handler, method_uri) return UStatus(code=UCode.NOT_FOUND) diff --git a/uprotocol/communication/inmemorysubscriber.py b/uprotocol/communication/inmemorysubscriber.py index 0f68c08..e7b3ffd 100644 --- a/uprotocol/communication/inmemorysubscriber.py +++ b/uprotocol/communication/inmemorysubscriber.py @@ -96,7 +96,7 @@ async def subscribe(self, topic: UUri, listener: UListener, options: CallOptions response_future = RpcMapper.map_response(future_result, SubscriptionResponse) response = await response_future - self.transport.register_listener(topic, listener) + await self.transport.register_listener(topic, listener) return response async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> UStatus: @@ -124,11 +124,11 @@ async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptio response_future = RpcMapper.map_response_to_result(future_result, UnsubscribeResponse) response = await response_future if response.is_success(): - self.transport.unregister_listener(topic, listener) + await self.transport.unregister_listener(topic, listener) return UStatus(code=UCode.OK) return response.failure_value() - def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: + async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: """ Unregister a listener from a topic. @@ -143,4 +143,4 @@ def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: raise ValueError("Unsubscribe topic missing") if not listener: raise ValueError("Request listener missing") - return self.transport.unregister_listener(topic, listener) + return await self.transport.unregister_listener(topic, listener) diff --git a/uprotocol/communication/notifier.py b/uprotocol/communication/notifier.py index f4c2921..96939bd 100644 --- a/uprotocol/communication/notifier.py +++ b/uprotocol/communication/notifier.py @@ -29,7 +29,7 @@ class Notifier(ABC): """ @abstractmethod - def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus: + async def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus: """ Send a notification to a given topic passing a payload. @@ -41,7 +41,7 @@ def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus: pass @abstractmethod - def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + async def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: """ Register a listener for a notification topic. @@ -52,7 +52,7 @@ def register_notification_listener(self, topic: UUri, listener: UListener) -> US pass @abstractmethod - def unregister_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + async def unregister_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: """ Unregister a listener from a notification topic. diff --git a/uprotocol/communication/publisher.py b/uprotocol/communication/publisher.py index e583367..f708ee8 100644 --- a/uprotocol/communication/publisher.py +++ b/uprotocol/communication/publisher.py @@ -30,12 +30,12 @@ class Publisher(ABC): """ @abstractmethod - def publish(self, topic: UUri, payload: UPayload) -> UStatus: + async def publish(self, topic: UUri, payload: UPayload) -> UStatus: """ Publish a message to a topic passing UPayload as the payload. :param topic: The topic to publish to. :param payload: The UPayload to publish. - :return: UStatus + :return: An instance of UStatus indicating the status of the publish operation. """ pass diff --git a/uprotocol/communication/rpcserver.py b/uprotocol/communication/rpcserver.py index b50f1d9..5284014 100644 --- a/uprotocol/communication/rpcserver.py +++ b/uprotocol/communication/rpcserver.py @@ -28,7 +28,7 @@ class RpcServer(ABC): """ @abstractmethod - def register_request_handler(self, method: UUri, handler: RequestHandler) -> UStatus: + async def register_request_handler(self, method: UUri, handler: RequestHandler) -> UStatus: """ Register a handler that will be invoked when requests come in from clients for the given method. @@ -41,7 +41,7 @@ def register_request_handler(self, method: UUri, handler: RequestHandler) -> USt pass @abstractmethod - def unregister_request_handler(self, method: UUri, handler: RequestHandler) -> UStatus: + async def unregister_request_handler(self, method: UUri, handler: RequestHandler) -> UStatus: """ Unregister a handler that will be invoked when requests come in from clients for the given method. diff --git a/uprotocol/communication/simplenotifier.py b/uprotocol/communication/simplenotifier.py index 0add6f5..01a3775 100644 --- a/uprotocol/communication/simplenotifier.py +++ b/uprotocol/communication/simplenotifier.py @@ -45,7 +45,7 @@ def __init__(self, transport: UTransport): raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) self.transport = transport - def notify(self, topic: UUri, destination: UUri, payload: Optional[UPayload] = None) -> UStatus: + async def notify(self, topic: UUri, destination: UUri, payload: Optional[UPayload] = None) -> UStatus: """ Send a notification to a given topic. @@ -55,9 +55,9 @@ def notify(self, topic: UUri, destination: UUri, payload: Optional[UPayload] = N :return: Returns the UStatus with the status of the notification. """ builder = UMessageBuilder.notification(topic, destination) - return self.transport.send(builder.build() if payload is None else builder.build_from_upayload(payload)) + return await self.transport.send(builder.build() if payload is None else builder.build_from_upayload(payload)) - def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + async def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: """ Register a listener for a notification topic. @@ -65,9 +65,9 @@ def register_notification_listener(self, topic: UUri, listener: UListener) -> US :param listener: The listener to be called when a message is received on the topic. :return: Returns the UStatus with the status of the listener registration. """ - return self.transport.register_listener(topic, listener, self.transport.get_source()) + return await self.transport.register_listener(topic, listener, self.transport.get_source()) - def unregister_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + async def unregister_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: """ Unregister a listener from a notification topic. @@ -75,4 +75,4 @@ def unregister_notification_listener(self, topic: UUri, listener: UListener) -> :param listener: The listener to be unregistered from the topic. :return: Returns the UStatus with the status of the listener that was unregistered. """ - return self.transport.unregister_listener(topic, listener, self.transport.get_source()) + return await self.transport.unregister_listener(topic, listener, self.transport.get_source()) diff --git a/uprotocol/communication/simplepublisher.py b/uprotocol/communication/simplepublisher.py index 01d63ee..1681500 100644 --- a/uprotocol/communication/simplepublisher.py +++ b/uprotocol/communication/simplepublisher.py @@ -33,7 +33,7 @@ def __init__(self, transport: UTransport): raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) self.transport = transport - def publish(self, topic: UUri, payload: UPayload) -> UStatus: + async def publish(self, topic: UUri, payload: UPayload) -> UStatus: """ Publishes a message to a topic using the provided payload. @@ -45,4 +45,4 @@ def publish(self, topic: UUri, payload: UPayload) -> UStatus: raise ValueError("Publish topic missing") message = UMessageBuilder.publish(topic).build_from_upayload(payload) - return self.transport.send(message) + return await self.transport.send(message) diff --git a/uprotocol/communication/uclient.py b/uprotocol/communication/uclient.py index ac20926..586ca2b 100644 --- a/uprotocol/communication/uclient.py +++ b/uprotocol/communication/uclient.py @@ -36,6 +36,22 @@ class UClient(RpcServer, Subscriber, Notifier, Publisher, RpcClient): + """ + UClient provides a unified interface for various communication patterns over a UTransport instance, + including RPC, subscriptions, notifications, and message publishing. It combines functionalities + from RpcServer, Subscriber, Notifier, Publisher, and RpcClient, allowing for comprehensive and + asynchronous operations such as subscribing to topics, publishing messages, sending notifications, + and invoking remote methods. + + Attributes: + transport (UTransport): The underlying transport mechanism. + rpcServer (InMemoryRpcServer): Handles incoming RPC requests. + publisher (SimplePublisher): Sends messages to topics. + notifier (SimpleNotifier): Sends notifications to destinations. + rpcClient (InMemoryRpcClient): Invokes remote methods. + subscriber (InMemorySubscriber): Manages topic subscriptions. + """ + def __init__(self, transport: UTransport): self.transport = transport if transport is None: @@ -50,33 +66,124 @@ def __init__(self, transport: UTransport): self.subscriber = InMemorySubscriber(self.transport, self.rpcClient) async def subscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> SubscriptionResponse: + """ + Subscribe to a given topic. + + The API will return a future with the response SubscriptionResponse or exception + with the failure if the subscription was not successful. The API will also register the listener to be + called when messages are received. + + :param topic: The topic to subscribe to. + :param listener: The listener to be called when a message is received on the topic. + :param options: The call options for the subscription. + :return: Returns the future with the response SubscriptionResponse or + exception with the failure reason as UStatus. + """ return await self.subscriber.subscribe(topic, listener, options) - def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> UStatus: - return self.subscriber.unsubscribe(topic, listener, options) - - def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: - return self.subscriber.unregister_listener(topic, listener) - - def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus: - return self.notifier.notify(topic, destination, payload) - - def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: - return self.notifier.register_notification_listener(topic, listener) - - def unregister_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: - return self.notifier.unregister_notification_listener(topic, listener) - - def publish(self, topic: UUri, payload: UPayload) -> UStatus: - return self.publisher.publish(topic, payload) - - def register_request_handler(self, method: UUri, handler): - return self.rpcServer.register_request_handler(method, handler) - - def unregister_request_handler(self, method: UUri, handler): - return self.rpcServer.unregister_request_handler(method, handler) + async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> UStatus: + """ + Unsubscribe to a given topic. + + The subscriber no longer wishes to be subscribed to said topic so we issue an unsubscribe + request to the USubscription service. + + :param topic: The topic to unsubscribe to. + :param listener: The listener to be called when a message is received on the topic. + :param options: The call options for the subscription. + :return: Returns UStatus with the result from the unsubscribe request. + """ + return await self.subscriber.unsubscribe(topic, listener, options) + + async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Unregister a listener from a topic. + + This method will only unregister the listener for a given subscription thus allowing a uE to stay + subscribed even if the listener is removed. + + :param topic: The topic to subscribe to. + :param listener: The listener to be called when a message is received on the topic. + :return: Returns UStatus with the status of the listener unregister request. + """ + return await self.subscriber.unregister_listener(topic, listener) + + async def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus: + """ + Send a notification to a given topic. + + :param topic: The topic to send the notification to. + :param destination: The destination to send the notification to. + :param payload: The payload to send with the notification. + :return: Returns the UStatus with the status of the notification. + """ + return await self.notifier.notify(topic, destination, payload) + + async def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Register a listener for a notification topic. + + :param topic: The topic to register the listener to. + :param listener: The listener to be called when a message is received on the topic. + :return: Returns the UStatus with the status of the listener registration. + """ + return await self.notifier.register_notification_listener(topic, listener) + + async def unregister_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Unregister a listener from a notification topic. + + :param topic: The topic to unregister the listener from. + :param listener: The listener to be unregistered from the topic. + :return: Returns the UStatus with the status of the listener that was unregistered. + """ + return await self.notifier.unregister_notification_listener(topic, listener) + + async def publish(self, topic: UUri, payload: UPayload) -> UStatus: + """ + Publishes a message to a topic using the provided payload. + + :param topic: The topic to publish the message to. + :param payload: The payload to be published. + :return: An instance of UStatus indicating the status of the publish operation. + """ + return await self.publisher.publish(topic, payload) + + async def register_request_handler(self, method: UUri, handler): + """ + Register a handler that will be invoked when requests come in from clients for the given method. + + Note: Only one handler is allowed to be registered per method URI. + + :param method_uri: The URI for the method to register the listener for. + :param handler: The handler that will process the request for the client. + :return: Returns the status of registering the RpcListener. + """ + return await self.rpcServer.register_request_handler(method, handler) + + async def unregister_request_handler(self, method: UUri, handler): + """ + Unregister a handler that will be invoked when requests come in from clients for the given method. + + :param method_uri: The resolved UUri where the listener was registered to receive messages from. + :param handler: The handler for processing requests. + :return: Returns the status of unregistering the RpcListener. + """ + return await self.rpcServer.unregister_request_handler(method, handler) async def invoke_method( self, method_uri: UUri, request_payload: UPayload, options: Optional[CallOptions] = None ) -> UPayload: + """ + Invoke a method (send an RPC request) and receive the response asynchronously. + Ensures that the listener is registered before proceeding with the method invocation. + If the listener is not registered, it attempts to register it and raises an exception + if the registration fails. + + :param method_uri: The method URI to be invoked. + :param request_payload: The request message to be sent to the server. + :param options: RPC method invocation call options. Defaults to None. + :return: Returns the asyncio Future with the response payload or raises an exception + with the failure reason as UStatus. + """ return await self.rpcClient.invoke_method(method_uri, request_payload, options) diff --git a/uprotocol/transport/utransport.py b/uprotocol/transport/utransport.py index 8526d1e..85b936f 100644 --- a/uprotocol/transport/utransport.py +++ b/uprotocol/transport/utransport.py @@ -24,14 +24,14 @@ class UTransport(ABC): """UTransport is the uP-L1 interface that provides a common API for uE developers to send and receive messages.
UTransport implementations contain the details for connecting to the underlying transport technology and sending UMessage using the configured technology.
For more information please refer to - link + https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/up-l1/README.adoc """ TRANSPORT_NULL_ERROR = "Transport cannot be null" TRANSPORT_NOT_INSTANCE_ERROR = "Transport must be an instance of UTransport" @abstractmethod - def send(self, message: UMessage) -> UStatus: + async def send(self, message: UMessage) -> UStatus: """Send a message (in parts) over the transport. @param message the UMessage to be sent. @return Returns UStatus with UCode set to the status code (successful or failure). @@ -39,7 +39,7 @@ def send(self, message: UMessage) -> UStatus: pass @abstractmethod - def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: """Register UListener for UUri source and sink filters to be called when a message is received. @@ -56,7 +56,7 @@ def register_listener(self, source_filter: UUri, listener: UListener, sink_filte pass @abstractmethod - def unregister_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + async def unregister_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: """Unregister UListener for UUri source and sink filters. Messages arriving at this topic will no longer be processed by this listener. @@ -73,8 +73,16 @@ def unregister_listener(self, source_filter: UUri, listener: UListener, sink_fil @abstractmethod def get_source(self) -> UUri: - """Get the source URI of the transport. + """Get the source URI of the transport.This URI represents the uE that is using the transport. @return Returns the source URI of the transport. """ pass + + @abstractmethod + async def close(self) -> None: + """ + Close the connection to the transport that will trigger any registered listeners + to be unregistered. + """ + pass diff --git a/uprotocol/transport/validator/uattributesvalidator.py b/uprotocol/transport/validator/uattributesvalidator.py index c59e855..241f799 100644 --- a/uprotocol/transport/validator/uattributesvalidator.py +++ b/uprotocol/transport/validator/uattributesvalidator.py @@ -145,7 +145,7 @@ def validate_priority(attr: UAttributes): """ return ( ValidationResult.success() - if attr.priority >= UPriority.UPRIORITY_CS0 + if attr.priority >= UPriority.UPRIORITY_CS1 else ValidationResult.failure(f"Invalid UPriority [{UPriority.Name(attr.priority)}]") )