diff --git a/README.adoc b/README.adoc index d9c8f31..45a492c 100644 --- a/README.adoc +++ b/README.adoc @@ -3,7 +3,9 @@ == Overview -This library implements the https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/languages.adoc[uProtocol Language Specific Library Requirements] for Python defined in https://github.com/eclipse-uprotocol/uprotocol-spec/tree/main[uProtocol Specifications]. The library is organized into packages that are described in <> below. Each package contains a README.adoc file that describes the purpose of the package and how to use it. +This library implements the https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/languages.adoc[uProtocol Language Specific Library Requirements] for Python defined in https://github.com/eclipse-uprotocol/uprotocol-spec/tree/main[uProtocol Specifications]. The library is organized into packages that are described in <> below and organized by the layers of the protocol. + +Each package contains a README.adoc file that describes the purpose of the package and how to use it. The module contains the factory methods, serializers, and validators for all data types defined in the specifications, and any data models that either haven't or couldn't be defined in uprotocol-core-api yet (ex. UPayload) This library fits into the big picture of the uProtocol SDK as seen in the diagram below. @@ -63,23 +65,31 @@ The Library is broken up into different packages that are described in <> for examples of how to use the different data types and their factories, validators, and serializers. diff --git a/pyproject.toml b/pyproject.toml index b360c5b..48ce3f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "up-python" -version = "0.1.3-dev" +version = "0.2.0-dev" description = "Language specific uProtocol library for building and using UUri, UUID, UAttributes, UTransport, and more." authors = ["Neelam Kushwah "] license = "The Apache License, Version 2.0" diff --git a/tests/test_client/__init__.py b/tests/test_client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_client/test_usubscription/__init__.py b/tests/test_client/test_usubscription/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_client/test_usubscription/test_v3/__init__.py b/tests/test_client/test_usubscription/test_v3/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_communication/test_inmemorysubscriber.py b/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py similarity index 51% rename from tests/test_communication/test_inmemorysubscriber.py rename to tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py index edbe1b4..b0410f8 100644 --- a/tests/test_communication/test_inmemorysubscriber.py +++ b/tests/test_client/test_usubscription/test_v3/test_inmemoryusubcriptionclient.py @@ -16,14 +16,18 @@ import unittest from unittest.mock import AsyncMock, MagicMock +from tests.test_communication.mock_utransport import MockUTransport +from uprotocol.client.usubscription.v3.inmemoryusubcriptionclient import InMemoryUSubscriptionClient +from uprotocol.client.usubscription.v3.subscriptionchangehandler import SubscriptionChangeHandler from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient -from uprotocol.communication.inmemorysubscriber import InMemorySubscriber from uprotocol.communication.simplenotifier import SimpleNotifier -from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler from uprotocol.communication.upayload import UPayload from uprotocol.communication.ustatuserror import UStatusError from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + FetchSubscribersResponse, + FetchSubscriptionsRequest, + FetchSubscriptionsResponse, SubscriptionResponse, SubscriptionStatus, UnsubscribeResponse, @@ -39,11 +43,11 @@ class MyListener(UListener): - def on_receive(self, umsg: UMessage) -> None: + async def on_receive(self, umsg: UMessage) -> None: pass -class TestInMemorySubscriber(unittest.IsolatedAsyncioTestCase): +class TestInMemoryUSubscriptionClient(unittest.IsolatedAsyncioTestCase): def setUp(self): self.transport = MagicMock(spec=UTransport) self.rpc_client = MagicMock(spec=InMemoryRpcClient) @@ -66,7 +70,7 @@ async def test_simple_mock_of_rpc_client_and_notifier(self): self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) result = await subscriber.subscribe(self.topic, self.listener) @@ -89,10 +93,10 @@ async def test_simple_mock_of_rpc_client_and_notifier_returned_subscribe_pending self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) result = await subscriber.subscribe(self.topic, self.listener) @@ -115,10 +119,10 @@ async def test_simple_mock_of_rpc_client_and_notifier_returned_unsubscribed(self self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) result = await subscriber.subscribe(self.topic, self.listener) @@ -137,7 +141,7 @@ async def test_subscribe_using_mock_rpc_client_and_simplernotifier_when_invokeme self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) with self.assertRaises(Exception) as context: @@ -149,6 +153,19 @@ async def test_subscribe_using_mock_rpc_client_and_simplernotifier_when_invokeme self.transport.register_listener.assert_not_called() self.transport.get_source.assert_called_once() + async def test_subscribe_when_register_notification_listener_return_failed_status(self): + self.transport.get_source.return_value = self.source + + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.INTERNAL) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + with self.assertRaises(UStatusError) as context: + await subscriber.subscribe(self.topic, self.listener) + self.assertEqual(UCode.INTERNAL, context.exception.status.code) + self.assertEqual("Failed to register listener for rpc client", context.exception.status.message) + async def test_subscribe_using_mock_rpc_client_and_simplernotifier_when_invokemethod_return_an_ustatuserror(self): self.transport.get_source.return_value = self.source @@ -157,7 +174,7 @@ async def test_subscribe_using_mock_rpc_client_and_simplernotifier_when_invokeme self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) with self.assertRaises(UStatusError) as context: @@ -180,7 +197,7 @@ async def test_subscribe_when_we_pass_a_subscription_change_notification_handler self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) handler = MagicMock(spec=SubscriptionChangeHandler) @@ -206,7 +223,7 @@ async def test_subscribe_when_we_try_to_subscribe_to_the_same_topic_twice_with_s self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) handler = MagicMock(spec=SubscriptionChangeHandler) @@ -234,7 +251,7 @@ async def test_subscribe_when_we_try_to_subscribe_to_the_same_topic_twice_with_d self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) handler = MagicMock(spec=SubscriptionChangeHandler) @@ -255,13 +272,14 @@ async def test_subscribe_when_we_try_to_subscribe_to_the_same_topic_twice_with_d self.assertEqual(self.transport.get_source.call_count, 2) async def test_unsubscribe_using_mock_rpcclient_and_simplernotifier(self): + self.transport.get_source.return_value = self.source self.transport.register_listener.return_value = UStatus(code=UCode.OK) self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) self.rpc_client.invoke_method.return_value = UPayload.pack(UnsubscribeResponse()) self.notifier.unregister_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) response = await subscriber.unsubscribe(self.topic, self.listener) self.assertEqual(response.message, "") @@ -272,13 +290,14 @@ async def test_unsubscribe_using_mock_rpcclient_and_simplernotifier(self): self.transport.unregister_listener.assert_called_once() async def test_unsubscribe_when_invokemethod_return_an_exception(self): + self.transport.get_source.return_value = self.source self.transport.register_listener.return_value = UStatus(code=UCode.OK) self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) self.rpc_client.invoke_method.return_value = UStatusError.from_code_message( UCode.CANCELLED, "Operation cancelled" ) self.notifier.unregister_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) response = await subscriber.unsubscribe(self.topic, self.listener) self.assertEqual(response.message, "Operation cancelled") @@ -289,13 +308,14 @@ async def test_unsubscribe_when_invokemethod_return_an_exception(self): self.transport.unregister_listener.assert_not_called() async def test_unsubscribe_when_invokemethod_returned_ok_but_we_failed_to_unregister_the_listener(self): + self.transport.get_source.return_value = self.source self.transport.register_listener.return_value = UStatus(code=UCode.OK) self.transport.unregister_listener.return_value = UStatusError.from_code_message(UCode.ABORTED, "aborted") self.rpc_client.invoke_method.return_value = UPayload.pack( SubscriptionResponse(status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBE_PENDING)) ) self.notifier.unregister_notification_listener.return_value = UStatus(code=UCode.OK) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) response = await subscriber.unsubscribe(self.topic, self.listener) self.assertEqual(response.status.code, UCode.ABORTED) @@ -318,11 +338,11 @@ async def register_notification_listener(uri, listener): await barrier.wait() # Wait for the barrier again update = Update(topic=self.topic, status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBED)) message = UMessageBuilder.notification(self.topic, self.source).build_from_upayload(UPayload.pack(update)) - listener.on_receive(message) + await listener.on_receive(message) return UStatus(code=UCode.OK) self.notifier.register_notification_listener = AsyncMock(side_effect=register_notification_listener) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) self.assertIsNotNone(subscriber) result = await subscriber.subscribe(self.topic, self.listener, CallOptions.DEFAULT) self.assertEqual(result.status.state, SubscriptionStatus.State.SUBSCRIBE_PENDING) @@ -338,65 +358,327 @@ async def register_notification_listener(uri, listener): async def test_unregister_listener_missing_topic(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unregister_listener(None, self.listener) self.assertEqual(str(context.exception), "Unsubscribe topic missing") async def test_unregister_listener_missing_listener(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unregister_listener(self.topic, None) self.assertEqual(str(context.exception), "Request listener missing") + async def test_unregister_listener_happy_path(self): + class MyListener(UListener): + async def on_receive(self, umsg: UMessage) -> None: + pass + + listener = MyListener() + notifier = MagicMock(spec=SimpleNotifier) + self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) + # with self.assertRaises(ValueError) as context: + status = await subscriber.unregister_listener(self.topic, listener) + self.assertEqual(UCode.OK, status.code) + async def test_unsubscribe_missing_topic(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unsubscribe(None, self.listener, CallOptions()) self.assertEqual(str(context.exception), "Unsubscribe topic missing") async def test_unsubscribe_missing_listener(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.unsubscribe(self.topic, None, CallOptions()) self.assertEqual(str(context.exception), "Listener missing") + async def test_unsubscribe_missing_options(self): + notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) + with self.assertRaises(ValueError) as context: + await subscriber.unsubscribe(self.topic, self.listener, None) + self.assertEqual(str(context.exception), "CallOptions missing") + async def test_subscribe_missing_topic(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.subscribe(None, self.listener, CallOptions()) self.assertEqual(str(context.exception), "Subscribe topic missing") + async def test_subscribe_missing_options(self): + notifier = MagicMock(spec=SimpleNotifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) + with self.assertRaises(ValueError) as context: + await subscriber.subscribe(self.topic, self.listener, None) + self.assertEqual(str(context.exception), "CallOptions missing") + async def test_subscribe_missing_listener(self): notifier = MagicMock(spec=SimpleNotifier) - subscriber = InMemorySubscriber(self.transport, self.rpc_client, notifier) + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, notifier) with self.assertRaises(ValueError) as context: await subscriber.subscribe(self.topic, None, CallOptions()) self.assertEqual(str(context.exception), "Request listener missing") - def test_subscriber_constructor_transport_none(self): + def test_subscription_client_constructor_transport_none(self): with self.assertRaises(ValueError) as context: - InMemorySubscriber(None, None, None) + InMemoryUSubscriptionClient(None, None, None) self.assertEqual(str(context.exception), UTransport.TRANSPORT_NULL_ERROR) - def test_subscriber_constructor_transport_not_instance(self): + def test_subscription_client_constructor_transport_not_instance(self): with self.assertRaises(ValueError) as context: - InMemorySubscriber("InvalidTransport", None, None) + InMemoryUSubscriptionClient("InvalidTransport", None, None) self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) - def test_subscriber_constructor_rpcclient_none(self): + def test_subscription_client_constructor_with_just_transport(self): + client = InMemoryUSubscriptionClient(self.transport) + self.assertTrue(client is not None) + + async def test_register_notification_api_when_passed_a_null_topic(self): + # Setup mocks + notifier = AsyncMock() + notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + # Initialize InMemoryUSubscriptionClient + transport = MagicMock(spec=UTransport) + subscriber = InMemoryUSubscriptionClient(transport) + assert subscriber is not None + + # Define the handler + class MySubscriptionChangeHandler(SubscriptionChangeHandler): + def handle_subscription_change(self, topic, status): + raise NotImplementedError("Unimplemented method 'handleSubscriptionChange'") + + handler = MySubscriptionChangeHandler() + + # Assert that passing a null topic raises a ValueError with self.assertRaises(ValueError) as context: - InMemorySubscriber(self.transport, None, None) - self.assertEqual(str(context.exception), "RpcClient missing") + await subscriber.register_for_notifications(None, handler) + self.assertEqual(str(context.exception), "Topic missing") + + # Verify the notifier interaction + notifier.register_notification_listener.assert_not_called() - def test_subscriber_constructor_notiifier_none(self): + async def test_register_notification_api_when_passed_a_null_handler(self): + subscriber = InMemoryUSubscriptionClient(self.transport) + assert subscriber is not None + + # Assert that passing a null handler raises a ValueError with self.assertRaises(ValueError) as context: - InMemorySubscriber(self.transport, self.rpc_client, None) - self.assertEqual(str(context.exception), "Notifier missing") + await subscriber.register_for_notifications(MagicMock(spec=UUri), None) + self.assertEqual(str(context.exception), "Handler missing") + + # Verify the notifier interaction + self.notifier.register_notification_listener.assert_not_called() + + async def test_register_notification_api_when_passed_a_valid_topic_and_handler(self): + subscriber = InMemoryUSubscriptionClient(MockUTransport()) + assert subscriber is not None + + # Define the handler + class MySubscriptionChangeHandler(SubscriptionChangeHandler): + def handle_subscription_change(self, topic, status): + raise NotImplementedError("Unimplemented method 'handleSubscriptionChange'") + + handler = MySubscriptionChangeHandler() + + status = await subscriber.register_for_notifications(UUri(), handler) + self.assertTrue(status is not None) + + async def test_register_notification_api_when_invoke_method_throws_an_exception(self): + self.notifier.register_notification_listener.return_value = UStatus(code=UCode.OK) + + self.transport.get_source.return_value = self.source + + self.rpc_client.invoke_method.return_value = UStatusError.from_code_message( + code=UCode.PERMISSION_DENIED, message="Not permitted" + ) + + # Initialize the subscription client + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + # Define the subscription change handler + class MySubscriptionChangeHandler(SubscriptionChangeHandler): + async def handle_subscription_change(self, topic, status): + raise NotImplementedError("Unimplemented method 'handle_subscription_change'") + + handler = MySubscriptionChangeHandler() + with self.assertRaises(UStatusError) as context: + await subscriber.register_for_notifications(self.topic, handler) + + self.assertEqual(UCode.PERMISSION_DENIED, context.exception.status.code) + self.assertEqual("Not permitted", context.exception.status.message) + + async def test_register_for_notifications_to_the_same_topic_twice_with_same_notification_handler(self): + self.transport.get_source.return_value = self.source + + self.rpc_client.invoke_method.return_value = UPayload.pack(None) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + # First register_for_notifications attempt + result = await subscriber.register_for_notifications(self.topic, handler, CallOptions.DEFAULT) + self.assertTrue(result is not None) + + # Second register_for_notifications attempt + result = await subscriber.register_for_notifications(self.topic, handler, CallOptions.DEFAULT) + self.assertTrue(result is not None) + + self.assertEqual(self.rpc_client.invoke_method.call_count, 2) + self.assertEqual(self.transport.get_source.call_count, 2) + + async def test_register_for_notifications_to_the_same_topic_twice_with_different_notification_handler(self): + self.transport.get_source.return_value = self.source + + self.rpc_client.invoke_method.return_value = UPayload.pack(None) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + # First register_for_notifications attempt + result = await subscriber.register_for_notifications(self.topic, handler, CallOptions.DEFAULT) + self.assertTrue(result is not None) + handler1 = MagicMock(spec=SubscriptionChangeHandler) + handler1.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + # Second register_for_notifications attempt + with self.assertRaises(UStatusError) as context: + await subscriber.register_for_notifications(self.topic, handler1, CallOptions.DEFAULT) + self.assertEqual(UCode.ALREADY_EXISTS, context.exception.status.code) + self.assertEqual("Handler already registered", context.exception.status.message) + + self.assertEqual(self.rpc_client.invoke_method.call_count, 2) + self.assertEqual(self.transport.get_source.call_count, 2) + + async def test_unregister_notification_api_for_the_happy_path(self): + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + self.transport.get_source.return_value = self.source + self.rpc_client.invoke_method.return_value = UPayload.pack(None) + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + + try: + await subscriber.register_for_notifications(self.topic, handler) + await subscriber.unregister_for_notifications(self.topic, handler) + except Exception as e: + self.fail(f"Exception occurred: {e}") + + async def test_unregister_notification_api_topic_missing(self): + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + self.transport.get_source.return_value = self.source + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + with self.assertRaises(ValueError) as error: + await subscriber.unregister_for_notifications(None, handler) + self.assertEqual("Topic missing", str(error.exception)) + + async def test_unregister_notification_api_handler_missing(self): + self.transport.get_source.return_value = self.source + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + with self.assertRaises(ValueError) as error: + await subscriber.unregister_for_notifications(self.topic, None) + self.assertEqual("Handler missing", str(error.exception)) + + async def test_unregister_notification_api_options_none(self): + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + self.transport.get_source.return_value = self.source + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + with self.assertRaises(ValueError) as error: + await subscriber.unregister_for_notifications(self.topic, handler, None) + self.assertEqual("CallOptions missing", str(error.exception)) + + async def test_register_notification_api_options_none(self): + handler = MagicMock(spec=SubscriptionChangeHandler) + handler.handle_subscription_change.return_value = NotImplementedError( + "Unimplemented method 'handle_subscription_change'" + ) + self.transport.get_source.return_value = self.source + + subscriber = InMemoryUSubscriptionClient(self.transport, self.rpc_client, self.notifier) + self.assertIsNotNone(subscriber) + with self.assertRaises(ValueError) as error: + await subscriber.register_for_notifications(self.topic, handler, None) + self.assertEqual("CallOptions missing", str(error.exception)) + + async def test_fetch_subscribers_when_passing_null_topic(self): + subscriber = InMemoryUSubscriptionClient(self.transport) + + with self.assertRaises(ValueError) as error: + await subscriber.fetch_subscribers(None) + self.assertEqual("Topic missing", str(error.exception)) + + async def test_fetch_subscribers_when_passing_null_calloptions(self): + subscriber = InMemoryUSubscriptionClient(self.transport) + + with self.assertRaises(ValueError) as error: + await subscriber.fetch_subscribers(self.topic, None) + self.assertEqual("CallOptions missing", str(error.exception)) + + async def test_fetch_subscribers_passing_a_valid_topic(self): + subscriber = InMemoryUSubscriptionClient(MockUTransport()) + + try: + response = await subscriber.fetch_subscribers(self.topic) + self.assertEqual(response, FetchSubscribersResponse()) + except Exception as e: + self.fail(f"Exception occurred: {e}") + + async def test_fetch_subscriptions_when_passing_null_request(self): + subscriber = InMemoryUSubscriptionClient(self.transport) + + with self.assertRaises(ValueError) as error: + await subscriber.fetch_subscriptions(None) + self.assertEqual("Request missing", str(error.exception)) + + async def test_fetch_subscriptions_when_passing_null_calloptions(self): + subscriber = InMemoryUSubscriptionClient(self.transport) + + with self.assertRaises(ValueError) as error: + await subscriber.fetch_subscriptions(FetchSubscriptionsRequest(), None) + self.assertEqual("CallOptions missing", str(error.exception)) + + async def test_fetch_subscriptions_passing_a_valid_fetch_subscription_request(self): + request = FetchSubscriptionsRequest(topic=self.topic) + subscriber = InMemoryUSubscriptionClient(MockUTransport()) + + try: + response = await subscriber.fetch_subscriptions(request) + self.assertEqual(response, FetchSubscriptionsResponse()) + except Exception as e: + self.fail(f"Exception occurred: {e}") if __name__ == '__main__': diff --git a/tests/test_communication/mock_utransport.py b/tests/test_communication/mock_utransport.py index 6e5b1fa..527c8aa 100644 --- a/tests/test_communication/mock_utransport.py +++ b/tests/test_communication/mock_utransport.py @@ -81,13 +81,13 @@ async def send(self, message: UMessage) -> UStatus: if message.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: response = self.build_response(message) - self._notify_listeners(response) + await self._notify_listeners(response) return UStatus(code=UCode.OK) - def _notify_listeners(self, response: UMessage): + async def _notify_listeners(self, response: UMessage): for listener in self.listeners: - listener.on_receive(response) + await listener.on_receive(response) async def register_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: self.listeners.append(listener) @@ -130,12 +130,22 @@ def build_response(self, request): ) +class CommStatusUCodeOKTransport(MockUTransport): + def build_response(self, request): + status = UStatus(code=UCode.OK, message="No Communication Error") + return ( + UMessageBuilder.response_for_request(request.attributes) + .with_commstatus(status.code) + .build_from_upayload(UPayload.pack(status)) + ) + + class EchoUTransport(MockUTransport): def build_response(self, request): return request async def send(self, message): response = self.build_response(message) - executor = ThreadPoolExecutor(max_workers=1) - executor.submit(self._notify_listeners, response) + + await self._notify_listeners(response) return UStatus(code=UCode.OK) diff --git a/tests/test_communication/test_inmemoryrpcclient.py b/tests/test_communication/test_inmemoryrpcclient.py index 6703608..e10ba3c 100644 --- a/tests/test_communication/test_inmemoryrpcclient.py +++ b/tests/test_communication/test_inmemoryrpcclient.py @@ -14,7 +14,12 @@ import unittest -from tests.test_communication.mock_utransport import CommStatusTransport, MockUTransport, TimeoutUTransport +from tests.test_communication.mock_utransport import ( + CommStatusTransport, + CommStatusUCodeOKTransport, + MockUTransport, + TimeoutUTransport, +) from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.upayload import UPayload @@ -101,6 +106,12 @@ async def test_invoke_method_with_comm_status_transport(self): self.assertEqual(UCode.FAILED_PRECONDITION, context.exception.status.code) self.assertEqual("Communication error [FAILED_PRECONDITION]", context.exception.status.message) + async def test_invoke_method_with_comm_status_transport(self): + rpc_client = InMemoryRpcClient(CommStatusUCodeOKTransport()) + payload = UPayload.pack_to_any(UUri()) + response = await rpc_client.invoke_method(self.create_method_uri(), payload, None) + self.assertEqual(UCode.OK, UPayload.unpack(response, UStatus).code) + async def test_invoke_method_with_error_transport(self): class ErrorUTransport(MockUTransport): async def send(self, message): diff --git a/tests/test_communication/test_inmemoryrpcserver.py b/tests/test_communication/test_inmemoryrpcserver.py index 4be86e7..c1f7f00 100644 --- a/tests/test_communication/test_inmemoryrpcserver.py +++ b/tests/test_communication/test_inmemoryrpcserver.py @@ -18,6 +18,8 @@ from unittest.mock import AsyncMock, MagicMock from tests.test_communication.mock_utransport import EchoUTransport +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.inmemoryrpcserver import InMemoryRpcServer from uprotocol.communication.requesthandler import RequestHandler from uprotocol.communication.upayload import UPayload @@ -27,6 +29,7 @@ from uprotocol.transport.utransport import UTransport from uprotocol.uri.serializer.uriserializer import UriSerializer from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus @@ -136,7 +139,7 @@ async def custom_register_listener_behavior(source: UUri, listener: UListener, s async def custom_send_behavior(message): serialized_uri = UriSerializer().serialize(message.attributes.sink) if serialized_uri in listeners: - listeners[serialized_uri].on_receive(message) + await listeners[serialized_uri].on_receive(message) return UStatus(code=UCode.OK) self.mock_transport.send = AsyncMock(side_effect=custom_send_behavior) @@ -196,6 +199,22 @@ async def test_handle_requests_exception(self): status = await transport.send(request) self.assertEqual(status.code, UCode.OK) + async def test_end_to_end_rpc_with_test_transport(self): + class MyRequestHandler(RequestHandler): + def handle_request(self, message: UMessage) -> UPayload: + return UPayload.pack(UUri()) + + handler = MyRequestHandler() + test_transport = EchoUTransport() + server = InMemoryRpcServer(test_transport) + method = self.create_method_uri() + + self.assertEqual((await server.register_request_handler(method, handler)).code, UCode.OK) + rpc_client = InMemoryRpcClient(test_transport) + response = await rpc_client.invoke_method(method, None, CallOptions.DEFAULT) + self.assertIsNotNone(response) + self.assertEqual(response, UPayload.pack(UUri())) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_communication/test_simplenotifier.py b/tests/test_communication/test_simplenotifier.py index 12483cd..6fe630e 100644 --- a/tests/test_communication/test_simplenotifier.py +++ b/tests/test_communication/test_simplenotifier.py @@ -62,7 +62,7 @@ async def test_register_listener(self): self.transport.get_source.return_value = self.source class TestListener(UListener): - def on_receive(self, message: UMessage): + async def on_receive(self, message: UMessage): pass listener = TestListener() @@ -78,7 +78,7 @@ async def test_unregister_notification_listener(self): self.transport.unregister_listener.return_value = UStatus(code=UCode.OK) class TestListener(UListener): - def on_receive(self, message: UMessage): + async def on_receive(self, message: UMessage): pass listener = TestListener() @@ -97,7 +97,7 @@ async def test_unregister_listener_not_registered(self): self.transport.unregister_listener.return_value = UStatus(code=UCode.NOT_FOUND) class TestListener(UListener): - def on_receive(self, message: UMessage): + async def on_receive(self, message: UMessage): pass listener = TestListener() diff --git a/tests/test_communication/test_uclient.py b/tests/test_communication/test_uclient.py index d179e4f..6b15a3c 100644 --- a/tests/test_communication/test_uclient.py +++ b/tests/test_communication/test_uclient.py @@ -22,12 +22,6 @@ from uprotocol.communication.uclient import UClient from uprotocol.communication.upayload import UPayload from uprotocol.communication.ustatuserror import UStatusError -from uprotocol.core.usubscription.v3.usubscription_pb2 import ( - SubscriptionResponse, - SubscriptionStatus, - UnsubscribeResponse, -) -from uprotocol.transport.builder.umessagebuilder import UMessageBuilder from uprotocol.transport.ulistener import UListener from uprotocol.v1.ucode_pb2 import UCode from uprotocol.v1.umessage_pb2 import UMessage @@ -36,7 +30,7 @@ class MyListener(UListener): - def on_receive(self, umsg: UMessage) -> None: + async def on_receive(self, umsg: UMessage) -> None: # Handle receiving subscriptions here assert umsg is not None @@ -159,24 +153,6 @@ async def test_invoke_method_with_multi_invoke_transport(self): self.assertFalse(future_result1.exception()) self.assertFalse(future_result2.exception()) - async def test_subscribe_happy_path(self): - topic = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) - transport = HappySubscribeUTransport() - upclient = UClient(transport) - subscription_response = await upclient.subscribe(topic, self.listener, CallOptions(timeout=5000)) - # check for successfully subscribed - self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) - - async def test_unregister_listener(self): - topic = create_topic() - my_listener = create_autospec(UListener, instance=True) - - subscriber = UClient(HappySubscribeUTransport()) - subscription_response = await subscriber.subscribe(topic, my_listener, CallOptions.DEFAULT) - self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) - status = await subscriber.unregister_listener(topic, my_listener) - self.assertEqual(status.code, UCode.OK) - async def test_registering_request_listener(self): handler = create_autospec(RequestHandler, instance=True) server = UClient(MockUTransport()) @@ -209,7 +185,7 @@ async def test_happy_path_for_all_apis_async(self): client = UClient(MockUTransport()) class MyUListener(UListener): - def on_receive(self, umsg: UMessage) -> None: + async def on_receive(self, umsg: UMessage) -> None: pass class MyRequestHandler(RequestHandler): @@ -225,9 +201,6 @@ async def run_tests(): client.notify(create_topic(), create_destination_uri()), client.publish(create_topic()), client.invoke_method(create_method_uri(), UPayload.pack(None), CallOptions.DEFAULT), - client.subscribe(create_topic(), listener), - client.unsubscribe(create_topic(), listener), - unregister_listener_not_registered(client, listener), client.register_notification_listener(create_topic(), listener), client.unregister_notification_listener(create_topic(), listener), register_and_unregister_request_handler(client, handler), @@ -251,26 +224,6 @@ def create_method_uri(): return UUri(authority_name="neelam", ue_id=4, ue_version_major=1, resource_id=3) -class HappySubscribeUTransport(MockUTransport): - def build_response(self, request): - return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( - UPayload.pack( - SubscriptionResponse( - status=SubscriptionStatus( - state=SubscriptionStatus.State.SUBSCRIBED, message="Successfully Subscribed" - ) - ) - ) - ) - - -class HappyUnSubscribeUTransport(MockUTransport): - def build_response(self, request): - return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( - UPayload.pack(UnsubscribeResponse()) - ) - - class CommStatusTransport(MockUTransport): def build_response(self, request): status = UStatus(UCode.FAILED_PRECONDITION, "CommStatus Error") diff --git a/tests/test_transport/test_utransport.py b/tests/test_transport/test_utransport.py index 4615d34..a6276d1 100644 --- a/tests/test_transport/test_utransport.py +++ b/tests/test_transport/test_utransport.py @@ -23,8 +23,8 @@ class MyListener(UListener): - def on_receive(self, message): - super().on_receive(message) + async def on_receive(self, message): + await super().on_receive(message) pass @@ -36,7 +36,7 @@ async def send(self, message): return UStatus(code=UCode.INVALID_ARGUMENT if message is None else UCode.OK) async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: - listener.on_receive(UMessage()) + await listener.on_receive(UMessage()) return UStatus(code=UCode.OK) async def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): @@ -54,7 +54,7 @@ async def send(self, message): return UStatus(code=UCode.INTERNAL) async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: - listener.on_receive(None) + await listener.on_receive(None) return UStatus(code=UCode.INTERNAL) async def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): diff --git a/uprotocol/client/README.adoc b/uprotocol/client/README.adoc new file mode 100644 index 0000000..15c106f --- /dev/null +++ b/uprotocol/client/README.adoc @@ -0,0 +1,50 @@ +# Application Layer APIs (uP-L3 Interface) + +The following module includes the client-facing https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l3[Application Layer (uP-L3)] interfaces to communication with USubscription, UDiscovery, and UTwin services. + + +## uP-L3 Interfaces + +.Interfaces (uP-L3 Interface) +[cols="1,1,3",options="header"] +|=== +| Interface | Implementation(s) | Description + +| xref:/v3/usubscriptionclient.py[*USubscriptionClient*] | xref:/v3/InMemoryUSubscriptionClient.py[InMemoryUSubscriptionClient] | Subscription Management APIs to subscribe(), unsubscribe() and fetch information from the subscription database. +|=== + + +The module includes the interface for the client-facing APIs as well as a simple in-memory implementation that is based on the uP-L2 in-memory implementations. the term in-memory is used to indicate that the data required by the code is cached inside of the object and not persisted to a given database backend, this design is useful for embedded applications (i.e. in the vehicle) however will not scale to the multi-tenanted cloud applications. + +## Examples + + +=== Subscribe and Unsubscribe +[,python] +---- +transport = # your UTransport instance + +#subscription topic +topic : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) + +#Listener to process incoming events on the topic +class MySubscriptionListener(UListener): + def on_receive(self, umsg: UMessage) -> None: + # Handle receiving published message + pass + +listener = MySubscriptionListener() + +# Optional handler that is called whenever the SubscriptionState changes for the subscriber +class MySubscriptionChangeHandler(SubscriptionChangeHandler) + def handle_subscription_change(self, topic: UUri, status: SubscriptionStatus) -> None: + # Handle subscription status changes if you're interested like when + # the subscription state changes from SUBSCRIBE_PENDING to SUBSCRIBED + pass + +subscriber : InMemoryUSubscriptionClient = InMemoryUSubscriptionClient(transport) +response : SubscriptionResponse = subscriber.subscribe(topic, listener, CallOptions.DEFAULT, MySubscriptionChangeHandler()) + +#UnSubscribe from the topic +status : UStatus = subscriber.unsubscribe(topic, listener) +---- \ No newline at end of file diff --git a/uprotocol/client/__init__.py b/uprotocol/client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uprotocol/client/usubscription/__init__.py b/uprotocol/client/usubscription/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uprotocol/client/usubscription/v3/__init__.py b/uprotocol/client/usubscription/v3/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uprotocol/communication/inmemorysubscriber.py b/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py similarity index 53% rename from uprotocol/communication/inmemorysubscriber.py rename to uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py index 3533d74..0a11746 100644 --- a/uprotocol/communication/inmemorysubscriber.py +++ b/uprotocol/client/usubscription/v3/inmemoryusubcriptionclient.py @@ -14,16 +14,24 @@ from typing import Dict, Optional +from uprotocol.client.usubscription.v3.subscriptionchangehandler import SubscriptionChangeHandler +from uprotocol.client.usubscription.v3.usubscriptionclient import USubscriptionClient from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.notifier import Notifier from uprotocol.communication.rpcclient import RpcClient from uprotocol.communication.rpcmapper import RpcMapper -from uprotocol.communication.subscriber import Subscriber -from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler +from uprotocol.communication.simplenotifier import SimpleNotifier from uprotocol.communication.upayload import UPayload from uprotocol.communication.ustatuserror import UStatusError from uprotocol.core.usubscription.v3 import usubscription_pb2 from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + FetchSubscribersRequest, + FetchSubscribersResponse, + FetchSubscriptionsRequest, + FetchSubscriptionsResponse, + NotificationsRequest, + NotificationsResponse, SubscriberInfo, SubscriptionRequest, SubscriptionResponse, @@ -45,9 +53,16 @@ class MyNotificationListener(UListener): def __init__(self, handlers): + """ + Initializes a new instance of the MyNotificationListener class. + + :param handlers: A dictionary mapping topics to their respective handlers. + The handlers are responsible for processing subscription + change notifications for their corresponding topics. + """ self.handlers = handlers - def on_receive(self, message: UMessage) -> None: + async def on_receive(self, message: UMessage) -> None: """ Handles incoming notifications from the USubscription service. @@ -70,32 +85,32 @@ def on_receive(self, message: UMessage) -> None: pass -class InMemorySubscriber(Subscriber): +class InMemoryUSubscriptionClient(USubscriptionClient): """ - The following is an in-memory implementation of the Subscriber interface that - wraps the UTransport for implementing the Subscriber-side of the pub/sub - messaging pattern to allow developers to subscribe and unsubscribe to topics. - This implementation uses the InMemoryRpcClient and SimpleNotifier interfaces - to invoke the subscription request message to the usubscription service, and register - to receive notifications for changes from the uSubscription service. + Implementation of USubscriptionClient that caches state information within the object + and used for single tenant applications (ex. in-vehicle). The implementation uses InMemoryRpcClient + that also stores RPC correlation information within the objects """ - def __init__(self, transport: UTransport, rpc_client: RpcClient, notifier: Notifier): + def __init__( + self, transport: UTransport, rpc_client: Optional[RpcClient] = None, notifier: Optional[Notifier] = None + ): """ - Creates a new subscriber for existing Communication Layer client implementations. + Creates a new USubscription client passing UTransport, CallOptions, and an implementation + of RpcClient and Notifier. - :param transport: The transport to use for sending the notifications - :param rpc_client: The RPC client to use for sending the RPC requests - :param notifier: The notifier to register notification listeners + :param transport: The transport to use for sending the notifications. + :param rpc_client: The RPC client to use for sending the RPC requests. + :param notifier: The notifier to use for registering the notification listener. """ if not transport: raise ValueError(UTransport.TRANSPORT_NULL_ERROR) - elif not isinstance(transport, UTransport): + if not isinstance(transport, UTransport): raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) - elif not rpc_client: - raise ValueError("RpcClient missing") - elif not notifier: - raise ValueError("Notifier missing") + if not rpc_client: + rpc_client = InMemoryRpcClient(transport) + if not notifier: + notifier = SimpleNotifier(transport) self.transport = transport self.rpc_client = rpc_client self.notifier = notifier @@ -106,12 +121,16 @@ def __init__(self, transport: UTransport, rpc_client: RpcClient, notifier: Notif self.notification_uri = UriFactory.from_proto(service_descriptor, 0x8000) self.subscribe_uri = UriFactory.from_proto(service_descriptor, 1) self.unsubscribe_uri = UriFactory.from_proto(service_descriptor, 2) + self.fetch_subscribers_uri = UriFactory.from_proto(service_descriptor, 8) + self.fetch_subscriptions_uri = UriFactory.from_proto(service_descriptor, 3) + self.register_for_notification_uri = UriFactory.from_proto(service_descriptor, 6) + self.unregister_for_notification_uri = UriFactory.from_proto(service_descriptor, 7) async def subscribe( self, topic: UUri, listener: UListener, - options: CallOptions = None, + options: CallOptions = CallOptions.DEFAULT, handler: Optional[SubscriptionChangeHandler] = None, ) -> SubscriptionResponse: """ @@ -128,8 +147,8 @@ async def subscribe( returned. :param topic: The topic to subscribe to. - :param listener: The listener function to be called when a message is received on the topic. - :param options: Optional call options for the subscription. + :param listener: The listener function to be called when messages are received. + :param options: Optional CallOptions used to communicate with USubscription service. :param handler: Optional handler function for handling subscription state changes. :return: An async operation that yields a SubscriptionResponse upon success or raises an exception with the failure reason as UStatus. UCode.ALREADY_EXISTS will be returned if called multiple times @@ -139,6 +158,8 @@ async def subscribe( raise ValueError("Subscribe topic missing") if not listener: raise ValueError("Request listener missing") + if not options: + raise ValueError("CallOptions missing") if not self.is_listener_registered: # Ensure listener is registered before proceeding @@ -175,7 +196,9 @@ async def subscribe( self.handlers[topic_str] = handler return response - async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions = None) -> UStatus: + async def unsubscribe( + self, topic: UUri, listener: UListener, options: CallOptions = CallOptions.DEFAULT + ) -> UStatus: """ Unsubscribes from a given topic. @@ -186,14 +209,18 @@ async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptio :param topic: The topic to unsubscribe from. :param listener: The listener function associated with the topic. - :param options: Optional call options for the subscription. + :param options: Optional CallOptions used to communication with USubscription service. :return: An async operation that yields a UStatus indicating the result of the unsubscribe request. """ if not topic: raise ValueError("Unsubscribe topic missing") if not listener: raise ValueError("Listener missing") - unsubscribe_request = UnsubscribeRequest(topic=topic) + if not options: + raise ValueError("CallOptions missing") + unsubscribe_request = UnsubscribeRequest( + topic=topic, subscriber=SubscriberInfo(uri=self.transport.get_source()) + ) future_result = self.rpc_client.invoke_method(self.unsubscribe_uri, UPayload.pack(unsubscribe_request), options) response = await RpcMapper.map_response_to_result(future_result, UnsubscribeResponse) @@ -218,7 +245,9 @@ async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus raise ValueError("Unsubscribe topic missing") if not listener: raise ValueError("Request listener missing") - return await self.transport.unregister_listener(topic, listener) + status = await self.transport.unregister_listener(topic, listener) + self.handlers.pop(UriSerializer.serialize(topic), None) + return status def close(self): """ @@ -226,3 +255,109 @@ def close(self): """ self.handlers.clear() self.notifier.unregister_notification_listener(self.notification_uri, self.notification_handler) + + async def register_for_notifications( + self, topic: UUri, handler: SubscriptionChangeHandler, options: Optional[CallOptions] = CallOptions.DEFAULT + ): + """ + Register for Subscription Change Notifications. + + This API allows producers to register to receive subscription change notifications for + topics that they produce only. + + :param topic: UUri, The topic to register for notifications. + :param handler: callable, The SubscriptionChangeHandler to handle the subscription changes. + :param options: CallOptions, The CallOptions to be used for the register request. + + :return: asyncio.Future[NotificationsResponse], A future object that completes with NotificationsResponse + if the uSubscription service accepts the request to register the caller to be notified of subscription + changes, or raises an exception if there is a failure reason. + """ + if not topic: + raise ValueError("Topic missing") + if not handler: + raise ValueError("Handler missing") + if not options: + raise ValueError("CallOptions missing") + + request = NotificationsRequest(topic=topic, subscriber=SubscriberInfo(uri=self.transport.get_source())) + + response = self.rpc_client.invoke_method(self.register_for_notification_uri, UPayload.pack(request), options) + notifications_response = await RpcMapper.map_response(response, NotificationsResponse) + if handler: + topic_str = UriSerializer.serialize(topic) + if topic_str in self.handlers and self.handlers[topic_str] != handler: + raise UStatusError.from_code_message(UCode.ALREADY_EXISTS, "Handler already registered") + self.handlers[topic_str] = handler + + return notifications_response + + async def unregister_for_notifications( + self, topic: UUri, handler: SubscriptionChangeHandler, options: Optional[CallOptions] = CallOptions.DEFAULT + ): + """ + Unregister for subscription change notifications. + + :param topic: The topic to unregister for notifications. + :param handler: The `SubscriptionChangeHandler` to handle the subscription changes. + :param options: The `CallOptions` to be used for the unregister request. + :return: A `NotificationResponse` with the status of the API call to the uSubscription service, + or a `UStatus` with the reason for the failure. `UCode.PERMISSION_DENIED` is returned if the + topic `ue_id` does not equal the caller's `ue_id`. + """ + if not topic: + raise ValueError("Topic missing") + if not handler: + raise ValueError("Handler missing") + if not options: + raise ValueError("CallOptions missing") + + request = NotificationsRequest(topic=topic, subscriber=SubscriberInfo(uri=self.transport.get_source())) + + response = self.rpc_client.invoke_method(self.unregister_for_notification_uri, UPayload.pack(request), options) + notifications_response = await RpcMapper.map_response(response, NotificationsResponse) + + self.handlers.pop(UriSerializer.serialize(topic), None) + + return notifications_response + + async def fetch_subscribers(self, topic: UUri, options: Optional[CallOptions] = CallOptions.DEFAULT): + """ + Fetch the list of subscribers for a given produced topic. + + :param topic: The topic to fetch the subscribers for. + :param options: The `CallOptions` to be used for the fetch request. + :return: A `FetchSubscribersResponse` that contains the list of subscribers, + or a `UStatus` with the reason for the failure. + """ + if topic is None: + raise ValueError("Topic missing") + if options is None: + raise ValueError("CallOptions missing") + + request = FetchSubscribersRequest(topic=topic) + result = self.rpc_client.invoke_method(self.fetch_subscribers_uri, UPayload.pack(request), options) + return await RpcMapper.map_response(result, FetchSubscribersResponse) + + async def fetch_subscriptions( + self, request: FetchSubscriptionsRequest, options: Optional[CallOptions] = CallOptions.DEFAULT + ): + """ + Fetch the list of subscriptions for a given topic. + + This API provides more information than `fetch_subscribers()` as it also returns + `SubscribeAttributes` per subscriber, which might be useful for the producer to know. + + :param request: The request to fetch subscriptions for. + :param options: The `CallOptions` to be used for the request. + :return: A `FetchSubscriptionsResponse` that contains the subscription information per subscriber to the topic. + If unsuccessful, returns a `UStatus` with the reason for the failure. + `UCode.PERMISSION_DENIED` is returned if the topic `ue_id` does not equal the caller's `ue_id`. + """ + if request is None: + raise ValueError("Request missing") + if options is None: + raise ValueError("CallOptions missing") + + result = self.rpc_client.invoke_method(self.fetch_subscriptions_uri, UPayload.pack(request), options) + return await RpcMapper.map_response(result, FetchSubscriptionsResponse) diff --git a/uprotocol/communication/subscriptionchangehandler.py b/uprotocol/client/usubscription/v3/subscriptionchangehandler.py similarity index 100% rename from uprotocol/communication/subscriptionchangehandler.py rename to uprotocol/client/usubscription/v3/subscriptionchangehandler.py diff --git a/uprotocol/client/usubscription/v3/usubscriptionclient.py b/uprotocol/client/usubscription/v3/usubscriptionclient.py new file mode 100644 index 0000000..c7ae64d --- /dev/null +++ b/uprotocol/client/usubscription/v3/usubscriptionclient.py @@ -0,0 +1,177 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from abc import ABC, abstractmethod +from typing import Optional + +from uprotocol.client.usubscription.v3.subscriptionchangehandler import SubscriptionChangeHandler +from uprotocol.communication.calloptions import CallOptions +from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + FetchSubscribersResponse, + FetchSubscriptionsRequest, + FetchSubscriptionsResponse, + NotificationsResponse, + SubscriptionResponse, +) +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class USubscriptionClient(ABC): + """ + The client-side interface for communicating with the USubscription service asynchronously. + + Provides methods for subscribing, unsubscribing, registering listeners, fetching subscribers, + fetching subscriptions, and handling subscription change notifications. + """ + + @abstractmethod + async def subscribe( + self, + topic: UUri, + listener: UListener, + options: Optional[CallOptions] = CallOptions.DEFAULT, + handler: Optional[SubscriptionChangeHandler] = None, + ) -> SubscriptionResponse: + """ + Subscribes to a given topic asynchronously. + + The API will return a SubscriptionResponse or raise an exception if the subscription + was not successful. The optional passed SubscriptionChangeHandler is used to receive + notifications of changes to the subscription status like a transition from + SubscriptionStatus.State.SUBSCRIBE_PENDING to SubscriptionStatus.State.SUBSCRIBED that + occurs when we subscribe to remote topics that the device we are on has not yet a + subscriber that has subscribed to said topic. + + :param topic: The topic to subscribe to. + :param listener: The listener to be called when messages are received. + :param options: The CallOptions to be used for the subscription. + :param handler: SubscriptionChangeHandler to handle changes to subscription states. + + :return: Returns a SubscriptionResponse or raises an exception with the failure reason + as UStatus. UCode.ALREADY_EXISTS will be raised if you call this API multiple times + passing a different handler. + """ + pass + + @abstractmethod + async def unsubscribe( + self, topic: UUri, listener: UListener, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> UStatus: + """ + Unsubscribes from a given topic asynchronously. + + The subscriber no longer wishes to be subscribed to said topic so we issue an unsubscribe + request to the USubscription service. The API will return a UStatus with the result of + the unsubscribe request. If we are unable to unsubscribe to the topic with the USubscription + service, the listener and handler (if any) will remain registered. + + :param topic: The topic to unsubscribe from. + :param listener: The listener to be called when a message is received on the topic. + :param options: The CallOptions to be used for the unsubscribe request. + + :return: Returns a UStatus with the result from the unsubscribe request. + """ + pass + + @abstractmethod + async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Unregister a listener and remove any registered SubscriptionChangeHandler for the topic. + + This method is used to remove handlers/listeners without notifying the uSubscription service + so that we can be persistently subscribed even when the uE is not running. + + :param topic: The topic to unsubscribe from. + :param listener: The listener to be called when a message is received on the topic. + + :return: Returns a UStatus with the status of the listener unregister request. + """ + pass + + @abstractmethod + async def register_for_notifications( + self, topic: UUri, handler: SubscriptionChangeHandler, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> NotificationsResponse: + """ + Register for Subscription Change Notifications. + + This API allows producers to register to receive subscription change notifications for + topics that they produce only. + + NOTE: Subscribers are automatically registered to receive notifications when they call + `subscribe()` API passing a SubscriptionChangeHandler so they do not need to + call this API. + + :param topic: The topic to register for notifications. + :param handler: The SubscriptionChangeHandler to handle the subscription changes. + :param options: The CallOptions to be used for the request. Default is CallOptions.DEFAULT. + + :return: Returns NotificationsResponse completed successfully if uSubscription service accepts the + request to register the caller to be notified of subscription changes, or + returns UStatus that indicates the failure reason. + """ + pass + + @abstractmethod + async def unregister_for_notifications( + self, topic: UUri, handler: SubscriptionChangeHandler, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> NotificationsResponse: + """ + Unregister for subscription change notifications. + + :param topic: The topic to unregister for notifications. + :param handler: The SubscriptionChangeHandler to be unregistered. + :param options: The CallOptions to be used for the request. Default is CallOptions.DEFAULT. + + :return: Returns NotificationsResponse completed successfully with the status of the API call to + uSubscription service, or completed unsuccessfully with UStatus with the reason for the failure. + """ + pass + + @abstractmethod + async def fetch_subscribers( + self, topic: UUri, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> FetchSubscribersResponse: + """ + Fetch the list of subscribers for a given produced topic. + + :param topic: The topic to fetch the subscribers for. + :param options: The CallOptions to be used for the request. + + :return: Returns FetchSubscribersResponse completed successfully with the list of subscribers, + or completed unsuccessfully with UStatus with the reason for the failure. + """ + pass + + @abstractmethod + async def fetch_subscriptions( + self, request: FetchSubscriptionsRequest, options: Optional[CallOptions] = CallOptions.DEFAULT + ) -> FetchSubscriptionsResponse: + """ + Fetch list of Subscriptions for a given topic. + + API provides more information than fetchSubscribers() in that it also returns + SubscribeAttributes per subscriber that might be useful to the producer to know. + + :param request: The request to fetch subscriptions for. + :param options: The CallOptions to be used for the request. + + :return: Returns FetchSubscriptionsResponse completed successfully with the subscription + information per subscriber to the topic, or completed unsuccessfully with UStatus + with the reason for the failure. UCode.PERMISSION_DENIED is returned if the topic + ue_id does not equal the caller's ue_id. + """ + pass diff --git a/uprotocol/communication/README.adoc b/uprotocol/communication/README.adoc index a5c9e93..eb53274 100644 --- a/uprotocol/communication/README.adoc +++ b/uprotocol/communication/README.adoc @@ -17,7 +17,6 @@ The following folder contains implementations of the L2 Communication Layer APIs | Interface | Implementation(s) | Description | xref:publisher.py[*Publisher*] | xref:simplepublisher.py[SimplePublisher] | Producers API to send publish or notification messages -| xref:subscriber.py[*Subscriber*] | xref:inmemorysubscriber.py[InMemorySubscriber] | Consumers API to subscribe or unsubscribe to topics | xref:rpcclient.py[*RpcClient*] | xref:inmemoryrpcclient.py[InMemoryRpcClient] | Client interface to invoke a method | xref:rpcserver.py[*RpcServer*] | xref:inmemoryrpcserver.py[InMemoryRpcServer]| Server interface to register a listener for incoming RPC requests and automatically send a response | xref:notifier.py[*Notifier*] | xref:simplenotifier.py[SimpleNotifier] | Notification communication pattern APIs to notify and register a listener to receive the notifications @@ -43,33 +42,6 @@ publisher : Publisher = UClient(transport) publisher.publish(topic) ---- -=== Subscribe and Unsubscribe -[,python] ----- -transport = # your UTransport instance - -#subscription topic -topic : UUri = UUri( ue_id=4, ue_version_major=1, resource_id=0x8000) - -#Listener to process incoming events on the topic -class MySubscriptionListener(UListener): - def on_receive(self, umsg: UMessage) -> None: - # Handle receiving published message - pass -listener = MySubscriptionListener() -# Optional handler that is called whenever the SubscriptionState changes for the subscriber -class MySubscriptionChangeHandler(SubscriptionChangeHandler) - def handle_subscription_change(self, topic: UUri, status: SubscriptionStatus) -> None: - # Handle subscription status changes if you're interested like when - # the subscription state changes from SUBSCRIBE_PENDING to SUBSCRIBED - pass - -subscriber : Subscriber = UClient(transport) -subscriber.subscribe(topic, listener, CallOptions.DEFAULT, MySubscriptionChangeHandler()) - -#UnSubscribe from the topic -subscriber.unsubscribe(topic, listener) ----- === Invoke a method using RPCClient [,python] ---- diff --git a/uprotocol/communication/inmemoryrpcclient.py b/uprotocol/communication/inmemoryrpcclient.py index b7720b8..059e55d 100644 --- a/uprotocol/communication/inmemoryrpcclient.py +++ b/uprotocol/communication/inmemoryrpcclient.py @@ -34,7 +34,7 @@ class HandleResponsesListener(UListener): def __init__(self, requests): self.requests = requests - def on_receive(self, umsg: UMessage) -> None: + async def on_receive(self, umsg: UMessage) -> None: """ Handle the responses coming back from the server asynchronously. @@ -50,13 +50,12 @@ def on_receive(self, umsg: UMessage) -> None: if not future: return - if response_attributes.commstatus: + if response_attributes.commstatus and response_attributes.commstatus != UCode.OK: code = response_attributes.commstatus future.set_exception( UStatusError.from_code_message(code=code, message=f"Communication error [{UCode.Name(code)}]") ) return - future.set_result(umsg) diff --git a/uprotocol/communication/inmemoryrpcserver.py b/uprotocol/communication/inmemoryrpcserver.py index 4e1c229..def933b 100644 --- a/uprotocol/communication/inmemoryrpcserver.py +++ b/uprotocol/communication/inmemoryrpcserver.py @@ -34,7 +34,7 @@ def __init__(self, transport: UTransport, request_handlers): self.transport = transport self.request_handlers = request_handlers - def on_receive(self, request: UMessage) -> None: + async def on_receive(self, request: UMessage) -> None: """ Generic incoming handler to process RPC requests from clients. @@ -61,8 +61,7 @@ def on_receive(self, request: UMessage) -> None: if isinstance(e, UStatusError): code = e.get_code() response_builder.with_commstatus(code) - - self.transport.send(response_builder.build_from_upayload(response_payload)) + await self.transport.send(response_builder.build_from_upayload(response_payload)) class InMemoryRpcServer(RpcServer): diff --git a/uprotocol/communication/subscriber.py b/uprotocol/communication/subscriber.py deleted file mode 100644 index e1e19f4..0000000 --- a/uprotocol/communication/subscriber.py +++ /dev/null @@ -1,89 +0,0 @@ -""" -SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation - -See the NOTICE file(s) distributed with this work for additional -information regarding copyright ownership. - -This program and the accompanying materials are made available under the -terms of the Apache License Version 2.0 which is available at - - http://www.apache.org/licenses/LICENSE-2.0 - -SPDX-License-Identifier: Apache-2.0 -""" - -from abc import ABC, abstractmethod -from typing import Optional - -from uprotocol.communication.calloptions import CallOptions -from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler -from uprotocol.core.usubscription.v3.usubscription_pb2 import ( - SubscriptionResponse, -) -from uprotocol.transport.ulistener import UListener -from uprotocol.v1.uri_pb2 import UUri -from uprotocol.v1.ustatus_pb2 import UStatus - - -class Subscriber(ABC): - """ - Communication Layer (uP-L2) Subscriber interface. - - This interface provides APIs to subscribe and unsubscribe to a given topic. - """ - - @abstractmethod - async def subscribe( - self, - topic: UUri, - listener: UListener, - options: Optional[CallOptions] = None, - handler: Optional[SubscriptionChangeHandler] = None, - ) -> SubscriptionResponse: - """ - Subscribes to a given topic asynchronously. - - The API will return a SubscriptionResponse or raise an exception if the subscription fails. - It registers the listener to be called when messages are received and allows the caller to register - a SubscriptionChangeHandler that is called whenever the subscription state changes - (e.g., SubscriptionStatus.State.PENDING_SUBSCRIBED to SubscriptionStatus.State.SUBSCRIBED, - SubscriptionStatus.State.SUBSCRIBED to SubscriptionStatus.State.UNSUBSCRIBED, etc.). - - :param topic: The topic to subscribe to. - :param listener: The UListener that is called when published messages are received. - :param options: The CallOptions to provide additional information (timeout, token, etc.). - :param handler: SubscriptionChangeHandler to handle changes to subscription states. - :return: Returns the SubscriptionResponse or raises an exception with the failure reason as UStatus. - """ - pass - - @abstractmethod - async def unsubscribe( - self, topic: UUri, listener: UListener, options: Optional[CallOptions] = CallOptions.DEFAULT - ) -> UStatus: - """ - Unsubscribes from a given topic. - - The subscriber no longer wishes to be subscribed to the specified topic, trigger an unsubscribe - request to the USubscription service. - - :param topic: The topic to unsubscribe to. - :param listener: The listener to be called when a message is received on the topic. - :param options: The call options for the subscription. - :return: Returns UStatus with the result from the unsubscribe request. - """ - pass - - @abstractmethod - async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: - """ - Unregisters a listener from a topic asynchronously. - - This method will only unregister the listener for a given subscription, allowing the uE to remain subscribed - even if the listener is removed. - - :param topic: The topic to subscribe to. - :param listener: The listener to be called when a message is received on the topic. - :return: Returns UStatus with the status of the listener unregister request. - """ - pass diff --git a/uprotocol/communication/uclient.py b/uprotocol/communication/uclient.py index 7bbc7d3..db91839 100644 --- a/uprotocol/communication/uclient.py +++ b/uprotocol/communication/uclient.py @@ -17,26 +17,20 @@ from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient from uprotocol.communication.inmemoryrpcserver import InMemoryRpcServer -from uprotocol.communication.inmemorysubscriber import InMemorySubscriber from uprotocol.communication.notifier import Notifier from uprotocol.communication.publisher import Publisher from uprotocol.communication.rpcclient import RpcClient from uprotocol.communication.rpcserver import RpcServer from uprotocol.communication.simplenotifier import SimpleNotifier from uprotocol.communication.simplepublisher import SimplePublisher -from uprotocol.communication.subscriber import Subscriber -from uprotocol.communication.subscriptionchangehandler import SubscriptionChangeHandler from uprotocol.communication.upayload import UPayload -from uprotocol.core.usubscription.v3.usubscription_pb2 import ( - SubscriptionResponse, -) from uprotocol.transport.ulistener import UListener from uprotocol.transport.utransport import UTransport from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus -class UClient(RpcServer, Subscriber, Notifier, Publisher, RpcClient): +class UClient(RpcServer, Notifier, Publisher, RpcClient): """ UClient provides a unified interface for various communication patterns over a UTransport instance, including RPC, subscriptions, notifications, and message publishing. It combines functionalities @@ -64,59 +58,6 @@ def __init__(self, transport: UTransport): self.publisher = SimplePublisher(self.transport) self.notifier = SimpleNotifier(self.transport) self.rpc_client = InMemoryRpcClient(self.transport) - self.subscriber = InMemorySubscriber(self.transport, self.rpc_client, self.notifier) - - async def subscribe( - self, - topic: UUri, - listener: UListener, - options: Optional[CallOptions] = None, - handler: Optional[SubscriptionChangeHandler] = None, - ) -> SubscriptionResponse: - """ - Subscribe to a given topic asynchronously. - - The API will return a SubscriptionResponse or raise an exception if the subscription fails. - It registers the listener to be called when messages are received and allows the caller to register - a SubscriptionChangeHandler that is called whenever the subscription state changes (e.g., PENDING_SUBSCRIBED to - SUBSCRIBED, SUBSCRIBED to UNSUBSCRIBED, etc.). - - :param topic: The topic to subscribe to. - :param listener: The UListener that is called when published messages are received. - :param options: The CallOptions to provide additional information (timeout, token, etc.). - :param handler: SubscriptionChangeHandler to handle changes to subscription states. - :return: Returns the SubscriptionResponse or raises an exception with the failure reason as UStatus. - """ - return await self.subscriber.subscribe(topic, listener, options, handler) - - async def unsubscribe( - self, topic: UUri, listener: UListener, options: Optional[CallOptions] = CallOptions.DEFAULT - ) -> UStatus: - """ - Unsubscribe to a given topic asynchronously. - - The subscriber no longer wishes to be subscribed to the specified topic, trigger an unsubscribe - request to the USubscription service. - - :param topic: The topic to unsubscribe to. - :param listener: The listener to be called when a message is received on the topic. - :param options: The call options for the subscription. - :return: Returns UStatus with the result from the unsubscribe request. - """ - return await self.subscriber.unsubscribe(topic, listener, options) - - async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: - """ - Unregister a listener from a topic. - - This method will only unregister the listener for a given subscription thus allowing a uE to stay - subscribed even if the listener is removed. - - :param topic: The topic to subscribe to. - :param listener: The listener to be called when a message is received on the topic. - :return: Returns UStatus with the status of the listener unregister request. - """ - return await self.subscriber.unregister_listener(topic, listener) async def notify( self, topic: UUri, destination: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None @@ -207,5 +148,3 @@ async def invoke_method( def close(self): if self.rpc_client: self.rpc_client.close() - if self.subscriber: - self.subscriber.close() diff --git a/uprotocol/transport/ulistener.py b/uprotocol/transport/ulistener.py index cc188d8..d3c7584 100644 --- a/uprotocol/transport/ulistener.py +++ b/uprotocol/transport/ulistener.py @@ -18,13 +18,25 @@ class UListener(ABC): - """For any implementation that defines some kind of callback or function that will be called to handle incoming + """ + For any implementation that defines some kind of callback or function that will be called to handle incoming messages. """ @abstractmethod - def on_receive(self, umsg: UMessage) -> None: - """Method called to handle/process messages.

