From 9ceedfa2e006a16e0270465fd0781461a5a283f2 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Thu, 18 Jul 2024 10:57:38 -0400 Subject: [PATCH 01/10] Move InMemorySubscriber from L2 to L3 client module --- tests/test_client/__init__.py | 0 .../test_usubscription/__init__.py | 0 .../test_usubscription/test_v3/__init__.py | 0 .../test_inmemoryusubcriptionclient.py} | 187 ++++++++++++++---- tests/test_communication/test_uclient.py | 47 ----- uprotocol/client/__init__.py | 0 uprotocol/client/usubscription/__init__.py | 0 uprotocol/client/usubscription/v3/__init__.py | 0 .../v3/inmemoryusubcriptionclient.py} | 172 +++++++++++++--- .../v3}/subscriptionchangehandler.py | 0 .../usubscription/v3/usubscriptionclient.py | 177 +++++++++++++++++ uprotocol/communication/subscriber.py | 89 --------- uprotocol/communication/uclient.py | 63 +----- uprotocol/transport/utransport.py | 9 +- 14 files changed, 487 insertions(+), 257 deletions(-) create mode 100644 tests/test_client/__init__.py create mode 100644 tests/test_client/test_usubscription/__init__.py create mode 100644 tests/test_client/test_usubscription/test_v3/__init__.py rename tests/{test_communication/test_inmemorysubscriber.py => test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py} (66%) create mode 100644 uprotocol/client/__init__.py create mode 100644 uprotocol/client/usubscription/__init__.py create mode 100644 uprotocol/client/usubscription/v3/__init__.py rename uprotocol/{communication/inmemorysubscriber.py => client/usubscription/v3/inmemoryusubcriptionclient.py} (55%) rename uprotocol/{communication => client/usubscription/v3}/subscriptionchangehandler.py (100%) create mode 100644 uprotocol/client/usubscription/v3/usubscriptionclient.py delete mode 100644 uprotocol/communication/subscriber.py diff --git a/tests/test_client/__init__.py b/tests/test_client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_client/test_usubscription/__init__.py b/tests/test_client/test_usubscription/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_client/test_usubscription/test_v3/__init__.py b/tests/test_client/test_usubscription/test_v3/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_communication/test_inmemorysubscriber.py b/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py similarity index 66% rename from tests/test_communication/test_inmemorysubscriber.py rename to tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py index edbe1b4..3181230 100644 --- a/tests/test_communication/test_inmemorysubscriber.py +++ b/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py @@ -16,11 +16,12 @@ import unittest from unittest.mock import AsyncMock, MagicMock +from tests.test_communication.mock_utransport import MockUTransport +from uprotocol.client.usubscription.v3.inmemoryusubcriptionclient import InMemoryUSubscriptionClient +from uprotocol.client.usubscription.v3.subscriptionchangehandler import SubscriptionChangeHandler from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient -from uprotocol.communication.inmemorysubscriber import InMemorySubscriber from uprotocol.communication.simplenotifier import SimpleNotifier -from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler from uprotocol.communication.upayload import UPayload from uprotocol.communication.ustatuserror import UStatusError from uprotocol.core.usubscription.v3.usubscription_pb2 import ( @@ -43,7 +44,7 @@ def on_receive(self, umsg: UMessage) -> None: pass -class TestInMemorySubscriber(unittest.IsolatedAsyncioTestCase): +class TestInMemoryUSubscriptionClient(unittest.IsolatedAsyncioTestCase): def setUp(self): self.transport = MagicMock(spec=UTransport) self.rpc_client = MagicMock(spec=InMemoryRpcClient) @@ -66,7 +67,7 @@ async def test_simple_mock_of_rpc_client_and_notifier(self): self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) result = await subscriber.subscribe(self.topic, self.listener) @@ -89,10 +90,10 @@ async def test_simple_mock_of_rpc_client_and_notifier_returned_subscribe_pending self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) result = await subscriber.subscribe(self.topic, self.listener) @@ -115,10 +116,10 @@ async def test_simple_mock_of_rpc_client_and_notifier_returned_unsubscribed(self self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) result = await subscriber.subscribe(self.topic, self.listener) @@ -137,7 +138,7 @@ async def test_subscribe_using_mock_rpc_client_and_simplernotifier_when_invokeme self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) with self.assertRaises(Exception) as context: @@ -157,7 +158,7 @@ async def test_subscribe_using_mock_rpc_client_and_simplernotifier_when_invokeme self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) with self.assertRaises(UStatusError) as context: @@ -180,7 +181,7 @@ async def test_subscribe_when_we_pass_a_subscription_change_notification_handler self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) handler = MagicMock(spec=SubscriptionChangeHandler) @@ -206,7 +207,7 @@ async def test_subscribe_when_we_try_to_subscribe_to_the_same_topic_twice_with_s self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) handler = MagicMock(spec=SubscriptionChangeHandler) @@ -234,7 +235,7 @@ async def test_subscribe_when_we_try_to_subscribe_to_the_same_topic_twice_with_d self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) handler = MagicMock(spec=SubscriptionChangeHandler) @@ -261,7 +262,7 @@ async def test_unsubscribe_using_mock_rpcclient_and_simplernotifier(self): self.notifier.unregister_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) response = await subscriber.unsubscribe(self.topic, self.listener) self.assertEqual(response.message, "") @@ -278,7 +279,7 @@ async def test_unsubscribe_when_invokemethod_return_an_exception(self): UCode.CANCELLED, "Operation cancelled" ) self.notifier.unregister_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) response = await subscriber.unsubscribe(self.topic, self.listener) self.assertEqual(response.message, "Operation cancelled") @@ -295,7 +296,7 @@ async def test_unsubscribe_when_invokemethod_returned_ok_but_we_failed_to_unregi SubscriptionResponse(status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBE_PENDING)) ) self.notifier.unregister_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) response = await subscriber.unsubscribe(self.topic, self.listener) self.assertEqual(response.status.code, UCode.ABORTED) @@ -322,7 +323,7 @@ async def register_notification_listener(uri, listener): return UStatus(code=UCode.OK) self.notifier.register_notification_listener = AsyncMock(side_effect=register_notification_listener) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) result = await subscriber.subscribe(self.topic, self.listener, CallOptions.DEFAULT) self.assertEqual(result.status.state, SubscriptionStatus.State.SUBSCRIBE_PENDING) @@ -338,65 +339,185 @@ async def register_notification_listener(uri, listener): async def test_unregister_listener_missing_topic(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unregister_listener(None, self.listener) self.assertEqual(str(context.exception), "Unsubscribe topic missing") async def test_unregister_listener_missing_listener(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unregister_listener(self.topic, None) self.assertEqual(str(context.exception), "Request listener missing") async def test_unsubscribe_missing_topic(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unsubscribe(None, self.listener, CallOptions()) self.assertEqual(str(context.exception), "Unsubscribe topic missing") async def test_unsubscribe_missing_listener(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unsubscribe(self.topic, None, CallOptions()) self.assertEqual(str(context.exception), "Listener missing") async def test_subscribe_missing_topic(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.subscribe(None, self.listener, CallOptions()) self.assertEqual(str(context.exception), "Subscribe topic missing") async def test_subscribe_missing_listener(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.subscribe(self.topic, None, CallOptions()) self.assertEqual(str(context.exception), "Request listener missing") - def test_subscriber_constructor_transport_none(self): + def test_subscription_client_constructor_transport_none(self): with self.assertRaises(ValueError) as context: - InMemorySubscriber(None, None, None) + InMemoryUSubscriptionClient(None, None, None) self.assertEqual(str(context.exception), UTransport.TRANSPORT_NULL_ERROR) - def test_subscriber_constructor_transport_not_instance(self): + def test_subscription_client_constructor_transport_not_instance(self): with self.assertRaises(ValueError) as context: - InMemorySubscriber("InvalidTransport", None, None) + InMemoryUSubscriptionClient("InvalidTransport", None, None) self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) - def test_subscriber_constructor_rpcclient_none(self): + def test_subscription_client_constructor_with_just_transport(self): + client = InMemoryUSubscriptionClient(self.transport) + self.assertTrue(client is not None) + + async def test_register_notification_api_when_passed_a_null_topic(self): + # Setup mocks + notifier = AsyncMock() + notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + # Initialize InMemoryUSubscriptionClient + transport = MagicMock(spec=UTransport) + subscriber = InMemoryUSubscriptionClient(transport) + assert subscriber is not None + + # Define the handler + class MySubscriptionChangeHandler(SubscriptionChangeHandler): + def handle_subscription_change(self, topic, status): + raise NotImplementedError("Unimplemented method 'handleSubscriptionChange'") + + handler = MySubscriptionChangeHandler() + + # Assert that passing a null topic raises a ValueError with self.assertRaises(ValueError) as context: - InMemorySubscriber(self.transport, None, None) - self.assertEqual(str(context.exception), "RpcClient missing") + await subscriber.register_for_notifications(None, handler) + self.assertEqual(str(context.exception), "Topic missing") + + # Verify the notifier interaction + notifier.register_notification_listener.assert_not_called() - def test_subscriber_constructor_notiifier_none(self): + async def test_register_notification_api_when_passed_a_null_handler(self): + subscriber = InMemoryUSubscriptionClient(self.transport) + assert subscriber is not None + + # Assert that passing a null handler raises a ValueError with self.assertRaises(ValueError) as context: - InMemorySubscriber(self.transport, self.rpc_client, None) - self.assertEqual(str(context.exception), "Notifier missing") + await subscriber.register_for_notifications(MagicMock(spec=UUri), None) + self.assertEqual(str(context.exception), "Handler missing") + + # Verify the notifier interaction + self.notifier.register_notification_listener.assert_not_called() + + async def test_register_notification_api_when_passed_a_valid_topic_and_handler(self): + subscriber = InMemoryUSubscriptionClient(MockUTransport()) + assert subscriber is not None + + # Define the handler + class MySubscriptionChangeHandler(SubscriptionChangeHandler): + def handle_subscription_change(self, topic, status): + raise NotImplementedError("Unimplemented method 'handleSubscriptionChange'") + + handler = MySubscriptionChangeHandler() + + status = await subscriber.register_for_notifications(UUri(), handler) + self.assertTrue(status is not None) + + async def test_register_notification_api_when_invoke_method_throws_an_exception(self): + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + self.transport.get_source.return_value = self.source + + self.rpc_client.invoke_method.return_value = UStatusError.from_code_message(code=UCode.PERMISSION_DENIED, + message="Not permitted") + + # Initialize the subscription client + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + # Define the subscription change handler + class MySubscriptionChangeHandler(SubscriptionChangeHandler): + async def handle_subscription_change(self, topic, status): + raise NotImplementedError("Unimplemented method 'handle_subscription_change'") + + handler = MySubscriptionChangeHandler() + with self.assertRaises(UStatusError) as context: + await subscriber.register_for_notifications(self.topic, handler) + + self.assertEqual(UCode.PERMISSION_DENIED, context.exception.status.code) + self.assertEqual("Not permitted", context.exception.status.message) + + async def test_register_for_notifications_to_the_same_topic_twice_with_same_notification_handler(self): + self.transport.get_source.return_value = self.source + + self.rpc_client.invoke_method.return_value = UPayload.pack(None) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + # First register_for_notifications attempt + result = await subscriber.register_for_notifications(self.topic, handler, CallOptions.DEFAULT) + self.assertTrue(result is not None) + + # Second register_for_notifications attempt + result = await subscriber.register_for_notifications(self.topic, handler, CallOptions.DEFAULT) + self.assertTrue(result is not None) + + self.assertEqual(self.rpc_client.invoke_method.call_count, 2) + self.assertEqual(self.transport.get_source.call_count, 2) + + async def test_register_for_notifications_to_the_same_topic_twice_with_different_notification_handler(self): + self.transport.get_source.return_value = self.source + + self.rpc_client.invoke_method.return_value = UPayload.pack(None) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + # First register_for_notifications attempt + result = await subscriber.register_for_notifications(self.topic, handler, CallOptions.DEFAULT) + self.assertTrue(result is not None) + handler1 = MagicMock(spec=SubscriptionChangeHandler) + handler1.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + # Second register_for_notifications attempt + with self.assertRaises(UStatusError) as context: + await subscriber.register_for_notifications(self.topic, handler1, CallOptions.DEFAULT) + self.assertEqual(UCode.ALREADY_EXISTS, context.exception.status.code) + self.assertEqual("Handler already registered", context.exception.status.message) + + self.assertEqual(self.rpc_client.invoke_method.call_count, 2) + self.assertEqual(self.transport.get_source.call_count, 2) if __name__ == '__main__': diff --git a/tests/test_communication/test_uclient.py b/tests/test_communication/test_uclient.py index d179e4f..1a4e753 100644 --- a/tests/test_communication/test_uclient.py +++ b/tests/test_communication/test_uclient.py @@ -22,12 +22,6 @@ from uprotocol.communication.uclient import UClient from uprotocol.communication.upayload import UPayload from uprotocol.communication.ustatuserror import UStatusError -from uprotocol.core.usubscription.v3.usubscription_pb2 import ( - SubscriptionResponse, - SubscriptionStatus, - UnsubscribeResponse, -) -from uprotocol.transport.builder.umessagebuilder import UMessageBuilder from uprotocol.transport.ulistener import UListener from uprotocol.v1.ucode_pb2 import UCode from uprotocol.v1.umessage_pb2 import UMessage @@ -159,24 +153,6 @@ async def test_invoke_method_with_multi_invoke_transport(self): self.assertFalse(future_result1.exception()) self.assertFalse(future_result2.exception()) - async def test_subscribe_happy_path(self): - topic = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) - transport = HappySubscribeUTransport() - upclient = UClient(transport) - subscription_response = await upclient.subscribe(topic, self.listener, CallOptions(timeout=5000)) - # check for successfully subscribed - self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) - - async def test_unregister_listener(self): - topic = create_topic() - my_listener = create_autospec(UListener, instance=True) - - subscriber = UClient(HappySubscribeUTransport()) - subscription_response = await subscriber.subscribe(topic, my_listener, CallOptions.DEFAULT) - self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) - status = await subscriber.unregister_listener(topic, my_listener) - self.assertEqual(status.code, UCode.OK) - async def test_registering_request_listener(self): handler = create_autospec(RequestHandler, instance=True) server = UClient(MockUTransport()) @@ -225,9 +201,6 @@ async def run_tests(): client.notify(create_topic(), create_destination_uri()), client.publish(create_topic()), client.invoke_method(create_method_uri(), UPayload.pack(None), CallOptions.DEFAULT), - client.subscribe(create_topic(), listener), - client.unsubscribe(create_topic(), listener), - unregister_listener_not_registered(client, listener), client.register_notification_listener(create_topic(), listener), client.unregister_notification_listener(create_topic(), listener), register_and_unregister_request_handler(client, handler), @@ -251,26 +224,6 @@ def create_method_uri(): return UUri(authority_name="neelam", ue_id=4, ue_version_major=1, resource_id=3) -class HappySubscribeUTransport(MockUTransport): - def build_response(self, request): - return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( - UPayload.pack( - SubscriptionResponse( - status=SubscriptionStatus( - state=SubscriptionStatus.State.SUBSCRIBED, message="Successfully Subscribed" - ) - ) - ) - ) - - -class HappyUnSubscribeUTransport(MockUTransport): - def build_response(self, request): - return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( - UPayload.pack(UnsubscribeResponse()) - ) - - class CommStatusTransport(MockUTransport): def build_response(self, request): status = UStatus(UCode.FAILED_PRECONDITION, "CommStatus Error") diff --git a/uprotocol/client/__init__.py b/uprotocol/client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uprotocol/client/usubscription/__init__.py b/uprotocol/client/usubscription/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uprotocol/client/usubscription/v3/__init__.py b/uprotocol/client/usubscription/v3/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uprotocol/communication/inmemorysubscriber.py b/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py similarity index 55% rename from uprotocol/communication/inmemorysubscriber.py rename to uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py index 3533d74..60002eb 100644 --- a/uprotocol/communication/inmemorysubscriber.py +++ b/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py @@ -14,16 +14,23 @@ from typing import Dict, Optional +from uprotocol.client.usubscription.v3.subscriptionchangehandler import SubscriptionChangeHandler +from uprotocol.client.usubscription.v3.usubscriptionclient import USubscriptionClient from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.notifier import Notifier from uprotocol.communication.rpcclient import RpcClient from uprotocol.communication.rpcmapper import RpcMapper -from uprotocol.communication.subscriber import Subscriber -from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler +from uprotocol.communication.simplenotifier import SimpleNotifier from uprotocol.communication.upayload import UPayload from uprotocol.communication.ustatuserror import UStatusError from uprotocol.core.usubscription.v3 import usubscription_pb2 from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + FetchSubscribersRequest, + FetchSubscribersResponse, + FetchSubscriptionsRequest, + NotificationsRequest, + NotificationsResponse, SubscriberInfo, SubscriptionRequest, SubscriptionResponse, @@ -32,6 +39,7 @@ UnsubscribeResponse, Update, ) + from uprotocol.transport.ulistener import UListener from uprotocol.transport.utransport import UTransport from uprotocol.uri.factory.uri_factory import UriFactory @@ -70,32 +78,30 @@ def on_receive(self, message: UMessage) -> None: pass -class InMemorySubscriber(Subscriber): +class InMemoryUSubscriptionClient(USubscriptionClient): """ - The following is an in-memory implementation of the Subscriber interface that - wraps the UTransport for implementing the Subscriber-side of the pub/sub - messaging pattern to allow developers to subscribe and unsubscribe to topics. - This implementation uses the InMemoryRpcClient and SimpleNotifier interfaces - to invoke the subscription request message to the usubscription service, and register - to receive notifications for changes from the uSubscription service. + Implementation of USubscriptionClient that caches state information within the object + and used for single tenant applications (ex. in-vehicle). The implementation uses InMemoryRpcClient + that also stores RPC correlation information within the objects """ - def __init__(self, transport: UTransport, rpc_client: RpcClient, notifier: Notifier): + def __init__(self, transport: UTransport, rpc_client: Optional[RpcClient] = None, notifier: Optional[Notifier] = None): """ - Creates a new subscriber for existing Communication Layer client implementations. + Creates a new USubscription client passing UTransport, CallOptions, and an implementation + of RpcClient and Notifier. - :param transport: The transport to use for sending the notifications - :param rpc_client: The RPC client to use for sending the RPC requests - :param notifier: The notifier to register notification listeners + :param transport: The transport to use for sending the notifications. + :param rpcClient: The RPC client to use for sending the RPC requests. + :param notifier: The notifier to use for registering the notification listener. """ if not transport: raise ValueError(UTransport.TRANSPORT_NULL_ERROR) elif not isinstance(transport, UTransport): raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) - elif not rpc_client: - raise ValueError("RpcClient missing") - elif not notifier: - raise ValueError("Notifier missing") + if not rpc_client: + rpc_client = InMemoryRpcClient(transport) + if not notifier: + notifier = SimpleNotifier(transport) self.transport = transport self.rpc_client = rpc_client self.notifier = notifier @@ -106,12 +112,16 @@ def __init__(self, transport: UTransport, rpc_client: RpcClient, notifier: Notif self.notification_uri = UriFactory.from_proto(service_descriptor, 0x8000) self.subscribe_uri = UriFactory.from_proto(service_descriptor, 1) self.unsubscribe_uri = UriFactory.from_proto(service_descriptor, 2) + self.fetch_subscribers_uri = UriFactory.from_proto(service_descriptor, 8) + self.fetch_subscriptions_uri = UriFactory.from_proto(service_descriptor, 3) + self.register_for_notification_uri = UriFactory.from_proto(service_descriptor, 6) + self.unregister_for_notification_uri = UriFactory.from_proto(service_descriptor, 7) async def subscribe( self, topic: UUri, listener: UListener, - options: CallOptions = None, + options: CallOptions = CallOptions.DEFAULT, handler: Optional[SubscriptionChangeHandler] = None, ) -> SubscriptionResponse: """ @@ -128,8 +138,8 @@ async def subscribe( returned. :param topic: The topic to subscribe to. - :param listener: The listener function to be called when a message is received on the topic. - :param options: Optional call options for the subscription. + :param listener: The listener function to be called when messages are received. + :param options: Optional call options to be used for the subscription. :param handler: Optional handler function for handling subscription state changes. :return: An async operation that yields a SubscriptionResponse upon success or raises an exception with the failure reason as UStatus. UCode.ALREADY_EXISTS will be returned if called multiple times @@ -139,6 +149,8 @@ async def subscribe( raise ValueError("Subscribe topic missing") if not listener: raise ValueError("Request listener missing") + if not options: + raise ValueError("Call Options missing") if not self.is_listener_registered: # Ensure listener is registered before proceeding @@ -175,7 +187,9 @@ async def subscribe( self.handlers[topic_str] = handler return response - async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions = None) -> UStatus: + async def unsubscribe( + self, topic: UUri, listener: UListener, options: CallOptions = CallOptions.DEFAULT + ) -> UStatus: """ Unsubscribes from a given topic. @@ -186,13 +200,15 @@ async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptio :param topic: The topic to unsubscribe from. :param listener: The listener function associated with the topic. - :param options: Optional call options for the subscription. + :param options: Optional call options to be used for the unsubscription. :return: An async operation that yields a UStatus indicating the result of the unsubscribe request. """ if not topic: raise ValueError("Unsubscribe topic missing") if not listener: raise ValueError("Listener missing") + if not options: + raise ValueError("CallOptions missing") unsubscribe_request = UnsubscribeRequest(topic=topic) future_result = self.rpc_client.invoke_method(self.unsubscribe_uri, UPayload.pack(unsubscribe_request), options) @@ -218,7 +234,9 @@ async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus raise ValueError("Unsubscribe topic missing") if not listener: raise ValueError("Request listener missing") - return await self.transport.unregister_listener(topic, listener) + status = await self.transport.unregister_listener(topic, listener) + self.handlers.pop(UriSerializer.serialize(topic), None) + return status def close(self): """ @@ -226,3 +244,109 @@ def close(self): """ self.handlers.clear() self.notifier.unregister_notification_listener(self.notification_uri, self.notification_handler) + + async def register_for_notifications( + self, topic: UUri, handler: SubscriptionChangeHandler, options: Optional[CallOptions] = CallOptions.DEFAULT + ): + """ + Register for Subscription Change Notifications. + + This API allows producers to register to receive subscription change notifications for + topics that they produce only. + + :param topic: UUri, The topic to register for notifications. + :param handler: callable, The SubscriptionChangeHandler to handle the subscription changes. + :param options: CallOptions, The CallOptions to be used for the register request. + + :return: asyncio.Future[NotificationsResponse], A future object that completes with NotificationsResponse + if the uSubscription service accepts the request to register the caller to be notified of subscription + changes, or raises an exception if there is a failure reason. + """ + if not topic: + raise ValueError("Topic missing") + if not handler: + raise ValueError("Handler missing") + if not options: + raise ValueError("CallOptions missing") + + request = NotificationsRequest(topic=topic, subscriber=SubscriberInfo(uri=self.transport.get_source())) + + response = self.rpc_client.invoke_method(self.register_for_notification_uri, UPayload.pack(request), options) + notifications_response = await RpcMapper.map_response(response, NotificationsResponse) + if handler: + topic_str = UriSerializer.serialize(topic) + if topic_str in self.handlers and self.handlers[topic_str] != handler: + raise UStatusError.from_code_message(UCode.ALREADY_EXISTS, "Handler already registered") + self.handlers[topic_str] = handler + + return notifications_response + + async def unregister_for_notifications( + self, topic: UUri, handler: SubscriptionChangeHandler, options: Optional[CallOptions] = CallOptions.DEFAULT + ): + """ + Unregister for subscription change notifications. + + :param topic: The topic to unregister for notifications. + :param handler: The `SubscriptionChangeHandler` to handle the subscription changes. + :param options: The `CallOptions` to be used for the unregister request. + :return: A `NotificationResponse` with the status of the API call to the uSubscription service, + or a `UStatus` with the reason for the failure. `UCode.PERMISSION_DENIED` is returned if the + topic `ue_id` does not equal the caller's `ue_id`. + """ + if not topic: + raise ValueError("Topic missing") + if not handler: + raise ValueError("Handler missing") + if not options: + raise ValueError("CallOptions missing") + + request = NotificationsRequest(topic=topic, subscriber=SubscriberInfo(uri=self.transport.get_source())) + + response = self.rpc_client.invoke_method(self.unregister_for_notification_uri, UPayload.pack(request), options) + notifications_response = await RpcMapper.map_response(response, NotificationsResponse) + + self.handlers.pop(UriSerializer.serialize(topic), None) + + return notifications_response + + async def fetch_subscribers(self, topic: UUri, options: Optional[CallOptions] = CallOptions.DEFAULT): + """ + Fetch the list of subscribers for a given produced topic. + + :param topic: The topic to fetch the subscribers for. + :param options: The `CallOptions` to be used for the fetch request. + :return: A `FetchSubscribersResponse` that contains the list of subscribers, + or a `UStatus` with the reason for the failure. + """ + if topic is None: + raise ValueError("Topic missing") + if options is None: + raise ValueError("CallOptions missing") + + request = FetchSubscribersRequest(topic=topic) + result = self.rpc_client.invoke_method(self.fetch_subscribers_uri, UPayload.pack(request), options) + return await RpcMapper.map_response(result, FetchSubscribersResponse) + + async def fetch_subscriptions( + self, request: FetchSubscriptionsRequest, options: Optional[CallOptions] = CallOptions.DEFAULT + ): + """ + Fetch the list of subscriptions for a given topic. + + This API provides more information than `fetch_subscribers()` as it also returns + `SubscribeAttributes` per subscriber, which might be useful for the producer to know. + + :param request: The request to fetch subscriptions for. + :param options: The `CallOptions` to be used for the request. + :return: A `FetchSubscriptionsResponse` that contains the subscription information per subscriber to the topic. + If unsuccessful, returns a `UStatus` with the reason for the failure. + `UCode.PERMISSION_DENIED` is returned if the topic `ue_id` does not equal the caller's `ue_id`. + """ + if request is None: + raise ValueError("Request missing") + if options is None: + raise ValueError("CallOptions missing") + + result = self.rpc_client.invoke_method(self.fetch_subscriptions_uri, UPayload.pack(request), options) + return await RpcMapper.map_response(result, FetchSubscribersResponse) diff --git a/uprotocol/communication/subscriptionchangehandler.py b/uprotocol/client/usubscription/v3/subscriptionchangehandler.py similarity index 100% rename from uprotocol/communication/subscriptionchangehandler.py rename to uprotocol/client/usubscription/v3/subscriptionchangehandler.py diff --git a/uprotocol/client/usubscription/v3/usubscriptionclient.py b/uprotocol/client/usubscription/v3/usubscriptionclient.py new file mode 100644 index 0000000..c7ae64d --- /dev/null +++ b/uprotocol/client/usubscription/v3/usubscriptionclient.py @@ -0,0 +1,177 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from abc import ABC, abstractmethod +from typing import Optional + +from uprotocol.client.usubscription.v3.subscriptionchangehandler import SubscriptionChangeHandler +from uprotocol.communication.calloptions import CallOptions +from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + FetchSubscribersResponse, + FetchSubscriptionsRequest, + FetchSubscriptionsResponse, + NotificationsResponse, + SubscriptionResponse, +) +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class USubscriptionClient(ABC): + """ + The client-side interface for communicating with the USubscription service asynchronously. + + Provides methods for subscribing, unsubscribing, registering listeners, fetching subscribers, + fetching subscriptions, and handling subscription change notifications. + """ + + @abstractmethod + async def subscribe( + self, + topic: UUri, + listener: UListener, + options: Optional[CallOptions] = CallOptions.DEFAULT, + handler: Optional[SubscriptionChangeHandler] = None, + ) -> SubscriptionResponse: + """ + Subscribes to a given topic asynchronously. + + The API will return a SubscriptionResponse or raise an exception if the subscription + was not successful. The optional passed SubscriptionChangeHandler is used to receive + notifications of changes to the subscription status like a transition from + SubscriptionStatus.State.SUBSCRIBE_PENDING to SubscriptionStatus.State.SUBSCRIBED that + occurs when we subscribe to remote topics that the device we are on has not yet a + subscriber that has subscribed to said topic. + + :param topic: The topic to subscribe to. + :param listener: The listener to be called when messages are received. + :param options: The CallOptions to be used for the subscription. + :param handler: SubscriptionChangeHandler to handle changes to subscription states. + + :return: Returns a SubscriptionResponse or raises an exception with the failure reason + as UStatus. UCode.ALREADY_EXISTS will be raised if you call this API multiple times + passing a different handler. + """ + pass + + @abstractmethod + async def unsubscribe( + self, topic: UUri, listener: UListener, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> UStatus: + """ + Unsubscribes from a given topic asynchronously. + + The subscriber no longer wishes to be subscribed to said topic so we issue an unsubscribe + request to the USubscription service. The API will return a UStatus with the result of + the unsubscribe request. If we are unable to unsubscribe to the topic with the USubscription + service, the listener and handler (if any) will remain registered. + + :param topic: The topic to unsubscribe from. + :param listener: The listener to be called when a message is received on the topic. + :param options: The CallOptions to be used for the unsubscribe request. + + :return: Returns a UStatus with the result from the unsubscribe request. + """ + pass + + @abstractmethod + async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Unregister a listener and remove any registered SubscriptionChangeHandler for the topic. + + This method is used to remove handlers/listeners without notifying the uSubscription service + so that we can be persistently subscribed even when the uE is not running. + + :param topic: The topic to unsubscribe from. + :param listener: The listener to be called when a message is received on the topic. + + :return: Returns a UStatus with the status of the listener unregister request. + """ + pass + + @abstractmethod + async def register_for_notifications( + self, topic: UUri, handler: SubscriptionChangeHandler, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> NotificationsResponse: + """ + Register for Subscription Change Notifications. + + This API allows producers to register to receive subscription change notifications for + topics that they produce only. + + NOTE: Subscribers are automatically registered to receive notifications when they call + `subscribe()` API passing a SubscriptionChangeHandler so they do not need to + call this API. + + :param topic: The topic to register for notifications. + :param handler: The SubscriptionChangeHandler to handle the subscription changes. + :param options: The CallOptions to be used for the request. Default is CallOptions.DEFAULT. + + :return: Returns NotificationsResponse completed successfully if uSubscription service accepts the + request to register the caller to be notified of subscription changes, or + returns UStatus that indicates the failure reason. + """ + pass + + @abstractmethod + async def unregister_for_notifications( + self, topic: UUri, handler: SubscriptionChangeHandler, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> NotificationsResponse: + """ + Unregister for subscription change notifications. + + :param topic: The topic to unregister for notifications. + :param handler: The SubscriptionChangeHandler to be unregistered. + :param options: The CallOptions to be used for the request. Default is CallOptions.DEFAULT. + + :return: Returns NotificationsResponse completed successfully with the status of the API call to + uSubscription service, or completed unsuccessfully with UStatus with the reason for the failure. + """ + pass + + @abstractmethod + async def fetch_subscribers( + self, topic: UUri, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> FetchSubscribersResponse: + """ + Fetch the list of subscribers for a given produced topic. + + :param topic: The topic to fetch the subscribers for. + :param options: The CallOptions to be used for the request. + + :return: Returns FetchSubscribersResponse completed successfully with the list of subscribers, + or completed unsuccessfully with UStatus with the reason for the failure. + """ + pass + + @abstractmethod + async def fetch_subscriptions( + self, request: FetchSubscriptionsRequest, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> FetchSubscriptionsResponse: + """ + Fetch list of Subscriptions for a given topic. + + API provides more information than fetchSubscribers() in that it also returns + SubscribeAttributes per subscriber that might be useful to the producer to know. + + :param request: The request to fetch subscriptions for. + :param options: The CallOptions to be used for the request. + + :return: Returns FetchSubscriptionsResponse completed successfully with the subscription + information per subscriber to the topic, or completed unsuccessfully with UStatus + with the reason for the failure. UCode.PERMISSION_DENIED is returned if the topic + ue_id does not equal the caller's ue_id. + """ + pass diff --git a/uprotocol/communication/subscriber.py b/uprotocol/communication/subscriber.py deleted file mode 100644 index e1e19f4..0000000 --- a/uprotocol/communication/subscriber.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation - -See the NOTICE file(s) distributed with this work for additional -information regarding copyright ownership. - -This program and the accompanying materials are made available under the -terms of the Apache License Version 2.0 which is available at - - http://www.apache.org/licenses/LICENSE-2.0 - -SPDX-License-Identifier: Apache-2.0 -""" - -from abc import ABC, abstractmethod -from typing import Optional - -from uprotocol.communication.calloptions import CallOptions -from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler -from uprotocol.core.usubscription.v3.usubscription_pb2 import ( - SubscriptionResponse, -) -from uprotocol.transport.ulistener import UListener -from uprotocol.v1.uri_pb2 import UUri -from uprotocol.v1.ustatus_pb2 import UStatus - - -class Subscriber(ABC): - """ - Communication Layer (uP-L2) Subscriber interface. - - This interface provides APIs to subscribe and unsubscribe to a given topic. - """ - - @abstractmethod - async def subscribe( - self, - topic: UUri, - listener: UListener, - options: Optional[CallOptions] = None, - handler: Optional[SubscriptionChangeHandler] = None, - ) -> SubscriptionResponse: - """ - Subscribes to a given topic asynchronously. - - The API will return a SubscriptionResponse or raise an exception if the subscription fails. - It registers the listener to be called when messages are received and allows the caller to register - a SubscriptionChangeHandler that is called whenever the subscription state changes - (e.g., SubscriptionStatus.State.PENDING_SUBSCRIBED to SubscriptionStatus.State.SUBSCRIBED, - SubscriptionStatus.State.SUBSCRIBED to SubscriptionStatus.State.UNSUBSCRIBED, etc.). - - :param topic: The topic to subscribe to. - :param listener: The UListener that is called when published messages are received. - :param options: The CallOptions to provide additional information (timeout, token, etc.). - :param handler: SubscriptionChangeHandler to handle changes to subscription states. - :return: Returns the SubscriptionResponse or raises an exception with the failure reason as UStatus. - """ - pass - - @abstractmethod - async def unsubscribe( - self, topic: UUri, listener: UListener, options: Optional[CallOptions] = CallOptions.DEFAULT - ) -> UStatus: - """ - Unsubscribes from a given topic. - - The subscriber no longer wishes to be subscribed to the specified topic, trigger an unsubscribe - request to the USubscription service. - - :param topic: The topic to unsubscribe to. - :param listener: The listener to be called when a message is received on the topic. - :param options: The call options for the subscription. - :return: Returns UStatus with the result from the unsubscribe request. - """ - pass - - @abstractmethod - async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: - """ - Unregisters a listener from a topic asynchronously. - - This method will only unregister the listener for a given subscription, allowing the uE to remain subscribed - even if the listener is removed. - - :param topic: The topic to subscribe to. - :param listener: The listener to be called when a message is received on the topic. - :return: Returns UStatus with the status of the listener unregister request. - """ - pass diff --git a/uprotocol/communication/uclient.py b/uprotocol/communication/uclient.py index 7bbc7d3..db91839 100644 --- a/uprotocol/communication/uclient.py +++ b/uprotocol/communication/uclient.py @@ -17,26 +17,20 @@ from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.inmemoryrpcserver import InMemoryRpcServer -from uprotocol.communication.inmemorysubscriber import InMemorySubscriber from uprotocol.communication.notifier import Notifier from uprotocol.communication.publisher import Publisher from uprotocol.communication.rpcclient import RpcClient from uprotocol.communication.rpcserver import RpcServer from uprotocol.communication.simplenotifier import SimpleNotifier from uprotocol.communication.simplepublisher import SimplePublisher -from uprotocol.communication.subscriber import Subscriber -from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler from uprotocol.communication.upayload import UPayload -from uprotocol.core.usubscription.v3.usubscription_pb2 import ( - SubscriptionResponse, -) from uprotocol.transport.ulistener import UListener from uprotocol.transport.utransport import UTransport from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus -class UClient(RpcServer, Subscriber, Notifier, Publisher, RpcClient): +class UClient(RpcServer, Notifier, Publisher, RpcClient): """ UClient provides a unified interface for various communication patterns over a UTransport instance, including RPC, subscriptions, notifications, and message publishing. It combines functionalities @@ -64,59 +58,6 @@ def __init__(self, transport: UTransport): self.publisher = SimplePublisher(self.transport) self.notifier = SimpleNotifier(self.transport) self.rpc_client = InMemoryRpcClient(self.transport) - self.subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) - - async def subscribe( - self, - topic: UUri, - listener: UListener, - options: Optional[CallOptions] = None, - handler: Optional[SubscriptionChangeHandler] = None, - ) -> SubscriptionResponse: - """ - Subscribe to a given topic asynchronously. - - The API will return a SubscriptionResponse or raise an exception if the subscription fails. - It registers the listener to be called when messages are received and allows the caller to register - a SubscriptionChangeHandler that is called whenever the subscription state changes (e.g., PENDING_SUBSCRIBED to - SUBSCRIBED, SUBSCRIBED to UNSUBSCRIBED, etc.). - - :param topic: The topic to subscribe to. - :param listener: The UListener that is called when published messages are received. - :param options: The CallOptions to provide additional information (timeout, token, etc.). - :param handler: SubscriptionChangeHandler to handle changes to subscription states. - :return: Returns the SubscriptionResponse or raises an exception with the failure reason as UStatus. - """ - return await self.subscriber.subscribe(topic, listener, options, handler) - - async def unsubscribe( - self, topic: UUri, listener: UListener, options: Optional[CallOptions] = CallOptions.DEFAULT - ) -> UStatus: - """ - Unsubscribe to a given topic asynchronously. - - The subscriber no longer wishes to be subscribed to the specified topic, trigger an unsubscribe - request to the USubscription service. - - :param topic: The topic to unsubscribe to. - :param listener: The listener to be called when a message is received on the topic. - :param options: The call options for the subscription. - :return: Returns UStatus with the result from the unsubscribe request. - """ - return await self.subscriber.unsubscribe(topic, listener, options) - - async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: - """ - Unregister a listener from a topic. - - This method will only unregister the listener for a given subscription thus allowing a uE to stay - subscribed even if the listener is removed. - - :param topic: The topic to subscribe to. - :param listener: The listener to be called when a message is received on the topic. - :return: Returns UStatus with the status of the listener unregister request. - """ - return await self.subscriber.unregister_listener(topic, listener) async def notify( self, topic: UUri, destination: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None @@ -207,5 +148,3 @@ async def invoke_method( def close(self): if self.rpc_client: self.rpc_client.close() - if self.subscriber: - self.subscriber.close() diff --git a/uprotocol/transport/utransport.py b/uprotocol/transport/utransport.py index 85b936f..d0cb66b 100644 --- a/uprotocol/transport/utransport.py +++ b/uprotocol/transport/utransport.py @@ -15,6 +15,7 @@ from abc import ABC, abstractmethod from uprotocol.transport.ulistener import UListener +from uprotocol.uri.factory.uri_factory import UriFactory from uprotocol.v1.umessage_pb2 import UMessage from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus @@ -39,7 +40,9 @@ async def send(self, message: UMessage) -> UStatus: pass @abstractmethod - async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + async def register_listener( + self, source_filter: UUri, listener: UListener, sink_filter: UUri = UriFactory.ANY + ) -> UStatus: """Register UListener for UUri source and sink filters to be called when a message is received. @@ -56,7 +59,9 @@ async def register_listener(self, source_filter: UUri, listener: UListener, sink pass @abstractmethod - async def unregister_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + async def unregister_listener( + self, source_filter: UUri, listener: UListener, sink_filter: UUri = UriFactory.ANY + ) -> UStatus: """Unregister UListener for UUri source and sink filters. Messages arriving at this topic will no longer be processed by this listener. From 8710a7b77a6085012e6c865daa39cee7dc08c853 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Thu, 18 Jul 2024 13:51:30 -0400 Subject: [PATCH 02/10] Updated UListener on_receive method to be asynchronous and add additional test cases --- .../test_inmemoryusubcriptionclient.py | 166 +++++++++++++++++- tests/test_communication/mock_utransport.py | 10 +- .../test_inmemoryrpcserver.py | 21 ++- .../test_communication/test_simplenotifier.py | 6 +- tests/test_communication/test_uclient.py | 4 +- tests/test_transport/test_utransport.py | 8 +- .../v3/inmemoryusubcriptionclient.py | 12 +- uprotocol/communication/inmemoryrpcclient.py | 3 +- uprotocol/communication/inmemoryrpcserver.py | 5 +- uprotocol/transport/ulistener.py | 18 +- 10 files changed, 221 insertions(+), 32 deletions(-) diff --git a/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py b/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py index 3181230..32fb7bf 100644 --- a/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py +++ b/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py @@ -25,6 +25,9 @@ from uprotocol.communication.upayload import UPayload from uprotocol.communication.ustatuserror import UStatusError from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + FetchSubscribersResponse, + FetchSubscriptionsRequest, + FetchSubscriptionsResponse, SubscriptionResponse, SubscriptionStatus, UnsubscribeResponse, @@ -40,7 +43,7 @@ class MyListener(UListener): - def on_receive(self, umsg: UMessage) -> None: + async def on_receive(self, umsg: UMessage) -> None: pass @@ -150,6 +153,19 @@ async def test_subscribe_using_mock_rpc_client_and_simplernotifier_when_invokeme self.transport.register_listener.assert_not_called() self.transport.get_source.assert_called_once() + async def test_subscribe_when_register_notification_listener_return_failed_status(self): + self.transport.get_source.return_value = self.source + + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.INTERNAL) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + with self.assertRaises(UStatusError) as context: + await subscriber.subscribe(self.topic, self.listener) + self.assertEqual(UCode.INTERNAL, context.exception.status.code) + self.assertEqual("Failed to register listener for rpc client", context.exception.status.message) + async def test_subscribe_using_mock_rpc_client_and_simplernotifier_when_invokemethod_return_an_ustatuserror(self): self.transport.get_source.return_value = self.source @@ -319,7 +335,7 @@ async def register_notification_listener(uri, listener): await barrier.wait() # Wait for the barrier again update = Update(topic=self.topic, status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBED)) message = UMessageBuilder.notification(self.topic, self.source).build_from_upayload(UPayload.pack(update)) - listener.on_receive(message) + await listener.on_receive(message) return UStatus(code=UCode.OK) self.notifier.register_notification_listener = AsyncMock(side_effect=register_notification_listener) @@ -346,11 +362,26 @@ async def test_unregister_listener_missing_topic(self): async def test_unregister_listener_missing_listener(self): notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unregister_listener(self.topic, None) self.assertEqual(str(context.exception), "Request listener missing") + async def test_unregister_listener_happy_path(self): + class MyListener(UListener): + async def on_receive(self, umsg: UMessage) -> None: + pass + + listener = MyListener() + notifier = MagicMock(spec=SimpleNotifier) + self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) + # with self.assertRaises(ValueError) as context: + status = await subscriber.unregister_listener(self.topic, listener) + self.assertEqual(UCode.OK, status.code) + async def test_unsubscribe_missing_topic(self): notifier = MagicMock(spec=SimpleNotifier) subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) @@ -365,6 +396,13 @@ async def test_unsubscribe_missing_listener(self): await subscriber.unsubscribe(self.topic, None, CallOptions()) self.assertEqual(str(context.exception), "Listener missing") + async def test_unsubscribe_missing_options(self): + notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) + with self.assertRaises(ValueError) as context: + await subscriber.unsubscribe(self.topic, self.listener, None) + self.assertEqual(str(context.exception), "CallOptions missing") + async def test_subscribe_missing_topic(self): notifier = MagicMock(spec=SimpleNotifier) subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) @@ -372,6 +410,13 @@ async def test_subscribe_missing_topic(self): await subscriber.subscribe(None, self.listener, CallOptions()) self.assertEqual(str(context.exception), "Subscribe topic missing") + async def test_subscribe_missing_options(self): + notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) + with self.assertRaises(ValueError) as context: + await subscriber.subscribe(self.topic, self.listener, None) + self.assertEqual(str(context.exception), "CallOptions missing") + async def test_subscribe_missing_listener(self): notifier = MagicMock(spec=SimpleNotifier) subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) @@ -449,8 +494,9 @@ async def test_register_notification_api_when_invoke_method_throws_an_exception( self.transport.get_source.return_value = self.source - self.rpc_client.invoke_method.return_value = UStatusError.from_code_message(code=UCode.PERMISSION_DENIED, - message="Not permitted") + self.rpc_client.invoke_method.return_value = UStatusError.from_code_message( + code=UCode.PERMISSION_DENIED, message="Not permitted" + ) # Initialize the subscription client subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) @@ -519,6 +565,118 @@ async def test_register_for_notifications_to_the_same_topic_twice_with_different self.assertEqual(self.rpc_client.invoke_method.call_count, 2) self.assertEqual(self.transport.get_source.call_count, 2) + async def test_unregister_notification_api_for_the_happy_path(self): + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + self.transport.get_source.return_value = self.source + self.rpc_client.invoke_method.return_value = UPayload.pack(None) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + try: + await subscriber.register_for_notifications(self.topic, handler) + await subscriber.unregister_for_notifications(self.topic, handler) + except Exception as e: + self.fail(f"Exception occurred: {e}") + + async def test_unregister_notification_api_topic_missing(self): + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + self.transport.get_source.return_value = self.source + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + with self.assertRaises(ValueError) as error: + await subscriber.unregister_for_notifications(None, handler) + self.assertEqual("Topic missing", str(error.exception)) + + async def test_unregister_notification_api_handler_missing(self): + self.transport.get_source.return_value = self.source + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + with self.assertRaises(ValueError) as error: + await subscriber.unregister_for_notifications(self.topic, None) + self.assertEqual("Handler missing", str(error.exception)) + + async def test_unregister_notification_api_options_none(self): + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + self.transport.get_source.return_value = self.source + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + with self.assertRaises(ValueError) as error: + await subscriber.unregister_for_notifications(self.topic, handler, None) + self.assertEqual("CallOptions missing", str(error.exception)) + + async def test_register_notification_api_options_none(self): + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + self.transport.get_source.return_value = self.source + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + with self.assertRaises(ValueError) as error: + await subscriber.register_for_notifications(self.topic, handler, None) + self.assertEqual("CallOptions missing", str(error.exception)) + + async def test_fetch_subscribers_when_passing_null_topic(self): + subscriber = InMemoryUSubscriptionClient(self.transport) + + with self.assertRaises(ValueError) as error: + await subscriber.fetch_subscribers(None) + self.assertEqual("Topic missing", str(error.exception)) + + async def test_fetch_subscribers_when_passing_null_calloptions(self): + subscriber = InMemoryUSubscriptionClient(self.transport) + + with self.assertRaises(ValueError) as error: + await subscriber.fetch_subscribers(self.topic, None) + self.assertEqual("CallOptions missing", str(error.exception)) + + async def test_fetch_subscribers_passing_a_valid_topic(self): + subscriber = InMemoryUSubscriptionClient(MockUTransport()) + + try: + response = await subscriber.fetch_subscribers(self.topic) + self.assertEqual(response, FetchSubscribersResponse()) + except Exception as e: + self.fail(f"Exception occurred: {e}") + + async def test_fetch_subscriptions_when_passing_null_request(self): + subscriber = InMemoryUSubscriptionClient(self.transport) + + with self.assertRaises(ValueError) as error: + await subscriber.fetch_subscriptions(None) + self.assertEqual("Request missing", str(error.exception)) + + async def test_fetch_subscriptions_when_passing_null_calloptions(self): + subscriber = InMemoryUSubscriptionClient(self.transport) + + with self.assertRaises(ValueError) as error: + await subscriber.fetch_subscriptions(FetchSubscriptionsRequest(), None) + self.assertEqual("CallOptions missing", str(error.exception)) + + async def test_fetch_subscriptions_passing_a_valid_fetch_subscription_request(self): + request = FetchSubscriptionsRequest(topic=self.topic) + subscriber = InMemoryUSubscriptionClient(MockUTransport()) + + try: + response = await subscriber.fetch_subscriptions(request) + self.assertEqual(response, FetchSubscriptionsResponse()) + except Exception as e: + self.fail(f"Exception occurred: {e}") + if __name__ == '__main__': unittest.main() diff --git a/tests/test_communication/mock_utransport.py b/tests/test_communication/mock_utransport.py index 6e5b1fa..8090d0e 100644 --- a/tests/test_communication/mock_utransport.py +++ b/tests/test_communication/mock_utransport.py @@ -81,13 +81,13 @@ async def send(self, message: UMessage) -> UStatus: if message.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: response = self.build_response(message) - self._notify_listeners(response) + await self._notify_listeners(response) return UStatus(code=UCode.OK) - def _notify_listeners(self, response: UMessage): + async def _notify_listeners(self, response: UMessage): for listener in self.listeners: - listener.on_receive(response) + await listener.on_receive(response) async def register_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: self.listeners.append(listener) @@ -136,6 +136,6 @@ def build_response(self, request): async def send(self, message): response = self.build_response(message) - executor = ThreadPoolExecutor(max_workers=1) - executor.submit(self._notify_listeners, response) + + await self._notify_listeners(response) return UStatus(code=UCode.OK) diff --git a/tests/test_communication/test_inmemoryrpcserver.py b/tests/test_communication/test_inmemoryrpcserver.py index 4be86e7..c1f7f00 100644 --- a/tests/test_communication/test_inmemoryrpcserver.py +++ b/tests/test_communication/test_inmemoryrpcserver.py @@ -18,6 +18,8 @@ from unittest.mock import AsyncMock, MagicMock from tests.test_communication.mock_utransport import EchoUTransport +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.inmemoryrpcserver import InMemoryRpcServer from uprotocol.communication.requesthandler import RequestHandler from uprotocol.communication.upayload import UPayload @@ -27,6 +29,7 @@ from uprotocol.transport.utransport import UTransport from uprotocol.uri.serializer.uriserializer import UriSerializer from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus @@ -136,7 +139,7 @@ async def custom_register_listener_behavior(source: UUri, listener: UListener, s async def custom_send_behavior(message): serialized_uri = UriSerializer().serialize(message.attributes.sink) if serialized_uri in listeners: - listeners[serialized_uri].on_receive(message) + await listeners[serialized_uri].on_receive(message) return UStatus(code=UCode.OK) self.mock_transport.send = AsyncMock(side_effect=custom_send_behavior) @@ -196,6 +199,22 @@ async def test_handle_requests_exception(self): status = await transport.send(request) self.assertEqual(status.code, UCode.OK) + async def test_end_to_end_rpc_with_test_transport(self): + class MyRequestHandler(RequestHandler): + def handle_request(self, message: UMessage) -> UPayload: + return UPayload.pack(UUri()) + + handler = MyRequestHandler() + test_transport = EchoUTransport() + server = InMemoryRpcServer(test_transport) + method = self.create_method_uri() + + self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK) + rpc_client = InMemoryRpcClient(test_transport) + response = await rpc_client.invoke_method(method, None, CallOptions.DEFAULT) + self.assertIsNotNone(response) + self.assertEqual(response, UPayload.pack(UUri())) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_communication/test_simplenotifier.py b/tests/test_communication/test_simplenotifier.py index 12483cd..6fe630e 100644 --- a/tests/test_communication/test_simplenotifier.py +++ b/tests/test_communication/test_simplenotifier.py @@ -62,7 +62,7 @@ async def test_register_listener(self): self.transport.get_source.return_value = self.source class TestListener(UListener): - def on_receive(self, message: UMessage): + async def on_receive(self, message: UMessage): pass listener = TestListener() @@ -78,7 +78,7 @@ async def test_unregister_notification_listener(self): self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) class TestListener(UListener): - def on_receive(self, message: UMessage): + async def on_receive(self, message: UMessage): pass listener = TestListener() @@ -97,7 +97,7 @@ async def test_unregister_listener_not_registered(self): self.transport.unregister_listener.return_value = UStatus(code=UCode.NOT_FOUND) class TestListener(UListener): - def on_receive(self, message: UMessage): + async def on_receive(self, message: UMessage): pass listener = TestListener() diff --git a/tests/test_communication/test_uclient.py b/tests/test_communication/test_uclient.py index 1a4e753..6b15a3c 100644 --- a/tests/test_communication/test_uclient.py +++ b/tests/test_communication/test_uclient.py @@ -30,7 +30,7 @@ class MyListener(UListener): - def on_receive(self, umsg: UMessage) -> None: + async def on_receive(self, umsg: UMessage) -> None: # Handle receiving subscriptions here assert umsg is not None @@ -185,7 +185,7 @@ async def test_happy_path_for_all_apis_async(self): client = UClient(MockUTransport()) class MyUListener(UListener): - def on_receive(self, umsg: UMessage) -> None: + async def on_receive(self, umsg: UMessage) -> None: pass class MyRequestHandler(RequestHandler): diff --git a/tests/test_transport/test_utransport.py b/tests/test_transport/test_utransport.py index 4615d34..a6276d1 100644 --- a/tests/test_transport/test_utransport.py +++ b/tests/test_transport/test_utransport.py @@ -23,8 +23,8 @@ class MyListener(UListener): - def on_receive(self, message): - super().on_receive(message) + async def on_receive(self, message): + await super().on_receive(message) pass @@ -36,7 +36,7 @@ async def send(self, message): return UStatus(code=UCode.INVALID_ARGUMENT if message is None else UCode.OK) async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: - listener.on_receive(UMessage()) + await listener.on_receive(UMessage()) return UStatus(code=UCode.OK) async def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): @@ -54,7 +54,7 @@ async def send(self, message): return UStatus(code=UCode.INTERNAL) async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: - listener.on_receive(None) + await listener.on_receive(None) return UStatus(code=UCode.INTERNAL) async def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): diff --git a/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py b/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py index 60002eb..483ec75 100644 --- a/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py +++ b/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py @@ -29,6 +29,7 @@ FetchSubscribersRequest, FetchSubscribersResponse, FetchSubscriptionsRequest, + FetchSubscriptionsResponse, NotificationsRequest, NotificationsResponse, SubscriberInfo, @@ -39,7 +40,6 @@ UnsubscribeResponse, Update, ) - from uprotocol.transport.ulistener import UListener from uprotocol.transport.utransport import UTransport from uprotocol.uri.factory.uri_factory import UriFactory @@ -55,7 +55,7 @@ class MyNotificationListener(UListener): def __init__(self, handlers): self.handlers = handlers - def on_receive(self, message: UMessage) -> None: + async def on_receive(self, message: UMessage) -> None: """ Handles incoming notifications from the USubscription service. @@ -85,7 +85,9 @@ class InMemoryUSubscriptionClient(USubscriptionClient): that also stores RPC correlation information within the objects """ - def __init__(self, transport: UTransport, rpc_client: Optional[RpcClient] = None, notifier: Optional[Notifier] = None): + def __init__( + self, transport: UTransport, rpc_client: Optional[RpcClient] = None, notifier: Optional[Notifier] = None + ): """ Creates a new USubscription client passing UTransport, CallOptions, and an implementation of RpcClient and Notifier. @@ -150,7 +152,7 @@ async def subscribe( if not listener: raise ValueError("Request listener missing") if not options: - raise ValueError("Call Options missing") + raise ValueError("CallOptions missing") if not self.is_listener_registered: # Ensure listener is registered before proceeding @@ -349,4 +351,4 @@ async def fetch_subscriptions( raise ValueError("CallOptions missing") result = self.rpc_client.invoke_method(self.fetch_subscriptions_uri, UPayload.pack(request), options) - return await RpcMapper.map_response(result, FetchSubscribersResponse) + return await RpcMapper.map_response(result, FetchSubscriptionsResponse) diff --git a/uprotocol/communication/inmemoryrpcclient.py b/uprotocol/communication/inmemoryrpcclient.py index b7720b8..1f68c56 100644 --- a/uprotocol/communication/inmemoryrpcclient.py +++ b/uprotocol/communication/inmemoryrpcclient.py @@ -34,7 +34,7 @@ class HandleResponsesListener(UListener): def __init__(self, requests): self.requests = requests - def on_receive(self, umsg: UMessage) -> None: + async def on_receive(self, umsg: UMessage) -> None: """ Handle the responses coming back from the server asynchronously. @@ -56,7 +56,6 @@ def on_receive(self, umsg: UMessage) -> None: UStatusError.from_code_message(code=code, message=f"Communication error [{UCode.Name(code)}]") ) return - future.set_result(umsg) diff --git a/uprotocol/communication/inmemoryrpcserver.py b/uprotocol/communication/inmemoryrpcserver.py index 4e1c229..def933b 100644 --- a/uprotocol/communication/inmemoryrpcserver.py +++ b/uprotocol/communication/inmemoryrpcserver.py @@ -34,7 +34,7 @@ def __init__(self, transport: UTransport, request_handlers): self.transport = transport self.request_handlers = request_handlers - def on_receive(self, request: UMessage) -> None: + async def on_receive(self, request: UMessage) -> None: """ Generic incoming handler to process RPC requests from clients. @@ -61,8 +61,7 @@ def on_receive(self, request: UMessage) -> None: if isinstance(e, UStatusError): code = e.get_code() response_builder.with_commstatus(code) - - self.transport.send(response_builder.build_from_upayload(response_payload)) + await self.transport.send(response_builder.build_from_upayload(response_payload)) class InMemoryRpcServer(RpcServer): diff --git a/uprotocol/transport/ulistener.py b/uprotocol/transport/ulistener.py index cc188d8..d3c7584 100644 --- a/uprotocol/transport/ulistener.py +++ b/uprotocol/transport/ulistener.py @@ -18,13 +18,25 @@ class UListener(ABC): - """For any implementation that defines some kind of callback or function that will be called to handle incoming + """ + For any implementation that defines some kind of callback or function that will be called to handle incoming messages. """ @abstractmethod - def on_receive(self, umsg: UMessage) -> None: - """Method called to handle/process messages.

+ async def on_receive(self, umsg: UMessage) -> None: + """ + Method called to handle/process messages. + + `on_receive()` is expected to return almost immediately. If it does not, it could potentially + block further message receipt. For long-running operations, consider passing off received + data to a different async function to handle it and return. + + Note for `UTransport` implementers: + + Because `on_receive()` is an async function, you may choose to either `await` it in the current context + or spawn it onto a new task and await it there to allow the current context to continue immediately. + @param umsg: UMessage to be sent. """ pass From 929f0d81f9913082854cf0ff80406e9fb04688d9 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Thu, 18 Jul 2024 14:21:26 -0400 Subject: [PATCH 03/10] Update READMEs --- README.adoc | 28 +- uprotocol/client/README.adoc | 50 ++ uprotocol/client/README.html | 726 ++++++++++++++++++++++++++++ uprotocol/communication/README.adoc | 28 -- 4 files changed, 795 insertions(+), 37 deletions(-) create mode 100644 uprotocol/client/README.adoc create mode 100644 uprotocol/client/README.html diff --git a/README.adoc b/README.adoc index d9c8f31..db4c671 100644 --- a/README.adoc +++ b/README.adoc @@ -3,7 +3,9 @@ == Overview -This library implements the https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/languages.adoc[uProtocol Language Specific Library Requirements] for Python defined in https://github.com/eclipse-uprotocol/uprotocol-spec/tree/main[uProtocol Specifications]. The library is organized into packages that are described in <> below. Each package contains a README.adoc file that describes the purpose of the package and how to use it. +This library implements the https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/languages.adoc[uProtocol Language Specific Library Requirements] for Python defined in https://github.com/eclipse-uprotocol/uprotocol-spec/tree/main[uProtocol Specifications]. The library is organized into packages that are described in <> below and organized by the layers of the protocol. + +Each package contains a README.adoc file that describes the purpose of the package and how to use it. The module contains the factory methods, serializers, and validators for all data types defined in the specifications, and any data models that either haven't or couldn't be defined in uprotocol-core-api yet (ex. UPayload) This library fits into the big picture of the uProtocol SDK as seen in the diagram below. @@ -63,23 +65,31 @@ The Library is broken up into different packages that are described in <> for examples of how to use the different data types and their factories, validators, and serializers. diff --git a/uprotocol/client/README.adoc b/uprotocol/client/README.adoc new file mode 100644 index 0000000..15c106f --- /dev/null +++ b/uprotocol/client/README.adoc @@ -0,0 +1,50 @@ +# Application Layer APIs (uP-L3 Interface) + +The following module includes the client-facing https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l3[Application Layer (uP-L3)] interfaces to communication with USubscription, UDiscovery, and UTwin services. + + +## uP-L3 Interfaces + +.Interfaces (uP-L3 Interface) +[cols="1,1,3",options="header"] +|=== +| Interface | Implementation(s) | Description + +| xref:/v3/usubscriptionclient.py[*USubscriptionClient*] | xref:/v3/InMemoryUSubscriptionClient.py[InMemoryUSubscriptionClient] | Subscription Management APIs to subscribe(), unsubscribe() and fetch information from the subscription database. +|=== + + +The module includes the interface for the client-facing APIs as well as a simple in-memory implementation that is based on the uP-L2 in-memory implementations. the term in-memory is used to indicate that the data required by the code is cached inside of the object and not persisted to a given database backend, this design is useful for embedded applications (i.e. in the vehicle) however will not scale to the multi-tenanted cloud applications. + +## Examples + + +=== Subscribe and Unsubscribe +[,python] +---- +transport = # your UTransport instance + +#subscription topic +topic : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) + +#Listener to process incoming events on the topic +class MySubscriptionListener(UListener): + def on_receive(self, umsg: UMessage) -> None: + # Handle receiving published message + pass + +listener = MySubscriptionListener() + +# Optional handler that is called whenever the SubscriptionState changes for the subscriber +class MySubscriptionChangeHandler(SubscriptionChangeHandler) + def handle_subscription_change(self, topic: UUri, status: SubscriptionStatus) -> None: + # Handle subscription status changes if you're interested like when + # the subscription state changes from SUBSCRIBE_PENDING to SUBSCRIBED + pass + +subscriber : InMemoryUSubscriptionClient = InMemoryUSubscriptionClient(transport) +response : SubscriptionResponse = subscriber.subscribe(topic, listener, CallOptions.DEFAULT, MySubscriptionChangeHandler()) + +#UnSubscribe from the topic +status : UStatus = subscriber.unsubscribe(topic, listener) +---- \ No newline at end of file diff --git a/uprotocol/client/README.html b/uprotocol/client/README.html new file mode 100644 index 0000000..a5d428d --- /dev/null +++ b/uprotocol/client/README.html @@ -0,0 +1,726 @@ + + + + + + + +Application Layer APIs (uP-L3 Interface) + + + + + + +
+
+
+
+

The following module includes the client-facing Application Layer (uP-L3) interfaces to communication with USubscription, UDiscovery, and UTwin services.

+
+
+
+
+

uP-L3 Interfaces

+
+ + +++++ + + + + + + + + + + + + + + +
Table 1. Interfaces (uP-L3 Interface)
InterfaceImplementation(s)Description

USubscriptionClient

InMemoryUSubscriptionClient

Subscription Management APIs to subscribe(), unsubscribe() and fetch information from the subscription database.

+
+

The module includes the interface for the client-facing APIs as well as a simple in-memory implementation that is based on the uP-L2 in-memory implementations. the term in-memory is used to indicate that the data required by the code is cached inside of the object and not persisted to a given database backend, this design is useful for embedded applications (i.e. in the vehicle) however will not scale to the multi-tenanted cloud applications.

+
+
+
+
+ + + + \ No newline at end of file diff --git a/uprotocol/communication/README.adoc b/uprotocol/communication/README.adoc index a5c9e93..eb53274 100644 --- a/uprotocol/communication/README.adoc +++ b/uprotocol/communication/README.adoc @@ -17,7 +17,6 @@ The following folder contains implementations of the L2 Communication Layer APIs | Interface | Implementation(s) | Description | xref:publisher.py[*Publisher*] | xref:simplepublisher.py[SimplePublisher] | Producers API to send publish or notification messages -| xref:subscriber.py[*Subscriber*] | xref:inmemorysubscriber.py[InMemorySubscriber] | Consumers API to subscribe or unsubscribe to topics | xref:rpcclient.py[*RpcClient*] | xref:inmemoryrpcclient.py[InMemoryRpcClient] | Client interface to invoke a method | xref:rpcserver.py[*RpcServer*] | xref:inmemoryrpcserver.py[InMemoryRpcServer]| Server interface to register a listener for incoming RPC requests and automatically send a response | xref:notifier.py[*Notifier*] | xref:simplenotifier.py[SimpleNotifier] | Notification communication pattern APIs to notify and register a listener to receive the notifications @@ -43,33 +42,6 @@ publisher : Publisher = UClient(transport) publisher.publish(topic) ---- -=== Subscribe and Unsubscribe -[,python] ----- -transport = # your UTransport instance - -#subscription topic -topic : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) - -#Listener to process incoming events on the topic -class MySubscriptionListener(UListener): - def on_receive(self, umsg: UMessage) -> None: - # Handle receiving published message - pass -listener = MySubscriptionListener() -# Optional handler that is called whenever the SubscriptionState changes for the subscriber -class MySubscriptionChangeHandler(SubscriptionChangeHandler) - def handle_subscription_change(self, topic: UUri, status: SubscriptionStatus) -> None: - # Handle subscription status changes if you're interested like when - # the subscription state changes from SUBSCRIBE_PENDING to SUBSCRIBED - pass - -subscriber : Subscriber = UClient(transport) -subscriber.subscribe(topic, listener, CallOptions.DEFAULT, MySubscriptionChangeHandler()) - -#UnSubscribe from the topic -subscriber.unsubscribe(topic, listener) ----- === Invoke a method using RPCClient [,python] ---- From bd2645dbb5c8179c13695ec8cec489a5a80c6754 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Thu, 18 Jul 2024 14:27:08 -0400 Subject: [PATCH 04/10] Bump version to 0.2.0 for major release introducing L2 and L3. --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b360c5b..48ce3f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "up-python" -version = "0.1.3-dev" +version = "0.2.0-dev" description = "Language specific uProtocol library for building and using UUri, UUID, UAttributes, UTransport, and more." authors = ["Neelam Kushwah "] license = "The Apache License, Version 2.0" From bf583e070585959d889ba1550660a26bf1bb117c Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Thu, 18 Jul 2024 14:36:10 -0400 Subject: [PATCH 05/10] Delete client README.html --- uprotocol/client/README.html | 726 ----------------------------------- 1 file changed, 726 deletions(-) delete mode 100644 uprotocol/client/README.html diff --git a/uprotocol/client/README.html b/uprotocol/client/README.html deleted file mode 100644 index a5d428d..0000000 --- a/uprotocol/client/README.html +++ /dev/null @@ -1,726 +0,0 @@ - - - - - - - -Application Layer APIs (uP-L3 Interface) - - - - - - -
-
-
-
-

The following module includes the client-facing Application Layer (uP-L3) interfaces to communication with USubscription, UDiscovery, and UTwin services.

-
-
-
-
-

uP-L3 Interfaces

-
- - ----- - - - - - - - - - - - - - - -
Table 1. Interfaces (uP-L3 Interface)
InterfaceImplementation(s)Description

USubscriptionClient

InMemoryUSubscriptionClient

Subscription Management APIs to subscribe(), unsubscribe() and fetch information from the subscription database.

-
-

The module includes the interface for the client-facing APIs as well as a simple in-memory implementation that is based on the uP-L2 in-memory implementations. the term in-memory is used to indicate that the data required by the code is cached inside of the object and not persisted to a given database backend, this design is useful for embedded applications (i.e. in the vehicle) however will not scale to the multi-tenanted cloud applications.

-
-
-
-
- - - - \ No newline at end of file From a9a1b94c2a618047b941df6aa1f2c6969bf29cb5 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Fri, 19 Jul 2024 14:27:08 -0400 Subject: [PATCH 06/10] Incorporated Reviewers comment --- README.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.adoc b/README.adoc index db4c671..019d9b5 100644 --- a/README.adoc +++ b/README.adoc @@ -76,7 +76,7 @@ The Library is broken up into different packages that are described in < Date: Mon, 22 Jul 2024 12:56:13 -0400 Subject: [PATCH 07/10] Fixed review comments --- .../usubscription/v3/inmemoryusubcriptionclient.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py b/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py index 483ec75..a930a8d 100644 --- a/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py +++ b/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py @@ -53,6 +53,13 @@ class MyNotificationListener(UListener): def __init__(self, handlers): + """ + Initializes a new instance of the MyNotificationListener class. + + :param handlers: A dictionary mapping topics to their respective handlers. + The handlers are responsible for processing subscription + change notifications for their corresponding topics. + """ self.handlers = handlers async def on_receive(self, message: UMessage) -> None: @@ -93,12 +100,12 @@ def __init__( of RpcClient and Notifier. :param transport: The transport to use for sending the notifications. - :param rpcClient: The RPC client to use for sending the RPC requests. + :param rpc_client: The RPC client to use for sending the RPC requests. :param notifier: The notifier to use for registering the notification listener. """ if not transport: raise ValueError(UTransport.TRANSPORT_NULL_ERROR) - elif not isinstance(transport, UTransport): + if not isinstance(transport, UTransport): raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) if not rpc_client: rpc_client = InMemoryRpcClient(transport) From c29da91c9f5c607493853b62dbbdf575ad6c08b1 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Tue, 23 Jul 2024 10:46:28 -0400 Subject: [PATCH 08/10] Fixed communication status issue --- tests/test_communication/mock_utransport.py | 10 ++++++++++ tests/test_communication/test_inmemoryrpcclient.py | 13 ++++++++++++- uprotocol/communication/inmemoryrpcclient.py | 2 +- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/tests/test_communication/mock_utransport.py b/tests/test_communication/mock_utransport.py index 8090d0e..527c8aa 100644 --- a/tests/test_communication/mock_utransport.py +++ b/tests/test_communication/mock_utransport.py @@ -130,6 +130,16 @@ def build_response(self, request): ) +class CommStatusUCodeOKTransport(MockUTransport): + def build_response(self, request): + status = UStatus(code=UCode.OK, message="No Communication Error") + return ( + UMessageBuilder.response_for_request(request.attributes) + .with_commstatus(status.code) + .build_from_upayload(UPayload.pack(status)) + ) + + class EchoUTransport(MockUTransport): def build_response(self, request): return request diff --git a/tests/test_communication/test_inmemoryrpcclient.py b/tests/test_communication/test_inmemoryrpcclient.py index 6703608..e10ba3c 100644 --- a/tests/test_communication/test_inmemoryrpcclient.py +++ b/tests/test_communication/test_inmemoryrpcclient.py @@ -14,7 +14,12 @@ import unittest -from tests.test_communication.mock_utransport import CommStatusTransport, MockUTransport, TimeoutUTransport +from tests.test_communication.mock_utransport import ( + CommStatusTransport, + CommStatusUCodeOKTransport, + MockUTransport, + TimeoutUTransport, +) from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.upayload import UPayload @@ -101,6 +106,12 @@ async def test_invoke_method_with_comm_status_transport(self): self.assertEqual(UCode.FAILED_PRECONDITION, context.exception.status.code) self.assertEqual("Communication error [FAILED_PRECONDITION]", context.exception.status.message) + async def test_invoke_method_with_comm_status_transport(self): + rpc_client = InMemoryRpcClient(CommStatusUCodeOKTransport()) + payload = UPayload.pack_to_any(UUri()) + response = await rpc_client.invoke_method(self.create_method_uri(), payload, None) + self.assertEqual(UCode.OK, UPayload.unpack(response, UStatus).code) + async def test_invoke_method_with_error_transport(self): class ErrorUTransport(MockUTransport): async def send(self, message): diff --git a/uprotocol/communication/inmemoryrpcclient.py b/uprotocol/communication/inmemoryrpcclient.py index 1f68c56..059e55d 100644 --- a/uprotocol/communication/inmemoryrpcclient.py +++ b/uprotocol/communication/inmemoryrpcclient.py @@ -50,7 +50,7 @@ async def on_receive(self, umsg: UMessage) -> None: if not future: return - if response_attributes.commstatus: + if response_attributes.commstatus and response_attributes.commstatus != UCode.OK: code = response_attributes.commstatus future.set_exception( UStatusError.from_code_message(code=code, message=f"Communication error [{UCode.Name(code)}]") From b4dac12047ef61b398c788ee5959c882395c1d5d Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Tue, 23 Jul 2024 15:27:46 -0400 Subject: [PATCH 09/10] Added subscriber info in unsubscribe request --- .../test_v3/test_inmemoryusubcriptionclient.py | 3 +++ .../client/usubscription/v3/inmemoryusubcriptionclient.py | 4 +++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py b/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py index 32fb7bf..b0410f8 100644 --- a/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py +++ b/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py @@ -272,6 +272,7 @@ async def test_subscribe_when_we_try_to_subscribe_to_the_same_topic_twice_with_d self.assertEqual(self.transport.get_source.call_count, 2) async def test_unsubscribe_using_mock_rpcclient_and_simplernotifier(self): + self.transport.get_source.return_value = self.source self.transport.register_listener.return_value = UStatus(code=UCode.OK) self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) self.rpc_client.invoke_method.return_value = UPayload.pack(UnsubscribeResponse()) @@ -289,6 +290,7 @@ async def test_unsubscribe_using_mock_rpcclient_and_simplernotifier(self): self.transport.unregister_listener.assert_called_once() async def test_unsubscribe_when_invokemethod_return_an_exception(self): + self.transport.get_source.return_value = self.source self.transport.register_listener.return_value = UStatus(code=UCode.OK) self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) self.rpc_client.invoke_method.return_value = UStatusError.from_code_message( @@ -306,6 +308,7 @@ async def test_unsubscribe_when_invokemethod_return_an_exception(self): self.transport.unregister_listener.assert_not_called() async def test_unsubscribe_when_invokemethod_returned_ok_but_we_failed_to_unregister_the_listener(self): + self.transport.get_source.return_value = self.source self.transport.register_listener.return_value = UStatus(code=UCode.OK) self.transport.unregister_listener.return_value = UStatusError.from_code_message(UCode.ABORTED, "aborted") self.rpc_client.invoke_method.return_value = UPayload.pack( diff --git a/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py b/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py index a930a8d..a5d1e0b 100644 --- a/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py +++ b/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py @@ -218,7 +218,9 @@ async def unsubscribe( raise ValueError("Listener missing") if not options: raise ValueError("CallOptions missing") - unsubscribe_request = UnsubscribeRequest(topic=topic) + unsubscribe_request = UnsubscribeRequest( + topic=topic, subscriber=SubscriberInfo(uri=self.transport.get_source()) + ) future_result = self.rpc_client.invoke_method(self.unsubscribe_uri, UPayload.pack(unsubscribe_request), options) response = await RpcMapper.map_response_to_result(future_result, UnsubscribeResponse) From e25916464e921c1e85b6d39b7bbf1e16139f4e8c Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Wed, 24 Jul 2024 16:43:04 -0400 Subject: [PATCH 10/10] Address review comments --- README.adoc | 2 +- .../client/usubscription/v3/inmemoryusubcriptionclient.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.adoc b/README.adoc index 019d9b5..45a492c 100644 --- a/README.adoc +++ b/README.adoc @@ -80,7 +80,7 @@ The Library is broken up into different packages that are described in <