From 80b117920a3f94a9f729b5cd72228b9710e6c56b Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Wed, 10 Jul 2024 11:34:50 -0400 Subject: [PATCH 1/2] Add Calloptions to publish and notify api --- .../test_communication/test_simplenotifier.py | 4 +-- .../test_simplepublisher.py | 6 ++--- tests/test_communication/test_uclient.py | 27 ++++++++++++------- uprotocol/communication/notifier.py | 9 +++++-- uprotocol/communication/publisher.py | 9 +++++-- uprotocol/communication/simplenotifier.py | 10 ++++++- uprotocol/communication/simplepublisher.py | 8 +++++- uprotocol/communication/uclient.py | 14 +++++++--- 8 files changed, 62 insertions(+), 25 deletions(-) diff --git a/tests/test_communication/test_simplenotifier.py b/tests/test_communication/test_simplenotifier.py index a0429fc..d9043d9 100644 --- a/tests/test_communication/test_simplenotifier.py +++ b/tests/test_communication/test_simplenotifier.py @@ -32,13 +32,13 @@ def create_destination_uri(self): async def test_send_notification(self): notifier = SimpleNotifier(MockUTransport()) - status = await notifier.notify(self.create_topic(), self.create_destination_uri(), None) + status = await notifier.notify(self.create_topic(), self.create_destination_uri()) self.assertEqual(status.code, UCode.OK) async def test_send_notification_with_payload(self): uri = UUri(authority_name="Neelam") notifier = SimpleNotifier(MockUTransport()) - status = await notifier.notify(self.create_topic(), self.create_destination_uri(), UPayload.pack(uri)) + status = await notifier.notify(self.create_topic(), self.create_destination_uri(), payload=UPayload.pack(uri)) self.assertEqual(status.code, UCode.OK) async def test_register_listener(self): diff --git a/tests/test_communication/test_simplepublisher.py b/tests/test_communication/test_simplepublisher.py index 6762bda..863e194 100644 --- a/tests/test_communication/test_simplepublisher.py +++ b/tests/test_communication/test_simplepublisher.py @@ -28,13 +28,13 @@ def create_topic(self): async def test_send_publish(self): publisher = SimplePublisher(MockUTransport()) - status = await publisher.publish(self.create_topic(), None) + status = await publisher.publish(self.create_topic()) self.assertEqual(status.code, UCode.OK) async def test_send_publish_with_stuffed_payload(self): uri = UUri(authority_name="Neelam") publisher = SimplePublisher(MockUTransport()) - status = await publisher.publish(self.create_topic(), UPayload.pack_to_any(uri)) + status = await publisher.publish(self.create_topic(), payload=UPayload.pack_to_any(uri)) self.assertEqual(status.code, UCode.OK) def test_constructor_transport_none(self): @@ -52,7 +52,7 @@ async def test_publish_topic_none(self): uri = UUri(authority_name="Neelam") with self.assertRaises(ValueError) as context: - await publisher.publish(None, UPayload.pack_to_any(uri)) + await publisher.publish(None, payload=UPayload.pack_to_any(uri)) self.assertEqual(str(context.exception), "Publish topic missing") diff --git a/tests/test_communication/test_uclient.py b/tests/test_communication/test_uclient.py index 4512308..cce62aa 100644 --- a/tests/test_communication/test_uclient.py +++ b/tests/test_communication/test_uclient.py @@ -51,12 +51,14 @@ def test_create_upclient_with_null_transport(self): UClient(None) async def test_send_notification(self): - status = await UClient(MockUTransport()).notify(create_topic(), create_destination_uri(), None) + status = await UClient(MockUTransport()).notify(create_topic(), create_destination_uri()) self.assertEqual(status.code, UCode.OK) async def test_send_notification_with_payload(self): uri = UUri(authority_name="neelam") - status = await UClient(MockUTransport()).notify(create_topic(), create_destination_uri(), UPayload.pack(uri)) + status = await UClient(MockUTransport()).notify( + create_topic(), create_destination_uri(), payload=UPayload.pack(uri) + ) self.assertEqual(status.code, UCode.OK) async def test_register_listener(self): @@ -85,19 +87,24 @@ async def test_unregister_listener_not_registered(self): self.assertEqual(status.code, UCode.INVALID_ARGUMENT) async def test_send_publish(self): - status = await UClient(MockUTransport()).publish(create_topic(), None) + status = await UClient(MockUTransport()).publish(create_topic()) self.assertEqual(status.code, UCode.OK) async def test_send_publish_with_stuffed_payload(self): uri = UUri(authority_name="neelam") - status = await UClient(MockUTransport()).publish(create_topic(), UPayload.pack_to_any(uri)) + status = await UClient(MockUTransport()).publish(create_topic(), payload=UPayload.pack_to_any(uri)) + self.assertEqual(status.code, UCode.OK) + + async def test_send_publish_with_stuffed_payload_and_calloptions(self): + uri = UUri(authority_name="neelam") + status = await UClient(MockUTransport()).publish( + create_topic(), CallOptions(token="134"), payload=UPayload.pack_to_any(uri) + ) self.assertEqual(status.code, UCode.OK) async def test_invoke_method_with_payload(self): payload = UPayload.pack_to_any(UUri()) - future_result = asyncio.ensure_future( - UClient(MockUTransport()).invoke_method(create_method_uri(), payload, None) - ) + future_result = asyncio.ensure_future(UClient(MockUTransport()).invoke_method(create_method_uri(), payload)) response = await future_result self.assertIsNotNone(response) self.assertFalse(future_result.exception()) @@ -132,10 +139,10 @@ async def test_invoke_method_with_multi_invoke_transport(self): rpc_client = UClient(MockUTransport()) payload = UPayload.pack_to_any(UUri()) - future_result1 = asyncio.ensure_future(rpc_client.invoke_method(create_method_uri(), payload, None)) + future_result1 = asyncio.ensure_future(rpc_client.invoke_method(create_method_uri(), payload)) response = await future_result1 self.assertIsNotNone(response) - future_result2 = asyncio.ensure_future(rpc_client.invoke_method(create_method_uri(), payload, None)) + future_result2 = asyncio.ensure_future(rpc_client.invoke_method(create_method_uri(), payload)) response2 = await future_result2 self.assertIsNotNone(response2) @@ -195,7 +202,7 @@ async def test_request_handler_for_notification(self): handler = create_autospec(RequestHandler, instance=True) await client.register_request_handler(create_method_uri(), handler) - self.assertEqual(await client.notify(create_topic(), transport.get_source(), None), UStatus(code=UCode.OK)) + self.assertEqual(await client.notify(create_topic(), transport.get_source()), UStatus(code=UCode.OK)) def create_topic(): diff --git a/uprotocol/communication/notifier.py b/uprotocol/communication/notifier.py index 96939bd..087e432 100644 --- a/uprotocol/communication/notifier.py +++ b/uprotocol/communication/notifier.py @@ -13,7 +13,9 @@ """ from abc import ABC, abstractmethod +from typing import Optional +from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.upayload import UPayload from uprotocol.transport.ulistener import UListener from uprotocol.v1.uri_pb2 import UUri @@ -29,12 +31,15 @@ class Notifier(ABC): """ @abstractmethod - async def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus: + async def notify( + self, topic: UUri, destination: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None + ) -> UStatus: """ - Send a notification to a given topic passing a payload. + Send a notification to a given topic. :param topic: The topic to send the notification to. :param destination: The destination to send the notification to. + :param options: Call options for the notification. :param payload: The payload to send with the notification. :return: Returns the UStatus with the status of the notification. """ diff --git a/uprotocol/communication/publisher.py b/uprotocol/communication/publisher.py index f708ee8..c4c8a09 100644 --- a/uprotocol/communication/publisher.py +++ b/uprotocol/communication/publisher.py @@ -13,7 +13,9 @@ """ from abc import ABC, abstractmethod +from typing import Optional +from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.upayload import UPayload from uprotocol.v1.uri_pb2 import UUri from uprotocol.v1.ustatus_pb2 import UStatus @@ -30,11 +32,14 @@ class Publisher(ABC): """ @abstractmethod - async def publish(self, topic: UUri, payload: UPayload) -> UStatus: + async def publish( + self, topic: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None + ) -> UStatus: """ - Publish a message to a topic passing UPayload as the payload. + Publish a message to a topic. :param topic: The topic to publish to. + :param options: Call options for the publish. :param payload: The UPayload to publish. :return: An instance of UStatus indicating the status of the publish operation. """ diff --git a/uprotocol/communication/simplenotifier.py b/uprotocol/communication/simplenotifier.py index 01a3775..3d8abba 100644 --- a/uprotocol/communication/simplenotifier.py +++ b/uprotocol/communication/simplenotifier.py @@ -14,6 +14,7 @@ from typing import Optional +from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.notifier import Notifier from uprotocol.communication.upayload import UPayload from uprotocol.transport.builder.umessagebuilder import UMessageBuilder @@ -45,16 +46,23 @@ def __init__(self, transport: UTransport): raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) self.transport = transport - async def notify(self, topic: UUri, destination: UUri, payload: Optional[UPayload] = None) -> UStatus: + async def notify( + self, topic: UUri, destination: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None + ) -> UStatus: """ Send a notification to a given topic. :param topic: The topic to send the notification to. :param destination: The destination to send the notification to. + :param options: Call options for the notification. :param payload: The payload to send with the notification. :return: Returns the UStatus with the status of the notification. """ builder = UMessageBuilder.notification(topic, destination) + if options: + builder.with_priority(options.priority) + builder.with_ttl(options.timeout) + builder.with_token(options.token) return await self.transport.send(builder.build() if payload is None else builder.build_from_upayload(payload)) async def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: diff --git a/uprotocol/communication/simplepublisher.py b/uprotocol/communication/simplepublisher.py index 1681500..367eaac 100644 --- a/uprotocol/communication/simplepublisher.py +++ b/uprotocol/communication/simplepublisher.py @@ -12,6 +12,9 @@ SPDX-License-Identifier: Apache-2.0 """ +from typing import Optional + +from uprotocol.communication.calloptions import CallOptions from uprotocol.communication.publisher import Publisher from uprotocol.communication.upayload import UPayload from uprotocol.transport.builder.umessagebuilder import UMessageBuilder @@ -33,11 +36,14 @@ def __init__(self, transport: UTransport): raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) self.transport = transport - async def publish(self, topic: UUri, payload: UPayload) -> UStatus: + async def publish( + self, topic: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None + ) -> UStatus: """ Publishes a message to a topic using the provided payload. :param topic: The topic to publish the message to. + :param options: Call options for the publish. :param payload: The payload to be published. :return: An instance of UStatus indicating the status of the publish operation. """ diff --git a/uprotocol/communication/uclient.py b/uprotocol/communication/uclient.py index 586ca2b..7e322dc 100644 --- a/uprotocol/communication/uclient.py +++ b/uprotocol/communication/uclient.py @@ -108,16 +108,19 @@ async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus """ return await self.subscriber.unregister_listener(topic, listener) - async def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus: + async def notify( + self, topic: UUri, destination: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None + ) -> UStatus: """ Send a notification to a given topic. :param topic: The topic to send the notification to. :param destination: The destination to send the notification to. + :param options: Call options for the notification. :param payload: The payload to send with the notification. :return: Returns the UStatus with the status of the notification. """ - return await self.notifier.notify(topic, destination, payload) + return await self.notifier.notify(topic, destination, options, payload) async def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: """ @@ -139,15 +142,18 @@ async def unregister_notification_listener(self, topic: UUri, listener: UListene """ return await self.notifier.unregister_notification_listener(topic, listener) - async def publish(self, topic: UUri, payload: UPayload) -> UStatus: + async def publish( + self, topic: UUri, options: Optional[CallOptions] = None, payload: Optional[UPayload] = None + ) -> UStatus: """ Publishes a message to a topic using the provided payload. :param topic: The topic to publish the message to. + :param options: Call options for the publish. :param payload: The payload to be published. :return: An instance of UStatus indicating the status of the publish operation. """ - return await self.publisher.publish(topic, payload) + return await self.publisher.publish(topic, options, payload) async def register_request_handler(self, method: UUri, handler): """ From fd33f410543745547c001178e7db03f124d2c726 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Wed, 10 Jul 2024 11:56:00 -0400 Subject: [PATCH 2/2] Incorportate reviewer comment --- uprotocol/communication/simplepublisher.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/uprotocol/communication/simplepublisher.py b/uprotocol/communication/simplepublisher.py index 367eaac..694a0b8 100644 --- a/uprotocol/communication/simplepublisher.py +++ b/uprotocol/communication/simplepublisher.py @@ -50,5 +50,9 @@ async def publish( if topic is None: raise ValueError("Publish topic missing") - message = UMessageBuilder.publish(topic).build_from_upayload(payload) - return await self.transport.send(message) + builder = UMessageBuilder.publish(topic) + if options: + builder.with_priority(options.priority) + builder.with_ttl(options.timeout) + builder.with_token(options.token) + return await self.transport.send(builder.build_from_upayload(payload))