+ async def on_receive(self, umsg: UMessage) -> None: + """ + Method called to handle/process messages. + + `on_receive()` is expected to return almost immediately. If it does not, it could potentially + block further message receipt. For long-running operations, consider passing off received + data to a different async function to handle it and return. + + Note for `UTransport` implementers: + + Because `on_receive()` is an async function, you may choose to either `await` it in the current context + or spawn it onto a new task and await it there to allow the current context to continue immediately. + @param umsg: UMessage to be sent. """ pass diff --git a/uprotocol/transport/utransport.py b/uprotocol/transport/utransport.py index 85b936f..d0cb66b 100644 --- a/uprotocol/transport/utransport.py +++ b/uprotocol/transport/utransport.py @@ -15,6 +15,7 @@ from abc import ABC, abstractmethod from uprotocol.transport.ulistener import UListener +from uprotocol.uri.factory.uri_factory import UriFactory from uprotocol.v1.umessage_pb2 import UMessage from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus @@ -39,7 +40,9 @@ async def send(self, message: UMessage) -> UStatus: pass @abstractmethod - async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + async def register_listener( + self, source_filter: UUri, listener: UListener, sink_filter: UUri = UriFactory.ANY + ) -> UStatus: """Register UListener for UUri source and sink filters to be called when a message is received. @@ -56,7 +59,9 @@ async def register_listener(self, source_filter: UUri, listener: UListener, sink pass @abstractmethod - async def unregister_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + async def unregister_listener( + self, source_filter: UUri, listener: UListener, sink_filter: UUri = UriFactory.ANY + ) -> UStatus: """Unregister UListener for UUri source and sink filters. Messages arriving at this topic will no longer be processed by this listener.