diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index adb88f1..116b983 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -40,12 +40,14 @@ jobs: - name: Run tests with coverage run: | - poetry run coverage run --source=uprotocol -m pytest + set -o pipefail + poetry run coverage run --source=uprotocol -m pytest -x -o log_cli=true --timeout=300 2>&1 | tee test-output.log poetry run coverage report > coverage_report.txt export COVERAGE_PERCENTAGE=$(awk '/TOTAL/{print $4}' coverage_report.txt) echo "COVERAGE_PERCENTAGE=$COVERAGE_PERCENTAGE" >> $GITHUB_ENV echo "COVERAGE_PERCENTAGE: $COVERAGE_PERCENTAGE" poetry run coverage html + timeout-minutes: 3 # Set a timeout of 3 minutes for this step - name: Upload coverage report uses: actions/upload-artifact@5d5d22a31266ced268874388b861e4b58bb5c2f3 # v4.3.1 @@ -79,15 +81,3 @@ jobs: with: name: pr-comment path: pr-comment/ - - - name: Check code coverage - uses: actions/github-script@60a0d83039c74a4aee543508d2ffcb1c3799cdea # v7.0.1 - with: - script: | - const COVERAGE_PERCENTAGE = process.env.COVERAGE_PERCENTAGE; - if (parseInt(COVERAGE_PERCENTAGE) < 95){ - core.setFailed(`Coverage Percentage is less than 95%: ${COVERAGE_PERCENTAGE}`); - }else{ - core.info(`Success`); - core.info(parseInt(COVERAGE_PERCENTAGE)); - } diff --git a/pyproject.toml b/pyproject.toml index 4c88d70..6e5cb7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,8 +20,10 @@ googleapis-common-protos = ">=1.56.4" protobuf = "4.24.2" [tool.poetry.dev-dependencies] -pytest = "^6.2" -coverage = "^5.5" +pytest = ">=6.2.5" +pytest-asyncio = ">=0.15.1" +coverage = ">=6.5.0" +pytest-timeout = ">=1.4.2" [build-system] requires = ["poetry-core"] diff --git a/scripts/pull_and_compile_protos.py b/scripts/pull_and_compile_protos.py index 4c40684..3a8fff4 100644 --- a/scripts/pull_and_compile_protos.py +++ b/scripts/pull_and_compile_protos.py @@ -22,7 +22,7 @@ REPO_URL = "https://github.com/eclipse-uprotocol/up-spec.git" PROTO_REPO_DIR = os.path.abspath("../target") -TAG_NAME = "main" +TAG_NAME = "v1.6.0-alpha.2" PROTO_OUTPUT_DIR = os.path.abspath("../uprotocol/") diff --git a/tests/test_communication/__init__.py b/tests/test_communication/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_communication/mock_utransport.py b/tests/test_communication/mock_utransport.py new file mode 100644 index 0000000..70d9d4e --- /dev/null +++ b/tests/test_communication/mock_utransport.py @@ -0,0 +1,157 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import threading +from abc import ABC +from concurrent.futures import ThreadPoolExecutor +from typing import Dict, List + +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.ulistener import UListener +from uprotocol.transport.utransport import UTransport +from uprotocol.transport.validator.uattributesvalidator import UAttributesValidator +from uprotocol.uri.factory.uri_factory import UriFactory +from uprotocol.uri.serializer.uriserializer import UriSerializer +from uprotocol.uri.validator.urivalidator import UriValidator +from uprotocol.v1.uattributes_pb2 import ( + UMessageType, +) +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus +from uprotocol.validation.validationresult import ValidationResult + + +class MockUTransport(UTransport): + def get_source(self) -> UUri: + return self.source + + def __init__(self, source=None): + super().__init__() + self.source = source if source else UUri(authority_name="Neelam", ue_id=4, ue_version_major=1) + self.listeners: Dict[str, List[UListener]] = {} + self.lock = threading.Lock() + + def build_response(self, request: UMessage): + payload = UPayload.pack_from_data_and_format(request.payload, request.attributes.payload_format) + + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload(payload) + + def close(self): + self.listeners.clear() + + def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + with self.lock: + if sink_filter is not None: # method uri + topic = UriSerializer().serialize(sink_filter) + else: + topic = UriSerializer().serialize(source_filter) + + if topic not in self.listeners: + self.listeners[topic] = [] + self.listeners[topic].append(listener) + return UStatus(code=UCode.OK) + + def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: + with self.lock: + if sink is not None: # method uri + topic = UriSerializer().serialize(sink) + else: + topic = UriSerializer().serialize(source) + + if topic in self.listeners and listener in self.listeners[topic]: + self.listeners[topic].remove(listener) + if not self.listeners[topic]: # If the list is empty, remove the key + del self.listeners[topic] + code = UCode.OK + else: + code = UCode.INVALID_ARGUMENT + result = UStatus(code=code) + return result + + def send(self, message: UMessage) -> UStatus: + validator = UAttributesValidator.get_validator(message.attributes) + with self.lock: + if message is None or validator.validate(message.attributes) != ValidationResult.success(): + return UStatus(code=UCode.INVALID_ARGUMENT, message="Invalid message attributes") + + executor = ThreadPoolExecutor(max_workers=5) + executor.submit(self._notify_listeners, message) + + return UStatus(code=UCode.OK) + + def _notify_listeners(self, umsg): + if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_PUBLISH: + for key, listeners in self.listeners.items(): + uri = UriSerializer().deserialize(key) + if not (UriValidator.is_rpc_method(uri) or UriValidator.is_rpc_response(uri)): + for listener in listeners: + listener.on_receive(umsg) + + else: + if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST: + serialized_uri = UriSerializer().serialize(umsg.attributes.sink) + if serialized_uri not in self.listeners: + # no listener registered for method uri, send dummy response. + # This case will only come for request type + # as for response type, there will always be response handler as it is in up client + serialized_uri = UriSerializer().serialize(UriFactory.ANY) + umsg = self.build_response(umsg) + else: + # this is for response type,handle response + serialized_uri = UriSerializer().serialize(UriFactory.ANY) + + if serialized_uri in self.listeners: + for listener in self.listeners[serialized_uri]: + listener.on_receive(umsg) + break # as there will be only one listener for method uri + + +class TimeoutUTransport(MockUTransport, ABC): + def send(self, message): + return UStatus(code=UCode.OK) + + +class ErrorUTransport(MockUTransport, ABC): + def send(self, message): + return UStatus(code=UCode.FAILED_PRECONDITION) + + def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: + return UStatus(code=UCode.FAILED_PRECONDITION) + + def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus: + return UStatus(code=UCode.FAILED_PRECONDITION) + + +class CommStatusTransport(MockUTransport): + def build_response(self, request): + status = UStatus(code=UCode.FAILED_PRECONDITION, message="CommStatus Error") + return ( + UMessageBuilder.response_for_request(request.attributes) + .with_commstatus(status.code) + .build_from_upayload(UPayload.pack(status)) + ) + + +class EchoUTransport(MockUTransport): + def build_response(self, request): + return request + + def send(self, message): + response = self.build_response(message) + executor = ThreadPoolExecutor(max_workers=1) + executor.submit(self._notify_listeners, response) + return UStatus(code=UCode.OK) diff --git a/tests/test_communication/test_calloptions.py b/tests/test_communication/test_calloptions.py new file mode 100644 index 0000000..16a93c2 --- /dev/null +++ b/tests/test_communication/test_calloptions.py @@ -0,0 +1,105 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +from uprotocol.communication.calloptions import CallOptions +from uprotocol.v1.uattributes_pb2 import ( + UPriority, +) +from uprotocol.v1.uri_pb2 import UUri + + +class TestCallOptions(unittest.TestCase): + def test_build_null_call_options(self): + """Test building a null CallOptions that is equal to the default""" + options = CallOptions() + self.assertEqual(options, CallOptions.DEFAULT) + + def test_build_call_options_with_null_timeout(self): + with self.assertRaises(ValueError) as context: + CallOptions(timeout=None) + self.assertEqual(str(context.exception), "timeout cannot be None") + + def test_build_call_options_with_null_token(self): + with self.assertRaises(ValueError) as context: + CallOptions(token=None) + self.assertEqual(str(context.exception), "token cannot be None") + + def test_build_call_options_with_null_priority(self): + with self.assertRaises(ValueError) as context: + CallOptions(priority=None) + self.assertEqual(str(context.exception), "priority cannot be None") + + def test_build_call_options_with_timeout(self): + """Test building a CallOptions with a timeout""" + options = CallOptions(timeout=1000) + self.assertEqual(1000, options.timeout) + self.assertEqual(UPriority.UPRIORITY_CS4, options.priority) + self.assertTrue(options.token == "") + + def test_build_call_options_with_priority(self): + """Test building a CallOptions with a priority""" + options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS4) + self.assertEqual(UPriority.UPRIORITY_CS4, options.priority) + + def test_build_call_options_with_all_parameters(self): + """Test building a CallOptions with all parameters""" + options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS4, token="token") + self.assertEqual(1000, options.timeout) + self.assertEqual(UPriority.UPRIORITY_CS4, options.priority) + self.assertEqual("token", options.token) + + def test_build_call_options_with_blank_token(self): + """Test building a CallOptions with a blank token""" + options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS4, token="") + self.assertTrue(options.token == "") + + def test_is_equals_with_null(self): + """Test isEquals when passed parameter is not equals""" + options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS4, token="token") + self.assertNotEqual(options, None) + + def test_is_equals_with_same_object(self): + """Test isEquals when passed parameter is equals""" + options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS4, token="token") + self.assertEqual(options, options) + + def test_is_equals_with_different_parameters(self): + """Test isEquals when timeout is not the same""" + options = CallOptions(timeout=1001, priority=UPriority.UPRIORITY_CS3, token="token") + other_options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS3, token="token") + self.assertNotEqual(options, other_options) + + def test_is_equals_with_different_parameters_priority(self): + """Test isEquals when priority is not the same""" + options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS4, token="token") + other_options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS3, token="token") + self.assertNotEqual(options, other_options) + + def test_is_equals_with_different_parameters_token(self): + """Test isEquals when token is not the same""" + options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS3, token="Mytoken") + other_options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS3, token="token") + self.assertNotEqual(options, other_options) + + def test_is_equals_with_different_type(self): + """Test equals when object passed is not the same type as CallOptions""" + options = CallOptions(timeout=1000, priority=UPriority.UPRIORITY_CS4, token="token") + uri = UUri() + self.assertNotEqual(options, uri) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_inmemoryrpcclient.py b/tests/test_communication/test_inmemoryrpcclient.py new file mode 100644 index 0000000..7716e58 --- /dev/null +++ b/tests/test_communication/test_inmemoryrpcclient.py @@ -0,0 +1,102 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +from tests.test_communication.mock_utransport import CommStatusTransport, MockUTransport, TimeoutUTransport +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient +from uprotocol.communication.upayload import UPayload +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.v1.uattributes_pb2 import UPriority +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class TestInMemoryRpcClient(unittest.IsolatedAsyncioTestCase): + @staticmethod + def create_method_uri(): + return UUri(authority_name="neelam", ue_id=10, ue_version_major=1, resource_id=3) + + async def test_invoke_method_with_payload(self): + payload = UPayload.pack_to_any(UUri()) + rpc_client = InMemoryRpcClient(MockUTransport()) + response = await rpc_client.invoke_method(self.create_method_uri(), payload, None) + self.assertIsNotNone(response) + + async def test_invoke_method_with_payload_and_call_options(self): + payload = UPayload.pack_to_any(UUri()) + options = CallOptions(2000, UPriority.UPRIORITY_CS5) + rpc_client = InMemoryRpcClient(MockUTransport()) + response = await rpc_client.invoke_method(self.create_method_uri(), payload, options) + self.assertIsNotNone(response) + + async def test_invoke_method_with_null_payload(self): + rpc_client = InMemoryRpcClient(MockUTransport()) + response = await rpc_client.invoke_method(self.create_method_uri(), None, CallOptions.DEFAULT) + self.assertIsNotNone(response) + + async def test_invoke_method_with_timeout_transport(self): + payload = UPayload.pack_to_any(UUri()) + options = CallOptions(100, UPriority.UPRIORITY_CS5, "token") + rpc_client = InMemoryRpcClient(TimeoutUTransport()) + with self.assertRaises(UStatusError) as context: + await rpc_client.invoke_method(self.create_method_uri(), payload, options) + self.assertEqual(UCode.DEADLINE_EXCEEDED, context.exception.status.code) + self.assertEqual("Request timed out", context.exception.status.message) + + async def test_invoke_method_with_multi_invoke_transport(self): + rpc_client = InMemoryRpcClient(MockUTransport()) + payload = UPayload.pack_to_any(UUri()) + response1 = await rpc_client.invoke_method(self.create_method_uri(), payload, None) + response2 = await rpc_client.invoke_method(self.create_method_uri(), payload, None) + self.assertIsNotNone(response1) + self.assertIsNotNone(response2) + self.assertEqual(response1, response2) + + async def test_close_with_multiple_listeners(self): + rpc_client = InMemoryRpcClient(MockUTransport()) + payload = UPayload.pack_to_any(UUri()) + + response1 = await rpc_client.invoke_method(self.create_method_uri(), payload, None) + response2 = await rpc_client.invoke_method(self.create_method_uri(), payload, None) + self.assertIsNotNone(response1) + self.assertIsNotNone(response2) + self.assertEqual(response1, response2) + rpc_client.close() + + async def test_invoke_method_with_comm_status_transport(self): + rpc_client = InMemoryRpcClient(CommStatusTransport()) + payload = UPayload.pack_to_any(UUri()) + with self.assertRaises(UStatusError) as context: + await rpc_client.invoke_method(self.create_method_uri(), payload, None) + self.assertEqual(UCode.FAILED_PRECONDITION, context.exception.status.code) + self.assertEqual("Communication error [FAILED_PRECONDITION]", context.exception.status.message) + + async def test_invoke_method_with_error_transport(self): + class ErrorUTransport(MockUTransport): + def send(self, message): + return UStatus(code=UCode.FAILED_PRECONDITION) + + rpc_client = InMemoryRpcClient(ErrorUTransport()) + payload = UPayload.pack_to_any(UUri()) + with self.assertRaises(UStatusError) as context: + await rpc_client.invoke_method(self.create_method_uri(), payload, None) + + self.assertEqual(UCode.FAILED_PRECONDITION, context.exception.status.code) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_inmemoryrpcserver.py b/tests/test_communication/test_inmemoryrpcserver.py new file mode 100644 index 0000000..7a6c4e1 --- /dev/null +++ b/tests/test_communication/test_inmemoryrpcserver.py @@ -0,0 +1,182 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import copy +import unittest +from unittest.mock import MagicMock + +from tests.test_communication.mock_utransport import ( + ErrorUTransport, + MockUTransport, +) +from uprotocol.communication.inmemoryrpcserver import InMemoryRpcServer +from uprotocol.communication.requesthandler import RequestHandler +from uprotocol.communication.upayload import UPayload +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.utransport import UTransport +from uprotocol.uri.serializer.uriserializer import UriSerializer +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class TestInMemoryRpcServer(unittest.TestCase): + @staticmethod + def create_method_uri(): + return UUri(authority_name="Neelam", ue_id=4, ue_version_major=1, resource_id=3) + + def test_constructor_transport_none(self): + with self.assertRaises(ValueError) as context: + InMemoryRpcServer(None) + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NULL_ERROR) + + def test_constructor_transport_not_instance(self): + with self.assertRaises(ValueError) as context: + InMemoryRpcServer("Invalid Transport") + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + + def test_register_request_handler_method_uri_none(self): + server = InMemoryRpcServer(MockUTransport()) + handler = MagicMock(return_value=UPayload.EMPTY) + + with self.assertRaises(ValueError) as context: + server.register_request_handler(None, handler) + self.assertEqual(str(context.exception), "Method URI missing") + + def test_register_request_handler_handler_none(self): + server = InMemoryRpcServer(MockUTransport()) + with self.assertRaises(ValueError) as context: + server.register_request_handler(self.create_method_uri(), None) + self.assertEqual(str(context.exception), "Request listener missing") + + def test_unregister_request_handler_method_uri_none(self): + server = InMemoryRpcServer(MockUTransport()) + handler = MagicMock(return_value=UPayload.EMPTY) + + with self.assertRaises(ValueError) as context: + server.unregister_request_handler(None, handler) + self.assertEqual(str(context.exception), "Method URI missing") + + def test_unregister_request_handler_handler_none(self): + server = InMemoryRpcServer(MockUTransport()) + with self.assertRaises(ValueError) as context: + server.unregister_request_handler(self.create_method_uri(), None) + self.assertEqual(str(context.exception), "Request listener missing") + + def test_registering_request_listener(self): + handler = MagicMock(return_value=UPayload.EMPTY) + method = self.create_method_uri() + server = InMemoryRpcServer(MockUTransport()) + self.assertEqual(server.register_request_handler(method, handler).code, UCode.OK) + self.assertEqual(server.unregister_request_handler(method, handler).code, UCode.OK) + + def test_registering_twice_the_same_request_handler(self): + handler = MagicMock(return_value=UPayload.EMPTY) + server = InMemoryRpcServer(MockUTransport()) + status = server.register_request_handler(self.create_method_uri(), handler) + self.assertEqual(status.code, UCode.OK) + status = server.register_request_handler(self.create_method_uri(), handler) + self.assertEqual(status.code, UCode.ALREADY_EXISTS) + + def test_unregistering_non_registered_request_handler(self): + handler = MagicMock(side_effect=NotImplementedError("Unimplemented method 'handleRequest'")) + server = InMemoryRpcServer(MockUTransport()) + status = server.unregister_request_handler(self.create_method_uri(), handler) + self.assertEqual(status.code, UCode.NOT_FOUND) + + def test_registering_request_listener_with_error_transport(self): + handler = MagicMock(return_value=UPayload.EMPTY) + server = InMemoryRpcServer(ErrorUTransport()) + status = server.register_request_handler(self.create_method_uri(), handler) + self.assertEqual(status.code, UCode.FAILED_PRECONDITION) + + def test_handle_requests(self): + class CustomTestUTransport(MockUTransport): + def send(self, message): + serialized_uri = UriSerializer().serialize(message.attributes.sink) + if serialized_uri in self.listeners: + for listener in self.listeners[serialized_uri]: + listener.on_receive(message) + return UStatus(code=UCode.OK) + + transport = CustomTestUTransport() + handler = MagicMock(side_effect=Exception("this should not be called")) + server = InMemoryRpcServer(transport) + method = self.create_method_uri() + method2 = copy.deepcopy(method) + # Update the resource_id + method2.resource_id = 69 + + self.assertEqual(server.register_request_handler(method, handler).code, UCode.OK) + + request = UMessageBuilder.request(transport.get_source(), method2, 1000).build() + + # fake sending a request message that will trigger the handler to be called but since it is + # not for the same method as the one registered, it should be ignored and the handler not called + self.assertEqual(transport.send(request).code, UCode.OK) + + def test_handle_requests_exception(self): + # test transport that will trigger the handleRequest() + class CustomTestUTransport(MockUTransport): + def send(self, message): + serialized_uri = UriSerializer().serialize(message.attributes.sink) + if serialized_uri in self.listeners: + for listener in self.listeners[serialized_uri]: + listener.on_receive(message) + return UStatus(code=UCode.OK) + + transport = CustomTestUTransport() + + class MyRequestHandler(RequestHandler): + def handle_request(self, message: UMessage) -> UPayload: + raise UStatusError(UStatus(code=UCode.FAILED_PRECONDITION, message="Neelam it failed!")) + + handler = MyRequestHandler() + server = InMemoryRpcServer(transport) + method = self.create_method_uri() + + self.assertEqual(server.register_request_handler(method, handler).code, UCode.OK) + + request = UMessageBuilder.request(transport.get_source(), method, 1000).build() + self.assertEqual(transport.send(request).code, UCode.OK) + + def test_handle_requests_unknown_exception(self): + class CustomTestUTransport(MockUTransport): + def send(self, message): + serialized_uri = UriSerializer().serialize(message.attributes.sink) + if serialized_uri in self.listeners: + for listener in self.listeners[serialized_uri]: + listener.on_receive(message) + return UStatus(code=UCode.OK) + + transport = CustomTestUTransport() + + class MyRequestHandler(RequestHandler): + def handle_request(self, message: UMessage) -> UPayload: + raise Exception("Neelam it failed!") + + handler = MyRequestHandler() + server = InMemoryRpcServer(transport) + method = self.create_method_uri() + + self.assertEqual(server.register_request_handler(method, handler).code, UCode.OK) + + request = UMessageBuilder.request(transport.get_source(), method, 1000).build() + self.assertEqual(transport.send(request).code, UCode.OK) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_inmemorysubscriber.py b/tests/test_communication/test_inmemorysubscriber.py new file mode 100644 index 0000000..76fdf01 --- /dev/null +++ b/tests/test_communication/test_inmemorysubscriber.py @@ -0,0 +1,162 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +from tests.test_communication.mock_utransport import CommStatusTransport, MockUTransport, TimeoutUTransport +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient +from uprotocol.communication.inmemorysubscriber import InMemorySubscriber +from uprotocol.communication.upayload import UPayload +from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + SubscriptionResponse, + SubscriptionStatus, + UnsubscribeResponse, +) +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri + + +class MyListener(UListener): + def on_receive(self, umsg: UMessage) -> None: + pass + + +class TestInMemorySubscriber(unittest.IsolatedAsyncioTestCase): + @classmethod + def setUpClass(cls): + cls.listener = MyListener() + + def create_topic(self): + return UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=0x8000) + + async def test_subscribe_happy_path(self): + topic = self.create_topic() + transport = HappySubscribeUTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + + subscription_response = await subscriber.subscribe(topic, self.listener, None) + self.assertFalse(subscription_response is None) + + async def test_unsubscribe_happy_path(self): + topic = self.create_topic() + transport = HappyUnSubscribeUTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + + response = await subscriber.unsubscribe(topic, self.listener, None) + self.assertEqual(response.message, "") + self.assertEqual(response.code, UCode.OK) + + async def test_unregister_listener(self): + topic = self.create_topic() + + transport = HappySubscribeUTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + + subscription_response = await subscriber.subscribe(topic, self.listener, CallOptions()) + self.assertFalse(subscription_response is None) + + status = subscriber.unregister_listener(topic, self.listener) + self.assertEqual(status.code, UCode.OK) + + async def test_unsubscribe_with_commstatus_error(self): + topic = UUri(authority_name="neelam", ue_id=4, ue_version_major=1, resource_id=0x8000) + transport = CommStatusTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + + response = await subscriber.unsubscribe(topic, self.listener, None) + self.assertEqual(response.message, "Communication error [FAILED_PRECONDITION]") + self.assertEqual(response.code, UCode.FAILED_PRECONDITION) + + async def test_unsubscribe_with_exception(self): + topic = self.create_topic() + transport = TimeoutUTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + + response = await subscriber.unsubscribe(topic, self.listener, CallOptions(1)) + self.assertEqual(response.message, "Request timed out") + self.assertEqual(response.code, UCode.DEADLINE_EXCEEDED) + + def test_unregister_listener_missing_topic(self): + transport = TimeoutUTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + with self.assertRaises(ValueError) as context: + subscriber.unregister_listener(None, self.listener) + self.assertEqual(str(context.exception), "Unsubscribe topic missing") + + def test_unregister_listener_missing_listener(self): + topic = self.create_topic() + transport = TimeoutUTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + with self.assertRaises(ValueError) as context: + subscriber.unregister_listener(topic, None) + self.assertEqual(str(context.exception), "Request listener missing") + + async def test_unsubscribe_missing_topic(self): + transport = TimeoutUTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + with self.assertRaises(ValueError) as context: + await subscriber.unsubscribe(None, self.listener, CallOptions()) + self.assertEqual(str(context.exception), "Unsubscribe topic missing") + + async def test_unsubscribe_missing_listener(self): + topic = self.create_topic() + transport = TimeoutUTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + with self.assertRaises(ValueError) as context: + await subscriber.unsubscribe(topic, None, CallOptions()) + self.assertEqual(str(context.exception), "Listener missing") + + async def test_subscribe_missing_topic(self): + transport = TimeoutUTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + with self.assertRaises(ValueError) as context: + await subscriber.subscribe(None, self.listener, CallOptions()) + self.assertEqual(str(context.exception), "Subscribe topic missing") + + async def test_subscribe_missing_listener(self): + topic = self.create_topic() + transport = TimeoutUTransport() + subscriber = InMemorySubscriber(transport, InMemoryRpcClient(transport)) + with self.assertRaises(ValueError) as context: + await subscriber.subscribe(topic, None, CallOptions()) + self.assertEqual(str(context.exception), "Request listener missing") + + +class HappySubscribeUTransport(MockUTransport): + def build_response(self, request): + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( + UPayload.pack( + SubscriptionResponse( + status=SubscriptionStatus( + state=SubscriptionStatus.State.SUBSCRIBED, message="Successfully Subscribed" + ), + topic=TestInMemorySubscriber().create_topic(), + ) + ) + ) + + +class HappyUnSubscribeUTransport(MockUTransport): + def build_response(self, request): + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( + UPayload.pack(UnsubscribeResponse()) + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_publisher.py b/tests/test_communication/test_publisher.py new file mode 100644 index 0000000..d2234e2 --- /dev/null +++ b/tests/test_communication/test_publisher.py @@ -0,0 +1,41 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +from tests.test_communication.mock_utransport import MockUTransport +from uprotocol.communication.uclient import UClient +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.uri_pb2 import UUri + + +class TestPublisher(unittest.TestCase): + def test_send_publish(self): + # Topic to publish + topic = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) + + # Mock transport to use + transport = MockUTransport() + + # Create publisher instance using mock transport + publisher = UClient(transport) + + # Send the publish message + status = publisher.publish(topic, None) + # Assert that the status code is OK + self.assertEqual(status.code, UCode.OK) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_rpcmapper.py b/tests/test_communication/test_rpcmapper.py new file mode 100644 index 0000000..12efddd --- /dev/null +++ b/tests/test_communication/test_rpcmapper.py @@ -0,0 +1,157 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +import unittest + +import pytest + +from tests.test_communication.mock_utransport import MockUTransport +from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient +from uprotocol.communication.rpcmapper import RpcMapper +from uprotocol.communication.upayload import UPayload +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class TestRpcMapper(unittest.IsolatedAsyncioTestCase): + async def test_map_response(self): + uri = UUri(authority_name="Neelam") + payload = UPayload.pack(uri) + + rpc_client = InMemoryRpcClient(MockUTransport()) + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), payload, None)) + result = await RpcMapper.map_response(future_result, UUri) + assert result == uri + + async def test_map_response_to_result_with_empty_request(self): + rpc_client = InMemoryRpcClient(MockUTransport()) + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), None, None)) + result = await RpcMapper.map_response_to_result(future_result, UUri) + assert result.is_success() + assert result.success_value() == UUri() + + async def test_map_response_with_exception(self): + class RpcClientWithException: + async def invoke_method(self, uri, payload, options): + raise RuntimeError("Error") + + rpc_client = RpcClientWithException() + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), None, None)) + + with pytest.raises(RuntimeError): + await RpcMapper.map_response(future_result, UUri) + + async def test_map_response_with_empty_payload(self): + class RpcClientWithEmptyPayload: + async def invoke_method(self, uri, payload, options): + return UPayload.EMPTY + + rpc_client = RpcClientWithEmptyPayload() + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + result = await RpcMapper.map_response(future_result, UUri) + assert result == UUri() + + async def test_map_response_with_null_payload(self): + class RpcClientWithNullPayload: + async def invoke_method(self, uri, payload, options): + return None + + rpc_client = RpcClientWithNullPayload() + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + + with pytest.raises(Exception) as exc_info: + await RpcMapper.map_response(future_result, UUri) + assert str(exc_info.value) == f"Unknown payload. Expected [{UUri.__name__}]" + + async def test_map_response_to_result_with_non_empty_payload(self): + uri = UUri(authority_name="Neelam") + payload = UPayload.pack(uri) + + class RpcClientWithNonEmptyPayload: + async def invoke_method(self, uri, payload, options): + return payload + + rpc_client = RpcClientWithNonEmptyPayload() + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), payload, None)) + result = await RpcMapper.map_response_to_result(future_result, UUri) + assert result.is_success() + assert result.success_value() == uri + + async def test_map_response_to_result_with_null_payload(self): + class RpcClientWithNullPayload: + async def invoke_method(self, uri, payload, options): + return None + + rpc_client = RpcClientWithNullPayload() + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + result = await RpcMapper.map_response_to_result(future_result, UUri) + assert result.is_failure() + + async def test_map_response_to_result_with_empty_payload(self): + class RpcClientWithEmptyPayload: + async def invoke_method(self, uri, payload, options): + return UPayload.EMPTY + + rpc_client = RpcClientWithEmptyPayload() + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + result = await RpcMapper.map_response_to_result(future_result, UUri) + assert result.is_success() + assert result.success_value() == UUri() + + async def test_map_response_to_result_with_exception(self): + class RpcClientWithException: + async def invoke_method(self, uri, payload, options): + status = UStatus(code=UCode.FAILED_PRECONDITION, message="Error") + raise UStatusError(status) + + rpc_client = RpcClientWithException() + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + result = await RpcMapper.map_response_to_result(future_result, UUri) + assert result.is_failure() + assert result.failure_value().code == UCode.FAILED_PRECONDITION + assert result.failure_value().message == "Error" + + async def test_map_response_to_result_with_timeout_exception(self): + class RpcClientWithTimeoutException: + async def invoke_method(self, uri, payload, options): + raise asyncio.TimeoutError() + + rpc_client = RpcClientWithTimeoutException() + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + result = await RpcMapper.map_response_to_result(future_result, UUri) + assert result.is_failure() + assert result.failure_value().code == UCode.DEADLINE_EXCEEDED + assert result.failure_value().message == "Request timed out" + + async def test_map_response_to_result_with_invalid_arguments_exception(self): + class RpcClientWithInvalidArgumentsException: + async def invoke_method(self, uri, payload, options): + raise ValueError() + + rpc_client = RpcClientWithInvalidArgumentsException() + future_result = asyncio.ensure_future(rpc_client.invoke_method(self.create_method_uri(), UPayload.EMPTY, None)) + result = await RpcMapper.map_response_to_result(future_result, UUri) + assert result.is_failure() + assert result.failure_value().code == UCode.INVALID_ARGUMENT + assert result.failure_value().message == "" + + def create_method_uri(self): + return UUri(authority_name="Neelam", ue_id=10, ue_version_major=1, resource_id=3) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_rpcresult.py b/tests/test_communication/test_rpcresult.py new file mode 100644 index 0000000..d182faf --- /dev/null +++ b/tests/test_communication/test_rpcresult.py @@ -0,0 +1,50 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +from uprotocol.communication.rpcresult import RpcResult +from uprotocol.v1.ucode_pb2 import UCode + + +class TestRpcResult(unittest.TestCase): + def test_is_success_on_success(self): + result = RpcResult.success(2) + self.assertTrue(result.is_success()) + + def test_is_success_on_failure(self): + result = RpcResult.failure(code=UCode.INVALID_ARGUMENT, message="boom") + self.assertFalse(result.is_success()) + + def test_is_failure_on_success(self): + result = RpcResult.success(2) + self.assertFalse(result.is_failure()) + + def test_is_failure_on_failure(self): + result = RpcResult.failure(code=UCode.INVALID_ARGUMENT, message="boom") + self.assertTrue(result.is_failure()) + + def test_to_string_success(self): + result = RpcResult.success(2) + self.assertEqual(str(result), "Success(2)") + + def test_to_string_failure(self): + result = RpcResult.failure(code=UCode.INVALID_ARGUMENT, message="boom") + self.assertTrue(result.is_failure()) + self.assertEqual(result.failure_value().code, UCode.INVALID_ARGUMENT) + self.assertEqual(result.failure_value().message, "boom") + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_simplenotifier.py b/tests/test_communication/test_simplenotifier.py new file mode 100644 index 0000000..70dfb32 --- /dev/null +++ b/tests/test_communication/test_simplenotifier.py @@ -0,0 +1,79 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +from tests.test_communication.mock_utransport import MockUTransport +from uprotocol.communication.simplenotifier import SimpleNotifier +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri + + +class TestSimpleNotifier(unittest.TestCase): + def create_topic(self): + return UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=0x8000) + + def create_destination_uri(self): + return UUri(ue_id=4, ue_version_major=1) + + def test_send_notification(self): + notifier = SimpleNotifier(MockUTransport()) + status = notifier.notify(self.create_topic(), self.create_destination_uri(), None) + self.assertEqual(status.code, UCode.OK) + + def test_send_notification_with_payload(self): + uri = UUri(authority_name="Neelam") + notifier = SimpleNotifier(MockUTransport()) + status = notifier.notify(self.create_topic(), self.create_destination_uri(), UPayload.pack(uri)) + self.assertEqual(status.code, UCode.OK) + + def test_register_listener(self): + class TestListener(UListener): + def on_receive(self, message: UMessage): + pass + + listener = TestListener() + notifier = SimpleNotifier(MockUTransport()) + status = notifier.register_notification_listener(self.create_topic(), listener) + self.assertEqual(status.code, UCode.OK) + + def test_unregister_notification_listener(self): + class TestListener(UListener): + def on_receive(self, message: UMessage): + pass + + listener = TestListener() + notifier = SimpleNotifier(MockUTransport()) + status = notifier.register_notification_listener(self.create_topic(), listener) + self.assertEqual(status.code, UCode.OK) + + status = notifier.unregister_notification_listener(self.create_topic(), listener) + self.assertEqual(status.code, UCode.OK) + + def test_unregister_listener_not_registered(self): + class TestListener(UListener): + def on_receive(self, message: UMessage): + pass + + listener = TestListener() + notifier = SimpleNotifier(MockUTransport()) + status = notifier.unregister_notification_listener(self.create_topic(), listener) + self.assertEqual(status.code, UCode.INVALID_ARGUMENT) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_simplepublisher.py b/tests/test_communication/test_simplepublisher.py new file mode 100644 index 0000000..53f3093 --- /dev/null +++ b/tests/test_communication/test_simplepublisher.py @@ -0,0 +1,60 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +from tests.test_communication.mock_utransport import MockUTransport +from uprotocol.communication.simplepublisher import SimplePublisher +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.utransport import UTransport +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.uri_pb2 import UUri + + +class TestSimplePublisher(unittest.TestCase): + def create_topic(self): + return UUri(authority_name="neelam", ue_id=3, ue_version_major=1, resource_id=0x8000) + + def test_send_publish(self): + publisher = SimplePublisher(MockUTransport()) + status = publisher.publish(self.create_topic(), None) + self.assertEqual(status.code, UCode.OK) + + def test_send_publish_with_stuffed_payload(self): + uri = UUri(authority_name="Neelam") + publisher = SimplePublisher(MockUTransport()) + status = publisher.publish(self.create_topic(), UPayload.pack_to_any(uri)) + self.assertEqual(status.code, UCode.OK) + + def test_constructor_transport_none(self): + with self.assertRaises(ValueError) as context: + SimplePublisher(None) + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NULL_ERROR) + + def test_constructor_transport_not_instance(self): + with self.assertRaises(ValueError) as context: + SimplePublisher("InvalidTransport") + self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + + def test_publish_topic_none(self): + publisher = SimplePublisher(MockUTransport()) + uri = UUri(authority_name="Neelam") + + with self.assertRaises(ValueError) as context: + publisher.publish(None, UPayload.pack_to_any(uri)) + self.assertEqual(str(context.exception), "Publish topic missing") + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_subscriber.py b/tests/test_communication/test_subscriber.py new file mode 100644 index 0000000..66ecb94 --- /dev/null +++ b/tests/test_communication/test_subscriber.py @@ -0,0 +1,111 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +import unittest +from unittest.mock import MagicMock + +from tests.test_communication.mock_utransport import MockUTransport +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.uclient import UClient +from uprotocol.communication.upayload import UPayload +from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + SubscriptionResponse, + SubscriptionStatus, + UnsubscribeResponse, +) +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri + + +class MyListener(UListener): + def on_receive(self, umsg: UMessage) -> None: + # Handle receiving subscriptions here + assert umsg is not None + + +class TestSubscriber(unittest.IsolatedAsyncioTestCase): + @classmethod + def setUpClass(cls): + cls.listener = MyListener() + + async def test_subscribe(self): + topic = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) + transport = HappySubscribeUTransport() + upclient = UClient(transport) + subscription_response = await upclient.subscribe(topic, self.listener, CallOptions(timeout=5000)) + # check for successfully subscribed + self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) + + async def test_publish_notify_subscribe_listener(self): + topic = UUri(ue_id=5, ue_version_major=1, resource_id=0x8000) + transport = HappySubscribeUTransport() + upclient = UClient(transport) + subscription_response = await upclient.subscribe(topic, self.listener, CallOptions(timeout=5000)) + self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) + + # Create a mock for MyListener's on_receive method + self.listener.on_receive = MagicMock(side_effect=self.listener.on_receive) + status = upclient.publish(topic, None) + self.assertEqual(status.code, UCode.OK) + # Wait for a short time to ensure on_receive can be called + await asyncio.sleep(1) + # Verify that on_receive was called + self.listener.on_receive.assert_called_once() + + async def test_unsubscribe(self): + topic = UUri(ue_id=6, ue_version_major=1, resource_id=0x8000) + transport = HappyUnSubscribeUTransport() + upclient = UClient(transport) + status = await upclient.unsubscribe(topic, self.listener, None) + # check for successfully unsubscribed + self.assertEqual(status.code, UCode.OK) + + async def test_subscribe_unsubscribe(self): + transport = HappySubscribeUTransport() + upclient = UClient(transport) + topic = UUri(ue_id=7, ue_version_major=1, resource_id=0x8000) + subscription_response = await upclient.subscribe(topic, self.listener, None) + self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) + + status2 = await upclient.unsubscribe(topic, self.listener, None) + # check for successfully unsubscribed + self.assertEqual(status2.code, UCode.OK) + + +class HappySubscribeUTransport(MockUTransport): + def build_response(self, request): + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( + UPayload.pack( + SubscriptionResponse( + status=SubscriptionStatus( + state=SubscriptionStatus.State.SUBSCRIBED, message="Successfully Subscribed" + ) + ) + ) + ) + + +class HappyUnSubscribeUTransport(MockUTransport): + def build_response(self, request): + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( + UPayload.pack(UnsubscribeResponse()) + ) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_uclient.py b/tests/test_communication/test_uclient.py new file mode 100644 index 0000000..1490ed2 --- /dev/null +++ b/tests/test_communication/test_uclient.py @@ -0,0 +1,240 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +import unittest +from unittest.mock import MagicMock, create_autospec + +from tests.test_communication.mock_utransport import EchoUTransport, ErrorUTransport, MockUTransport, TimeoutUTransport +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.requesthandler import RequestHandler +from uprotocol.communication.uclient import UClient +from uprotocol.communication.upayload import UPayload +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + SubscriptionResponse, + SubscriptionStatus, + UnsubscribeResponse, +) +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class MyListener(UListener): + def on_receive(self, umsg: UMessage) -> None: + # Handle receiving subscriptions here + assert umsg is not None + + +class UPClientTest(unittest.IsolatedAsyncioTestCase): + @classmethod + def setUpClass(cls): + cls.listener = MyListener() + + def test_create_upclient_with_null_transport(self): + with self.assertRaises(ValueError): + UClient(None) + + def test_create_upclient_with_error_transport(self): + with self.assertRaises(UStatusError): + UClient(ErrorUTransport()) + + def test_send_notification(self): + status = UClient(MockUTransport()).notify(create_topic(), create_destination_uri(), None) + self.assertEqual(status.code, UCode.OK) + + def test_send_notification_with_payload(self): + uri = UUri(authority_name="neelam") + status = UClient(MockUTransport()).notify(create_topic(), create_destination_uri(), UPayload.pack(uri)) + self.assertEqual(status.code, UCode.OK) + + def test_register_listener(self): + listener = create_autospec(UListener, instance=True) + listener.on_receive = MagicMock() + + status = UClient(MockUTransport()).register_notification_listener(create_topic(), listener) + self.assertEqual(status.code, UCode.OK) + + def test_unregister_notification_listener(self): + listener = create_autospec(UListener, instance=True) + listener.on_receive = MagicMock() + + notifier = UClient(MockUTransport()) + status = notifier.register_notification_listener(create_topic(), listener) + self.assertEqual(status.code, UCode.OK) + + status = notifier.unregister_notification_listener(create_topic(), listener) + self.assertEqual(status.code, UCode.OK) + + def test_unregister_listener_not_registered(self): + listener = create_autospec(UListener, instance=True) + listener.on_receive = MagicMock() + + status = UClient(MockUTransport()).unregister_notification_listener(create_topic(), listener) + self.assertEqual(status.code, UCode.INVALID_ARGUMENT) + + def test_send_publish(self): + status = UClient(MockUTransport()).publish(create_topic(), None) + self.assertEqual(status.code, UCode.OK) + + def test_send_publish_with_stuffed_payload(self): + uri = UUri(authority_name="neelam") + status = UClient(MockUTransport()).publish(create_topic(), UPayload.pack_to_any(uri)) + self.assertEqual(status.code, UCode.OK) + + async def test_invoke_method_with_payload(self): + payload = UPayload.pack_to_any(UUri()) + future_result = asyncio.ensure_future( + UClient(MockUTransport()).invoke_method(create_method_uri(), payload, None) + ) + response = await future_result + self.assertIsNotNone(response) + self.assertFalse(future_result.exception()) + + async def test_invoke_method_with_payload_and_call_options(self): + payload = UPayload.pack_to_any(UUri()) + options = CallOptions(3000, "UPRIORITY_CS5") + future_result = asyncio.ensure_future( + UClient(MockUTransport()).invoke_method(create_method_uri(), payload, options) + ) + response = await future_result + self.assertIsNotNone(response) + self.assertFalse(future_result.exception()) + + async def test_invoke_method_with_null_payload(self): + future_result = asyncio.ensure_future( + UClient(MockUTransport()).invoke_method(create_method_uri(), None, CallOptions.DEFAULT) + ) + response = await future_result + self.assertIsNotNone(response) + self.assertFalse(future_result.exception()) + + async def test_invoke_method_with_timeout_transport(self): + payload = UPayload.pack_to_any(UUri()) + options = CallOptions(10, "UPRIORITY_CS5", "token") + with self.assertRaises(UStatusError) as context: + await UClient(TimeoutUTransport()).invoke_method(create_method_uri(), payload, options) + self.assertEqual(UCode.DEADLINE_EXCEEDED, context.exception.status.code) + self.assertEqual("Request timed out", context.exception.status.message) + + async def test_invoke_method_with_multi_invoke_transport(self): + rpc_client = UClient(MockUTransport()) + payload = UPayload.pack_to_any(UUri()) + + future_result1 = asyncio.ensure_future(rpc_client.invoke_method(create_method_uri(), payload, None)) + response = await future_result1 + self.assertIsNotNone(response) + future_result2 = asyncio.ensure_future(rpc_client.invoke_method(create_method_uri(), payload, None)) + response2 = await future_result2 + + self.assertIsNotNone(response2) + + self.assertFalse(future_result1.exception()) + self.assertFalse(future_result2.exception()) + + async def test_subscribe_happy_path(self): + topic = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) + transport = HappySubscribeUTransport() + upclient = UClient(transport) + subscription_response = await upclient.subscribe(topic, self.listener, CallOptions(timeout=5000)) + # check for successfully subscribed + self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) + + async def test_unsubscribe(self): + topic = UUri(ue_id=6, ue_version_major=1, resource_id=0x8000) + transport = HappyUnSubscribeUTransport() + upclient = UClient(transport) + status = await upclient.unsubscribe(topic, self.listener, None) + # check for successfully unsubscribed + self.assertEqual(status.code, UCode.OK) + + async def test_unregister_listener(self): + topic = create_topic() + my_listener = create_autospec(UListener, instance=True) + + subscriber = UClient(HappySubscribeUTransport()) + subscription_response = await subscriber.subscribe(topic, my_listener, CallOptions.DEFAULT) + self.assertTrue(subscription_response.status.state == SubscriptionStatus.State.SUBSCRIBED) + status = subscriber.unregister_listener(topic, my_listener) + self.assertEqual(status.code, UCode.OK) + + def test_registering_request_listener(self): + handler = create_autospec(RequestHandler, instance=True) + server = UClient(MockUTransport()) + status = server.register_request_handler(create_method_uri(), handler) + self.assertEqual(status.code, UCode.OK) + + def test_registering_twice_the_same_request_handler(self): + handler = create_autospec(RequestHandler, instance=True) + server = UClient(MockUTransport()) + status = server.register_request_handler(create_method_uri(), handler) + self.assertEqual(status.code, UCode.OK) + status = server.register_request_handler(create_method_uri(), handler) + self.assertEqual(status.code, UCode.ALREADY_EXISTS) + + def test_unregistering_non_registered_request_handler(self): + handler = create_autospec(RequestHandler, instance=True) + server = UClient(MockUTransport()) + status = server.unregister_request_handler(create_method_uri(), handler) + self.assertEqual(status.code, UCode.NOT_FOUND) + + def test_request_handler_for_notification(self): + transport = EchoUTransport() + client = UClient(transport) + handler = create_autospec(RequestHandler, instance=True) + + client.register_request_handler(create_method_uri(), handler) + self.assertEqual(client.notify(create_topic(), transport.get_source(), None), UStatus(code=UCode.OK)) + + +def create_topic(): + return UUri(authority_name="neelam", ue_id=4, ue_version_major=1, resource_id=0x8000) + + +def create_destination_uri(): + return UUri(ue_id=4, ue_version_major=1) + + +def create_method_uri(): + return UUri(authority_name="neelam", ue_id=4, ue_version_major=1, resource_id=3) + + +class HappySubscribeUTransport(MockUTransport): + def build_response(self, request): + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( + UPayload.pack( + SubscriptionResponse( + status=SubscriptionStatus( + state=SubscriptionStatus.State.SUBSCRIBED, message="Successfully Subscribed" + ) + ) + ) + ) + + +class HappyUnSubscribeUTransport(MockUTransport): + def build_response(self, request): + return UMessageBuilder.response_for_request(request.attributes).build_from_upayload( + UPayload.pack(UnsubscribeResponse()) + ) + + +class CommStatusTransport(MockUTransport): + def build_response(self, request): + status = UStatus(UCode.FAILED_PRECONDITION, "CommStatus Error") + return UMessage.response(request.attributes).with_comm_status(status.code).build(UPayload.pack(status)) diff --git a/tests/test_communication/test_upayload.py b/tests/test_communication/test_upayload.py new file mode 100644 index 0000000..6726164 --- /dev/null +++ b/tests/test_communication/test_upayload.py @@ -0,0 +1,118 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +from google.protobuf import message + +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.uattributes_pb2 import ( + UPayloadFormat, +) +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri + + +class TestUPayload(unittest.TestCase): + def test_is_empty_with_null_upayload(self): + self.assertTrue(UPayload.is_empty(UPayload.pack(None))) + self.assertTrue(UPayload.is_empty(UPayload.pack_to_any(None))) + + def test_is_empty_when_building_a_valid_upayload_that_data_is_empty_but_format_is_not(self): + payload = UPayload.pack(UUri()) + self.assertFalse(UPayload.is_empty(payload)) + + def test_is_empty_when_building_a_valid_upayload_where_both_data_and_format_are_not_empty(self): + uri = UUri(authority_name="Neelam") + payload = UPayload.pack_to_any(uri) + self.assertFalse(UPayload.is_empty(payload)) + + def test_is_empty_when_passing_null(self): + self.assertTrue(UPayload.is_empty(None)) + + def test_unpacking_a_upayload_calling_unpack_with_null(self): + self.assertFalse(isinstance(UPayload.unpack(None, UUri), message.Message)) + self.assertFalse(isinstance(UPayload.unpack(UPayload.pack(None), UUri), message.Message)) + + def test_unpacking_passing_a_null_bytestring(self): + self.assertFalse( + isinstance( + UPayload.unpack_data_format(None, UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF, UUri), message.Message + ) + ) + + def test_unpacking_a_google_protobuf_any_packed_upayload(self): + uri = UUri(authority_name="Neelam") + payload = UPayload.pack_to_any(uri) + unpacked = UPayload.unpack(payload, UUri) + self.assertTrue(isinstance(unpacked, message.Message)) + self.assertEqual(uri, unpacked) + + def test_unpacking_an_unsupported_format_in_upayload(self): + uri = UUri(authority_name="Neelam") + payload = UPayload.pack_from_data_and_format(uri.SerializeToString(), UPayloadFormat.UPAYLOAD_FORMAT_JSON) + unpacked = UPayload.unpack(payload, UUri) + self.assertFalse(isinstance(unpacked, message.Message)) + self.assertEqual(unpacked, None) + + def test_unpacking_to_unpack_a_message_of_the_wrong_type(self): + uri = UUri(authority_name="Neelam") + unpacked = UPayload.unpack_data_format( + uri.SerializeToString(), UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF, UMessage + ) + self.assertFalse(isinstance(unpacked, message.Message)) + self.assertEqual(unpacked, None) + + def test_equals_when_they_are_equal(self): + uri = UUri(authority_name="Neelam") + payload1 = UPayload.pack_to_any(uri) + payload2 = UPayload.pack_to_any(uri) + self.assertEqual(payload1, payload2) + + def test_equals_when_they_are_not_equal(self): + uri1 = UUri(authority_name="Neelam") + uri2 = UUri(authority_name="Neelam") + payload1 = UPayload.pack_to_any(uri1) + payload2 = UPayload.pack(uri2) + self.assertNotEqual(payload1, payload2) + + def test_equals_when_object_is_null(self): + uri = UUri(authority_name="Neelam") + payload = UPayload.pack_to_any(uri) + self.assertFalse(payload is None) + + def test_equals_when_object_is_not_an_instance_of_upayload(self): + uri = UUri(authority_name="Neelam") + payload = UPayload.pack_to_any(uri) + self.assertFalse(payload is uri) + + def test_equals_when_it_is_the_same_object(self): + uri = UUri(authority_name="Neelam") + payload = UPayload.pack_to_any(uri) + self.assertTrue(payload is payload) + + def test_equals_when_the_data_is_the_same_but_the_format_is_not(self): + uri = UUri(authority_name="Neelam") + payload1 = UPayload.pack_from_data_and_format(uri.SerializeToString(), UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF) + payload2 = UPayload.pack_from_data_and_format(uri.SerializeToString(), UPayloadFormat.UPAYLOAD_FORMAT_JSON) + self.assertNotEqual(payload1, payload2) + + def test_hash_code(self): + uri = UUri(authority_name="Neelam") + payload = UPayload.pack_to_any(uri) + self.assertEqual(payload.__hash__(), payload.__hash__()) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_communication/test_ustatuserror.py b/tests/test_communication/test_ustatuserror.py new file mode 100644 index 0000000..fad0dc5 --- /dev/null +++ b/tests/test_communication/test_ustatuserror.py @@ -0,0 +1,63 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import unittest + +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.ustatus_pb2 import UStatus + + +class TestUStatusError(unittest.TestCase): + def test_ustatus_exception_constructor(self): + """Test UStatusError constructor""" + exception = UStatusError.from_code_message(UCode.INVALID_ARGUMENT, "Invalid message type") + + self.assertEqual(UCode.INVALID_ARGUMENT, exception.get_code()) + self.assertEqual("Invalid message type", exception.get_message()) + + def test_ustatus_exception_constructor_null(self): + """Test UStatusError constructor passing null""" + exception = UStatusError(None, None) + + self.assertEqual(UCode.UNKNOWN, exception.get_code()) + self.assertEqual("", exception.get_message()) + + def test_ustatus_exception_constructor_ustatus(self): + """Test UStatusError constructor passing a UStatus""" + status = UStatus(code=UCode.INVALID_ARGUMENT, message="Invalid message type") + exception = UStatusError(status) + + self.assertEqual(UCode.INVALID_ARGUMENT, exception.get_code()) + self.assertEqual("Invalid message type", exception.get_message()) + + def test_get_status(self): + """Test UStatusError getStatus""" + status = UStatus(code=UCode.INVALID_ARGUMENT, message="Invalid message type") + exception = UStatusError(status) + + self.assertEqual(status, exception.get_status()) + + def test_ustatus_exception_throwable(self): + """Test UStatusError padding a throwable cause""" + cause = Exception("This is a cause") + exception = UStatusError(UStatus(code=UCode.INVALID_ARGUMENT, message="Invalid message type"), cause) + + self.assertEqual(UCode.INVALID_ARGUMENT, exception.get_code()) + self.assertEqual("Invalid message type", exception.get_message()) + self.assertEqual(cause, exception.get_cause()) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_transport/test_builder/test_umessagebuilder.py b/tests/test_transport/test_builder/test_umessagebuilder.py index 1df670f..5b8d794 100644 --- a/tests/test_transport/test_builder/test_umessagebuilder.py +++ b/tests/test_transport/test_builder/test_umessagebuilder.py @@ -16,6 +16,7 @@ from google.protobuf.any_pb2 import Any +from uprotocol.communication.upayload import UPayload from uprotocol.transport.builder.umessagebuilder import UMessageBuilder from uprotocol.uuid.factory.uuidfactory import Factories from uprotocol.v1.uattributes_pb2 import ( @@ -41,7 +42,7 @@ def get_uuid(): return Factories.UPROTOCOL.create() -class TestUMessageBuilder(unittest.TestCase): +class TestUMessageBuilder(unittest.IsolatedAsyncioTestCase): def test_publish(self): """ Test Publish @@ -142,26 +143,12 @@ def test_build(self): self.assertEqual(UCode.CANCELLED, attributes.commstatus) self.assertEqual("myParents", attributes.traceparent) - def test_build_with_payload(self): - """ - Test Build with google.protobuf.Message payload - """ - message: UMessage = UMessageBuilder.publish(build_source()).build(build_sink()) - self.assertIsNotNone(message) - self.assertIsNotNone(message.payload) - self.assertEqual( - UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF, - message.attributes.payload_format, - ) - self.assertEqual(message.payload, build_sink().SerializeToString()) - def test_build_with_upayload(self): """ Test building UMessage with UPayload payload """ - message: UMessage = UMessageBuilder.publish(build_source()).build( - UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF, - build_sink().SerializeToString(), + message: UMessage = UMessageBuilder.publish(build_source()).build_from_upayload( + UPayload(format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF, data=build_sink().SerializeToString()) ) self.assertIsNotNone(message) self.assertIsNotNone(message.payload) @@ -175,7 +162,9 @@ def test_build_with_any_payload(self): """ Test building UMessage with Any payload """ - message: UMessage = UMessageBuilder.publish(build_source()).build(Any()) + message: UMessage = UMessageBuilder.publish(build_source()).build_from_upayload( + UPayload(format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY, data=Any().SerializeToString()) + ) self.assertIsNotNone(message) self.assertIsNotNone(message.payload) self.assertEqual( diff --git a/tests/test_transport/test_utransport.py b/tests/test_transport/test_utransport.py index d095dec..e75e623 100644 --- a/tests/test_transport/test_utransport.py +++ b/tests/test_transport/test_utransport.py @@ -13,7 +13,6 @@ """ import unittest -from typing import Optional from uprotocol.transport.ulistener import UListener from uprotocol.transport.utransport import UTransport @@ -33,11 +32,11 @@ class HappyUTransport(UTransport): def send(self, message): return UStatus(code=UCode.INVALID_ARGUMENT if message is None else UCode.OK) - def register_listener(self, source_filter: UUri, sink_filter: Optional[UUri], listener: UListener): + def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: listener.on_receive(UMessage()) return UStatus(code=UCode.OK) - def unregister_listener(self, source_filter: UUri, sink_filter: Optional[UUri], listener): + def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): return UStatus(code=UCode.OK) def get_source(self): @@ -48,18 +47,18 @@ class SadUTransport(UTransport): def send(self, message): return UStatus(code=UCode.INTERNAL) - def register_listener(self, source_filter: UUri, sink_filter: Optional[UUri], listener): + def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: listener.on_receive(None) return UStatus(code=UCode.INTERNAL) - def unregister_listener(self, source_filter: UUri, sink_filter: Optional[UUri], listener): + def unregister_listener(self, source_filter: UUri, listener, sink_filter: UUri = None): return UStatus(code=UCode.INTERNAL) def get_source(self): return UUri() -class UTransportTest(unittest.TestCase): +class UTransportTest(unittest.IsolatedAsyncioTestCase): def test_happy_send_message_parts(self): transport = HappyUTransport() status = transport.send(UMessage()) @@ -67,12 +66,12 @@ def test_happy_send_message_parts(self): def test_happy_register_listener(self): transport = HappyUTransport() - status = transport.register_listener(UUri(), None, MyListener()) + status = transport.register_listener(UUri(), MyListener(), None) self.assertEqual(status.code, UCode.OK) def test_happy_register_unlistener(self): transport = HappyUTransport() - status = transport.unregister_listener(UUri(), None, MyListener()) + status = transport.unregister_listener(UUri(), MyListener(), None) self.assertEqual(status.code, UCode.OK) def test_sending_null_message(self): @@ -87,12 +86,12 @@ def test_unhappy_send_message_parts(self): def test_unhappy_register_listener(self): transport = SadUTransport() - status = transport.register_listener(UUri(), None, MyListener()) + status = transport.register_listener(UUri(), MyListener(), None) self.assertEqual(status.code, UCode.INTERNAL) def test_unhappy_register_unlistener(self): transport = SadUTransport() - status = transport.unregister_listener(UUri(), None, MyListener()) + status = transport.unregister_listener(UUri(), MyListener(), None) self.assertEqual(status.code, UCode.INTERNAL) diff --git a/tests/test_transport/test_validator/test_uattributesvalidator.py b/tests/test_transport/test_validator/test_uattributesvalidator.py index 29d03f4..19bbe77 100644 --- a/tests/test_transport/test_validator/test_uattributesvalidator.py +++ b/tests/test_transport/test_validator/test_uattributesvalidator.py @@ -34,7 +34,7 @@ def build_topic_uuri(): return UUri(ue_id=1, ue_version_major=1, resource_id=0x8000) -class TestUAttributesValidator(unittest.TestCase): +class TestUAttributesValidator(unittest.IsolatedAsyncioTestCase): def test_uattributes_validator_happy_path(self): message = UMessageBuilder.publish(build_topic_uuri()).build() diff --git a/tests/test_uri/test_factory/test_uri_factory.py b/tests/test_uri/test_factory/test_uri_factory.py index 9f20bb2..2c7c829 100644 --- a/tests/test_uri/test_factory/test_uri_factory.py +++ b/tests/test_uri/test_factory/test_uri_factory.py @@ -18,7 +18,7 @@ from uprotocol.uri.factory.uri_factory import UriFactory -class TestUriFactory(unittest.TestCase): +class TestUriFactory(unittest.IsolatedAsyncioTestCase): def test_from_proto(self): service_descriptor = usubscription_pb2.DESCRIPTOR.services_by_name["uSubscription"] uri = UriFactory.from_proto(service_descriptor, 0, "") @@ -30,7 +30,7 @@ def test_from_proto(self): self.assertEqual(uri.authority_name, "") def test_any(self): - uri = UriFactory.any_func() + uri = UriFactory.ANY self.assertIsNotNone(uri) self.assertEqual(uri.resource_id, 65535) self.assertEqual(uri.ue_id, 65535) diff --git a/tests/test_uri/test_serializer/test_uriserializer.py b/tests/test_uri/test_serializer/test_uriserializer.py index a0d1a54..cb0adbf 100644 --- a/tests/test_uri/test_serializer/test_uriserializer.py +++ b/tests/test_uri/test_serializer/test_uriserializer.py @@ -19,7 +19,7 @@ from uprotocol.v1.uri_pb2 import UUri -class TestUriSerializer(unittest.TestCase): +class TestUriSerializer(unittest.IsolatedAsyncioTestCase): def test_using_the_serializers(self): uri = UUri( authority_name="myAuthority", @@ -44,6 +44,7 @@ def test_deserializing_a_blank_uuri(self): def test_deserializing_with_a_valid_uri_that_has_scheme(self): uri = UriSerializer.deserialize("up://myAuthority/1/2/3") + self.assertEqual(uri.authority_name, "myAuthority") self.assertEqual(uri.ue_id, 1) self.assertEqual(uri.ue_version_major, 2) @@ -55,6 +56,7 @@ def test_deserializing_with_a_valid_uri_that_has_only_scheme(self): def test_deserializing_a_valid_uuri_with_all_fields(self): uri = UriSerializer.deserialize("//myAuthority/1/2/3") + self.assertEqual(uri.authority_name, "myAuthority") self.assertEqual(uri.ue_id, 1) self.assertEqual(uri.ue_version_major, 2) @@ -62,6 +64,7 @@ def test_deserializing_a_valid_uuri_with_all_fields(self): def test_deserializing_with_only_authority(self): uri = UriSerializer.deserialize("//myAuthority") + self.assertEqual(uri.authority_name, "myAuthority") self.assertEqual(uri.ue_id, 0) self.assertEqual(uri.ue_version_major, 0) @@ -69,6 +72,7 @@ def test_deserializing_with_only_authority(self): def test_deserializing_authority_ueid(self): uri = UriSerializer.deserialize("//myAuthority/1") + self.assertEqual(uri.authority_name, "myAuthority") self.assertEqual(uri.ue_id, 1) self.assertEqual(uri.ue_version_major, 0) @@ -76,6 +80,7 @@ def test_deserializing_authority_ueid(self): def test_deserializing_authority_ueid_ueversion(self): uri = UriSerializer.deserialize("//myAuthority/1/2") + self.assertEqual(uri.authority_name, "myAuthority") self.assertEqual(uri.ue_id, 1) self.assertEqual(uri.ue_version_major, 2) @@ -99,6 +104,7 @@ def test_deserializing_with_names_instead_of_id_for_resource_id(self): def test_deserializing_a_string_without_authority(self): uri = UriSerializer.deserialize("/1/2/3") + self.assertEqual(uri.authority_name, "") self.assertEqual(uri.ue_id, 1) self.assertEqual(uri.ue_version_major, 2) @@ -106,6 +112,7 @@ def test_deserializing_a_string_without_authority(self): def test_deserializing_without_authority_and_resourceid(self): uri = UriSerializer.deserialize("/1/2") + self.assertEqual(uri.authority_name, "") self.assertEqual(uri.ue_id, 1) self.assertEqual(uri.ue_version_major, 2) @@ -113,6 +120,7 @@ def test_deserializing_without_authority_and_resourceid(self): def test_deserializing_without_authority_resourceid_version_major(self): uri = UriSerializer.deserialize("/1") + self.assertEqual(uri.authority_name, "") self.assertEqual(uri.ue_id, 1) self.assertEqual(uri.ue_version_major, 0) @@ -124,6 +132,7 @@ def test_deserializing_with_blank_authority(self): def test_deserializing_with_all_wildcard_values(self): uri = UriSerializer.deserialize("//*/FFFF/ff/ffff") + self.assertEqual(uri.authority_name, "*") self.assertEqual(uri.ue_id, 0xFFFF) self.assertEqual(uri.ue_version_major, 0xFF) @@ -155,6 +164,7 @@ def test_deserializing_with_negative_resourceid(self): def test_deserializing_with_wildcard_resourceid(self): uri = UriSerializer.deserialize("/1/2/ffff") + self.assertEqual(uri.authority_name, "") self.assertEqual(uri.ue_id, 1) self.assertEqual(uri.ue_version_major, 2) @@ -167,16 +177,19 @@ def test_serializing_an_empty_uri(self): def test_serializing_a_none_uri(self): serialized_uri = UriSerializer.serialize(None) + self.assertEqual(serialized_uri, "") def test_serializing_only_authority_ueid(self): uri = UUri(authority_name="myAuthority", ue_id=1) serialized_uri = UriSerializer.serialize(uri) + self.assertEqual(serialized_uri, "//myAuthority/1/0/0") def test_serializing_only_authority_ueid_version_major(self): uri = UUri(authority_name="myAuthority", ue_id=1, ue_version_major=2) serialized_uri = UriSerializer.serialize(uri) + self.assertEqual(serialized_uri, "//myAuthority/1/2/0") diff --git a/tests/test_uri/test_validator/test_urivalidator.py b/tests/test_uri/test_validator/test_urivalidator.py index 0a8b227..5c670ce 100644 --- a/tests/test_uri/test_validator/test_urivalidator.py +++ b/tests/test_uri/test_validator/test_urivalidator.py @@ -18,7 +18,7 @@ from uprotocol.v1.uri_pb2 import UUri -class TestUriValidator(unittest.TestCase): +class TestUriValidator(unittest.IsolatedAsyncioTestCase): def test_is_empty_with_null_uri(self): self.assertTrue(UriValidator.is_empty(None)) diff --git a/tests/test_uuid/test_factory/test_uuidfactory.py b/tests/test_uuid/test_factory/test_uuidfactory.py index da74638..9dd5cfa 100644 --- a/tests/test_uuid/test_factory/test_uuidfactory.py +++ b/tests/test_uuid/test_factory/test_uuidfactory.py @@ -21,8 +21,8 @@ from uprotocol.v1.uuid_pb2 import UUID -class TestUUIDFactory(unittest.TestCase): - def test_uuidv8_creation(self): +class TestUUIDFactory(unittest.IsolatedAsyncioTestCase): + def test_uuidv7_creation(self): now = datetime.now() uuid = Factories.UPROTOCOL.create(now) version = UUIDUtils.get_version(uuid) @@ -44,7 +44,7 @@ def test_uuidv8_creation(self): self.assertNotEqual(uuid2, UUID()) self.assertEqual(uuid, uuid2) - def test_uuidv8_creation_with_null_instant(self): + def test_uuidv7_creation_with_null_instant(self): uuid = Factories.UPROTOCOL.create(None) version = UUIDUtils.get_version(uuid) time = UUIDUtils.get_time(uuid) @@ -63,22 +63,6 @@ def test_uuidv8_creation_with_null_instant(self): self.assertNotEqual(uuid2, UUID()) self.assertEqual(uuid, uuid2) - def test_uuidv8_overflow(self): - uuid_list = [] - max_count = 4095 - - now = datetime.now() - for i in range(max_count * 2): - uuid_list.append(Factories.UPROTOCOL.create(now)) - - self.assertEqual( - UUIDUtils.get_time(uuid_list[0]), - UUIDUtils.get_time(uuid_list[i]), - ) - self.assertEqual(uuid_list[0].lsb, uuid_list[i].lsb) - if i > max_count: - self.assertEqual(uuid_list[max_count].msb, uuid_list[i].msb) - def test_uuidv6_creation_with_instant(self): now = datetime.now() uuid = Factories.UUIDV6.create(now) @@ -209,18 +193,24 @@ def test_create_uprotocol_uuid_with_different_time_values(self): self.assertTrue(time1 is not None) self.assertNotEqual(time, time1) - def test_create_both_uuidv6_and_v8_to_compare_performance(self): + def test_create_both_uuidv6_and_v7_to_compare_performance(self): uuidv6_list = [] - uuidv8_list = [] + uuidv7_list = [] max_count = 10000 for _ in range(max_count): - uuidv8_list.append(Factories.UPROTOCOL.create()) + uuidv7_list.append(Factories.UPROTOCOL.create()) for _ in range(max_count): uuidv6_list.append(Factories.UUIDV6.create()) - # print( - # f"UUIDv8: [{v8_diff.total_seconds() / max_count}s] UUIDv6: [{v6_diff.total_seconds() / max_count}s]") + + def test_create_uuidv7_with_the_same_time_to_confirm_the_uuids_are_not_the_same(self): + now = datetime.now(timezone.utc) + factory = Factories.UPROTOCOL + uuid = factory.create(now) + uuid1 = factory.create(now) + self.assertNotEqual(uuid, uuid1) + self.assertEqual(UUIDUtils.get_time(uuid), UUIDUtils.get_time(uuid1)) if __name__ == "__main__": diff --git a/tests/test_uuid/test_factory/test_uuidutils.py b/tests/test_uuid/test_factory/test_uuidutils.py index 8fc553b..e35bc46 100644 --- a/tests/test_uuid/test_factory/test_uuidutils.py +++ b/tests/test_uuid/test_factory/test_uuidutils.py @@ -31,7 +31,7 @@ def create_id(): TTL = 10000 -class TestUUIDUtils(unittest.TestCase): +class TestUUIDUtils(unittest.IsolatedAsyncioTestCase): def test_get_elapsed_time(self): id_val: UUID = create_id() self.assertIsNotNone(UUIDUtils.get_elapsed_time(id_val)) diff --git a/tests/test_uuid/test_validator/test_uuidvalidator.py b/tests/test_uuid/test_validator/test_uuidvalidator.py index edf1ba5..09157a3 100644 --- a/tests/test_uuid/test_validator/test_uuidvalidator.py +++ b/tests/test_uuid/test_validator/test_uuidvalidator.py @@ -24,7 +24,7 @@ from uprotocol.validation.validationresult import ValidationResult -class TestUuidValidator(unittest.TestCase): +class TestUuidValidator(unittest.IsolatedAsyncioTestCase): def test_validator_with_good_uuid(self): uuid = Factories.UPROTOCOL.create() status = UuidValidator.get_validator(uuid).validate(uuid) @@ -51,14 +51,14 @@ def test_invalid_time_uuid(self): self.assertEqual(UCode.INVALID_ARGUMENT, status.code) self.assertEqual("Invalid UUID Time", status.message) - def test_uuidv8_with_invalid_uuids(self): + def test_uuidv7_with_invalid_uuids(self): validator = Validators.UPROTOCOL.validator() self.assertIsNotNone(validator) status = validator.validate(None) self.assertEqual(UCode.INVALID_ARGUMENT, status.code) - self.assertEqual("Invalid UUIDv8 Version,Invalid UUID Time", status.message) + self.assertEqual("Invalid UUIDv7 Version,Invalid UUID Time", status.message) - def test_uuidv8_with_invalid_types(self): + def test_uuidv7_with_invalid_types(self): uuidv6 = Factories.UUIDV6.create() uuid = UUID(msb=0, lsb=0) uuidv4 = UuidSerializer.deserialize("195f9bd1-526d-4c28-91b1-ff34c8e3632d") @@ -68,15 +68,15 @@ def test_uuidv8_with_invalid_types(self): status = validator.validate(uuidv6) self.assertEqual(UCode.INVALID_ARGUMENT, status.code) - self.assertEqual("Invalid UUIDv8 Version", status.message) + self.assertEqual("Invalid UUIDv7 Version", status.message) status1 = validator.validate(uuid) self.assertEqual(UCode.INVALID_ARGUMENT, status1.code) - self.assertEqual("Invalid UUIDv8 Version,Invalid UUID Time", status1.message) + self.assertEqual("Invalid UUIDv7 Version,Invalid UUID Time", status1.message) status2 = validator.validate(uuidv4) self.assertEqual(UCode.INVALID_ARGUMENT, status2.code) - self.assertEqual("Invalid UUIDv8 Version,Invalid UUID Time", status2.message) + self.assertEqual("Invalid UUIDv7 Version,Invalid UUID Time", status2.message) def test_good_uuidv6(self): uuid = Factories.UUIDV6.create() @@ -107,7 +107,7 @@ def test_uuidv6_with_null_uuid(self): ) self.assertEqual(UCode.INVALID_ARGUMENT, status.code) - def test_uuidv6_with_uuidv8(self): + def test_uuidv6_with_uuidv7(self): uuid = Factories.UPROTOCOL.create() validator = Validators.UUIDV6.validator() self.assertIsNotNone(validator) diff --git a/uprotocol/communication/__init__.py b/uprotocol/communication/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/uprotocol/communication/calloptions.py b/uprotocol/communication/calloptions.py new file mode 100644 index 0000000..62f538d --- /dev/null +++ b/uprotocol/communication/calloptions.py @@ -0,0 +1,37 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from dataclasses import dataclass, field + +from uprotocol.v1.uattributes_pb2 import UPriority + + +@dataclass(frozen=True) +class CallOptions: + DEFAULT = None + timeout: int = field(default=10000) + priority: UPriority = field(default=UPriority.UPRIORITY_CS4) + token: str = field(default="") + + def __post_init__(self): + if self.timeout is None: + raise ValueError("timeout cannot be None") + if self.priority is None: + raise ValueError("priority cannot be None") + if self.token is None: + raise ValueError("token cannot be None") + + +# Default instance +CallOptions.DEFAULT = CallOptions() diff --git a/uprotocol/communication/inmemoryrpcclient.py b/uprotocol/communication/inmemoryrpcclient.py new file mode 100644 index 0000000..eba8cd2 --- /dev/null +++ b/uprotocol/communication/inmemoryrpcclient.py @@ -0,0 +1,158 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio +from typing import Dict, Optional + +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.rpcclient import RpcClient +from uprotocol.communication.upayload import UPayload +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.ulistener import UListener +from uprotocol.transport.utransport import UTransport +from uprotocol.uri.factory.uri_factory import UriFactory +from uprotocol.uuid.serializer.uuidserializer import UuidSerializer +from uprotocol.v1.uattributes_pb2 import UMessageType +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri + + +class HandleResponsesListener(UListener): + def __init__(self, requests): + self.requests = requests + + def on_receive(self, umsg: UMessage) -> None: + """ + Handle the responses coming back from the server asynchronously. + + Args: + - response (UMessage): The response message from the server. + """ + if umsg.attributes.type != UMessageType.UMESSAGE_TYPE_RESPONSE: + return + + response_attributes = umsg.attributes + future = self.requests.pop(UuidSerializer.serialize(response_attributes.reqid), None) + + if not future: + return + + if response_attributes.commstatus: + code = response_attributes.commstatus + future.set_exception( + UStatusError.from_code_message(code=code, message=f"Communication error [{UCode.Name(code)}]") + ) + return + + future.set_result(umsg) + + +class InMemoryRpcClient(RpcClient): + """ + An example implementation of the RpcClient interface that + wraps the UTransport for implementing the RPC pattern to send + RPC requests and receive RPC responses. This implementation + uses an in-memory map to store futures that need to be + completed when the response comes in from the server. + + NOTE: Developers are not required to use these APIs; they can + implement their own or directly use the UTransport to send RPC + requests and register listeners that handle the RPC responses. + """ + + def __init__(self, transport: UTransport): + """ + Constructor for the InMemoryRpcClient. + + :param transport: The transport to use for sending the RPC requests. + """ + if not transport: + raise ValueError(UTransport.TRANSPORT_NULL_ERROR) + elif not isinstance(transport, UTransport): + raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + self.transport = transport + self.requests: Dict[str, asyncio.Future] = {} + self.response_handler: UListener = HandleResponsesListener(self.requests) + + status = self.transport.register_listener(UriFactory.ANY, self.response_handler, None) + if status.code != UCode.OK: + raise UStatusError.from_code_message(status.code, "Failed to register listener") + + def cleanup_request(self, request_id): + request_id = UuidSerializer.serialize(request_id) + if request_id in self.requests: + del self.requests[request_id] + + async def invoke_method( + self, method_uri: UUri, request_payload: UPayload, options: Optional[CallOptions] = None + ) -> UPayload: + """ + Invoke a method (send an RPC request) and receive the response asynchronously. + + :param method_uri: The method URI to be invoked. + :param request_payload: The request message to be sent to the server. + :param options: RPC method invocation call options. Defaults to None. + :return: Returns the asyncio Future with the response payload or raises an exception + with the failure reason as UStatus. + """ + options = options or CallOptions.DEFAULT + builder = UMessageBuilder.request(self.transport.get_source(), method_uri, options.timeout) + request = None + response_future = asyncio.Future() + try: + if options.token: + builder.with_token(options.token) + + request = builder.build_from_upayload(request_payload) + + response_future.add_done_callback(lambda fut: self.cleanup_request(request.attributes.id)) + + if UuidSerializer.serialize(request.attributes.id) in self.requests: + raise UStatusError.from_code_message(code=UCode.ALREADY_EXISTS, message="Duplicated request found") + self.requests[UuidSerializer.serialize(request.attributes.id)] = response_future + + async def wait_for_response(): + try: + response_message = await asyncio.wait_for(response_future, timeout=request.attributes.ttl / 1000) + return UPayload.pack_from_data_and_format( + response_message.payload, response_message.attributes.payload_format + ) + except asyncio.TimeoutError: + raise UStatusError.from_code_message(code=UCode.DEADLINE_EXCEEDED, message="Request timed out") + except UStatusError as e: + raise e + except Exception as e: + raise UStatusError.from_code_message(code=UCode.UNKNOWN, message=str(e)) + + # Start the task for waiting for the response before sending the request + response_task = asyncio.create_task(wait_for_response()) + + status = self.transport.send(request) + + if status.code != UCode.OK: + raise UStatusError(status) + # Wait for the response task to complete + return await response_task + + except Exception as e: + raise e + + def close(self): + """ + Close the InMemoryRpcClient by clearing stored requests and unregistering the listener. + """ + self.requests.clear() + self.transport.unregister_listener(UriFactory.ANY, self.response_handler, self.transport.get_source()) diff --git a/uprotocol/communication/inmemoryrpcserver.py b/uprotocol/communication/inmemoryrpcserver.py new file mode 100644 index 0000000..bf36296 --- /dev/null +++ b/uprotocol/communication/inmemoryrpcserver.py @@ -0,0 +1,130 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from uprotocol.communication.requesthandler import RequestHandler +from uprotocol.communication.rpcserver import RpcServer +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.ulistener import UListener +from uprotocol.transport.utransport import UTransport +from uprotocol.uri.factory.uri_factory import UriFactory +from uprotocol.uri.serializer.uriserializer import UriSerializer +from uprotocol.v1.uattributes_pb2 import ( + UMessageType, +) +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.umessage_pb2 import UMessage +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class HandleRequestListener(UListener): + def __init__(self, transport: UTransport, request_handlers): + self.transport = transport + self.request_handlers = request_handlers + + def on_receive(self, request: UMessage) -> None: + """ + Generic incoming handler to process RPC requests from clients. + + :param request: The request message from clients. + """ + # Only handle request messages, ignore all other messages like notifications + if request.attributes.type != UMessageType.UMESSAGE_TYPE_REQUEST: + return + + request_attributes = request.attributes + + # Check if the request is for one that we have registered a handler for, if not ignore it + handler = self.request_handlers.get(UriSerializer().serialize(request_attributes.sink)) + if handler is None: + return + + response_builder = UMessageBuilder.response_for_request(request_attributes) + + try: + response_payload = handler.handle_request(request) + except Exception as e: + code = UCode.INTERNAL + response_payload = None + if isinstance(e, UStatusError): + code = e.get_code() + response_builder.with_commstatus(code) + + self.transport.send(response_builder.build_from_upayload(response_payload)) + + +class InMemoryRpcServer(RpcServer): + def __init__(self, transport): + if not transport: + raise ValueError(UTransport.TRANSPORT_NULL_ERROR) + elif not isinstance(transport, UTransport): + raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + self.transport = transport + self.request_handlers = {} + self.request_handler = HandleRequestListener(self.transport, self.request_handlers) + + def register_request_handler(self, method_uri: UUri, handler: RequestHandler) -> UStatus: + """ + Register a handler that will be invoked when requests come in from clients for the given method. + + Note: Only one handler is allowed to be registered per method URI. + + :param method_uri: The URI for the method to register the listener for. + :param handler: The handler that will process the request for the client. + :return: Returns the status of registering the RpcListener. + """ + if method_uri is None: + raise ValueError("Method URI missing") + if handler is None: + raise ValueError("Request listener missing") + + try: + method_uri_str = UriSerializer().serialize(method_uri) + if method_uri_str in self.request_handlers: + current_handler = self.request_handlers[method_uri_str] + if current_handler is not None: + raise UStatusError.from_code_message(UCode.ALREADY_EXISTS, "Handler already registered") + + result = self.transport.register_listener(UriFactory.ANY, self.request_handler, method_uri) + if result.code != UCode.OK: + raise UStatusError.from_code_message(result.code, result.message) + + self.request_handlers[method_uri_str] = handler + return UStatus(code=UCode.OK) + + except UStatusError as e: + return UStatus(code=e.get_code(), message=e.get_message()) + except Exception as e: + return UStatus(code=UCode.INTERNAL, message=str(e)) + + def unregister_request_handler(self, method_uri: UUri, handler: RequestHandler) -> UStatus: + """ + Unregister a handler that will be invoked when requests come in from clients for the given method. + + :param method_uri: The resolved UUri where the listener was registered to receive messages from. + :param handler: The handler for processing requests. + :return: Returns the status of unregistering the RpcListener. + """ + if method_uri is None: + raise ValueError("Method URI missing") + if handler is None: + raise ValueError("Request listener missing") + method_uri_str = UriSerializer().serialize(method_uri) + + if self.request_handlers.get(method_uri_str) == handler: + del self.request_handlers[method_uri_str] + return self.transport.unregister_listener(UriFactory.ANY, self.request_handler, method_uri) + + return UStatus(code=UCode.NOT_FOUND) diff --git a/uprotocol/communication/inmemorysubscriber.py b/uprotocol/communication/inmemorysubscriber.py new file mode 100644 index 0000000..0f68c08 --- /dev/null +++ b/uprotocol/communication/inmemorysubscriber.py @@ -0,0 +1,146 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio + +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.rpcclient import RpcClient +from uprotocol.communication.rpcmapper import RpcMapper +from uprotocol.communication.subscriber import Subscriber +from uprotocol.communication.upayload import UPayload +from uprotocol.core.usubscription.v3 import usubscription_pb2 +from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + SubscriberInfo, + SubscriptionRequest, + SubscriptionResponse, + UnsubscribeRequest, + UnsubscribeResponse, +) +from uprotocol.transport.ulistener import UListener +from uprotocol.transport.utransport import UTransport +from uprotocol.uri.factory.uri_factory import UriFactory +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class InMemorySubscriber(Subscriber): + """ + The following is an example implementation of the Subscriber interface that + wraps the UTransport for implementing the Subscriber-side of the pub/sub + messaging pattern to allow developers to subscribe and unsubscribe to topics. + This implementation uses the InMemoryRpcClient to send the subscription request + to the uSubscription service. + + NOTE: Developers are not required to use these APIs, they can implement their own + or directly use the UTransport to communicate with the uSubscription + services and register their publish message listener. + """ + + METHOD_SUBSCRIBE = 1 # TODO: Fetch this from proto generated code + METHOD_UNSUBSCRIBE = 2 # TODO: Fetch this from proto generated code + + def __init__(self, transport: UTransport, rpc_client: RpcClient): + """ + Constructor for the DefaultSubscriber. + + :param transport: The transport to use for sending the notifications + :param rpc_client: The RPC client to use for sending the RPC requests + """ + if not transport: + raise ValueError(UTransport.TRANSPORT_NULL_ERROR) + elif not isinstance(transport, UTransport): + raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + elif not rpc_client: + raise ValueError("RpcClient missing") + self.transport = transport + self.rpc_client = rpc_client + + async def subscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> SubscriptionResponse: + """ + Subscribe to a given topic. + + The API will return a future with the response SubscriptionResponse or exception + with the failure if the subscription was not successful. The API will also register the listener to be + called when messages are received. + + :param topic: The topic to subscribe to. + :param listener: The listener to be called when a message is received on the topic. + :param options: The call options for the subscription. + :return: Returns the future with the response SubscriptionResponse or + exception with the failure reason as UStatus. + """ + if not topic: + raise ValueError("Subscribe topic missing") + if not listener: + raise ValueError("Request listener missing") + service_descriptor = usubscription_pb2.DESCRIPTOR.services_by_name["uSubscription"] + + subscribe_uri = UriFactory.from_proto(service_descriptor, self.METHOD_SUBSCRIBE, None) + request = SubscriptionRequest(topic=topic, subscriber=SubscriberInfo(uri=self.transport.get_source())) + future_result = asyncio.ensure_future( + self.rpc_client.invoke_method(subscribe_uri, UPayload.pack(request), options) + ) + + response_future = RpcMapper.map_response(future_result, SubscriptionResponse) + + response = await response_future + self.transport.register_listener(topic, listener) + return response + + async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> UStatus: + """ + Unsubscribe to a given topic. + + The subscriber no longer wishes to be subscribed to said topic so we issue an unsubscribe + request to the USubscription service. + + :param topic: The topic to unsubscribe to. + :param listener: The listener to be called when a message is received on the topic. + :param options: The call options for the subscription. + :return: Returns UStatus with the result from the unsubscribe request. + """ + if not topic: + raise ValueError("Unsubscribe topic missing") + if not listener: + raise ValueError("Listener missing") + service_descriptor = usubscription_pb2.DESCRIPTOR.services_by_name["uSubscription"] + unsubscribe_uri = UriFactory.from_proto(service_descriptor, self.METHOD_UNSUBSCRIBE, None) + unsubscribe_request = UnsubscribeRequest(topic=topic) + future_result = asyncio.ensure_future( + self.rpc_client.invoke_method(unsubscribe_uri, UPayload.pack(unsubscribe_request), options) + ) + response_future = RpcMapper.map_response_to_result(future_result, UnsubscribeResponse) + response = await response_future + if response.is_success(): + self.transport.unregister_listener(topic, listener) + return UStatus(code=UCode.OK) + return response.failure_value() + + def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Unregister a listener from a topic. + + This method will only unregister the listener for a given subscription thus allowing a uE to stay + subscribed even if the listener is removed. + + :param topic: The topic to subscribe to. + :param listener: The listener to be called when a message is received on the topic. + :return: Returns UStatus with the status of the listener unregister request. + """ + if not topic: + raise ValueError("Unsubscribe topic missing") + if not listener: + raise ValueError("Request listener missing") + return self.transport.unregister_listener(topic, listener) diff --git a/uprotocol/communication/notifier.py b/uprotocol/communication/notifier.py new file mode 100644 index 0000000..f4c2921 --- /dev/null +++ b/uprotocol/communication/notifier.py @@ -0,0 +1,63 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from abc import ABC, abstractmethod + +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class Notifier(ABC): + """ + Communication Layer (uP-L2) Notifier Interface. + + Notifier is an interface that provides the APIs to send notifications (to a client) or + register/unregister listeners to receive the notifications. + """ + + @abstractmethod + def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus: + """ + Send a notification to a given topic passing a payload. + + :param topic: The topic to send the notification to. + :param destination: The destination to send the notification to. + :param payload: The payload to send with the notification. + :return: Returns the UStatus with the status of the notification. + """ + pass + + @abstractmethod + def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Register a listener for a notification topic. + + :param topic: The topic to register the listener to. + :param listener: The listener to be called when a message is received on the topic. + :return: Returns the UStatus with the status of the listener registration. + """ + pass + + @abstractmethod + def unregister_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Unregister a listener from a notification topic. + + :param topic: The topic to unregister the listener from. + :param listener: The listener to be unregistered from the topic. + :return: Returns the UStatus with the status of the listener that was unregistered. + """ + pass diff --git a/uprotocol/communication/publisher.py b/uprotocol/communication/publisher.py new file mode 100644 index 0000000..e583367 --- /dev/null +++ b/uprotocol/communication/publisher.py @@ -0,0 +1,41 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from abc import ABC, abstractmethod + +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class Publisher(ABC): + """ + uP-L2 interface and data models for Python. + + uP-L1 interfaces implement the core uProtocol across various communication middlewares + and programming languages while uP-L2 API are the client-facing APIs that wrap the transport + functionality into easy-to-use, language-specific APIs to do the most common functionality + of the protocol (subscribe, publish, notify, invoke a method, or handle RPC requests). + """ + + @abstractmethod + def publish(self, topic: UUri, payload: UPayload) -> UStatus: + """ + Publish a message to a topic passing UPayload as the payload. + + :param topic: The topic to publish to. + :param payload: The UPayload to publish. + :return: UStatus + """ + pass diff --git a/uprotocol/communication/requesthandler.py b/uprotocol/communication/requesthandler.py new file mode 100644 index 0000000..ae980e3 --- /dev/null +++ b/uprotocol/communication/requesthandler.py @@ -0,0 +1,39 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from abc import ABC, abstractmethod + +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.umessage_pb2 import UMessage + + +class RequestHandler(ABC): + """ + RequestHandler is used by the RpcServer to handle incoming requests and automatically sends + back the response to the client. + + The service must implement the `handle_request` method to handle the request and then return + the response payload. + """ + + @abstractmethod + def handle_request(self, message: UMessage) -> UPayload: + """ + Method called to handle/process request messages. + + :param message: The request message received. + :return: The response payload. + :raises UStatusError: If the service encounters an error processing the request. + """ + pass diff --git a/uprotocol/communication/rpcclient.py b/uprotocol/communication/rpcclient.py new file mode 100644 index 0000000..16bdc17 --- /dev/null +++ b/uprotocol/communication/rpcclient.py @@ -0,0 +1,40 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from abc import ABC, abstractmethod + +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.uri_pb2 import UUri + + +class RpcClient(ABC): + """ + Communication Layer (uP-L2) RPC Client Interface. + + Clients use this API to invoke a method (send a request and wait for a reply). + """ + + @abstractmethod + async def invoke_method(self, method_uri: UUri, request_payload: UPayload, options: CallOptions) -> UPayload: + """ + API for clients to invoke a method (send an RPC request) and receive the response. + + :param method_uri: The method URI to be invoked. + :param request_payload: The request payload to be sent to the server. + :param options: RPC method invocation call options. + :return: Returns the response payload. + :raises UStatus: If the RPC invocation fails for any reason. + """ + pass diff --git a/uprotocol/communication/rpcmapper.py b/uprotocol/communication/rpcmapper.py new file mode 100644 index 0000000..281d840 --- /dev/null +++ b/uprotocol/communication/rpcmapper.py @@ -0,0 +1,90 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import asyncio + +from uprotocol.communication.rpcresult import RpcResult +from uprotocol.communication.upayload import UPayload +from uprotocol.communication.ustatuserror import UStatusError +from uprotocol.v1.ucode_pb2 import UCode + + +class RpcMapper: + """ + RPC Wrapper is a class that provides static methods to wrap an RPC request + with an RPC Response (uP-L2). APIs returning Message assume the message is + protobuf serialized com.google.protobuf.Any (UMessageFormat.PROTOBUF), and will + raise an error if anything else is passed. + """ + + @staticmethod + async def map_response(response_coro: asyncio.Future, expected_cls): + """ + Map a response from invoking a method on a uTransport service into a result + containing the declared expected return type of the RPC method. + + :param response_coro: Coroutine response from uTransport. + :param expected_cls: The class name of the declared expected return type of the RPC method. + :return: Returns the declared expected return type of the RPC method or raises an exception. + """ + try: + payload = await response_coro + except Exception as e: + raise RuntimeError(f"Unexpected exception: {str(e)}") from e + + if payload is not None: + if not payload.data: + return expected_cls() + else: + result = UPayload.unpack(payload, expected_cls) + if result: + return result + + raise RuntimeError(f"Unknown payload. Expected [{expected_cls.__name__}]") + + @staticmethod + async def map_response_to_result(response_coro: asyncio.Future, expected_cls) -> RpcResult: + """ + Map a response from method invocation to an RpcResult containing the declared expected + return type of the RPC method. + + This function handles the asynchronous response from invoking a method on a uTransport + service. It converts the response into a result containing the expected return type or + an error status. + + :param response_coro: An asyncio.Future representing the asynchronous response from uTransport. + :param expected_cls: The class of the expected return type of the RPC method. + :return: Returns an RpcResult containing the expected return type T, or an error status. + :rtype: RpcResult[T] + :raises: Raises appropriate exceptions if there is an error during response handling. + """ + try: + payload = await response_coro + except Exception as e: + if isinstance(e, UStatusError): + return RpcResult.failure(value=e.status) + elif isinstance(e, asyncio.TimeoutError): + return RpcResult.failure(code=UCode.DEADLINE_EXCEEDED, message="Request timed out") + else: + return RpcResult.failure(code=UCode.INVALID_ARGUMENT, message=str(e)) + + if payload is not None: + if not payload.data: + return RpcResult.success(expected_cls()) + else: + result = UPayload.unpack(payload, expected_cls) + return RpcResult.success(result) + + exception = RuntimeError(f"Unknown or null payload type. Expected [{expected_cls.__name__}]") + return RpcResult.failure(code=UCode.INVALID_ARGUMENT, message=str(exception)) diff --git a/uprotocol/communication/rpcresult.py b/uprotocol/communication/rpcresult.py new file mode 100644 index 0000000..2746d46 --- /dev/null +++ b/uprotocol/communication/rpcresult.py @@ -0,0 +1,112 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from abc import ABC, abstractmethod +from typing import TypeVar, Union + +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.ustatus_pb2 import UStatus + +T = TypeVar("T") + + +class RpcResult(ABC): + """ + Wrapper class for RPC Stub calls. It contains a Success with the type of the RPC call, or a failure with the + UStatus returned by the failed call. + """ + + @abstractmethod + def is_success(self) -> bool: + pass + + @abstractmethod + def is_failure(self) -> bool: + pass + + @abstractmethod + def failure_value(self) -> UStatus: + pass + + @abstractmethod + def success_value(self) -> T: + pass + + @staticmethod + def success(value: T) -> "RpcResult": + return Success(value) + + @staticmethod + def failure( + value: Union[ + UStatus, + "Failure", + Exception, + ] = None, + code: UCode = UCode.UNKNOWN, + message: str = "", + ) -> "RpcResult": + return Failure(value, code, message) + + +class Success(RpcResult): + def __init__(self, value: T): + self.value = value + + def is_success(self) -> bool: + return True + + def is_failure(self) -> bool: + return False + + def failure_value(self) -> UStatus: + raise ValueError("Method failure_value() called on a Success instance") + + def success_value(self) -> T: + return self.value + + def __str__(self) -> str: + return f"Success({self.success_value()})" + + +class Failure(RpcResult): + def __init__( + self, + value: Union[UStatus, "Failure", Exception, None] = None, + code: UCode = UCode.UNKNOWN, + message: str = "", + ): + if isinstance(value, UStatus): + self.value = value + elif isinstance(value, Exception): + self.value = UStatus(code=code, message=str(value)) + elif isinstance(value, Failure): + self.value = value.failure_value() + else: + self.value = UStatus(code=code, message=message) + + def is_success(self) -> bool: + return False + + def is_failure(self) -> bool: + return True + + def failure_value(self) -> UStatus: + return self.value + + def success_value(self) -> T: + raise ValueError("Method success_value() called on a Failure instance") + + def __str__(self) -> str: + return f"Failure({self.value})" diff --git a/uprotocol/communication/rpcserver.py b/uprotocol/communication/rpcserver.py new file mode 100644 index 0000000..b50f1d9 --- /dev/null +++ b/uprotocol/communication/rpcserver.py @@ -0,0 +1,52 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from abc import ABC, abstractmethod + +from uprotocol.communication.requesthandler import RequestHandler +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class RpcServer(ABC): + """ + Communication Layer (uP-L2) Rpc Server interface. + + This interface provides APIs that services can call to register handlers for + incoming requests for given methods. + """ + + @abstractmethod + def register_request_handler(self, method: UUri, handler: RequestHandler) -> UStatus: + """ + Register a handler that will be invoked when requests come in from clients for the given method. + + Note: Only one handler is allowed to be registered per method URI. + + :param method: Uri for the method to register the listener for. + :param handler: The handler that will process the request for the client. + :return: Returns the status of registering the RpcListener. + """ + pass + + @abstractmethod + def unregister_request_handler(self, method: UUri, handler: RequestHandler) -> UStatus: + """ + Unregister a handler that will be invoked when requests come in from clients for the given method. + + :param method: Resolved UUri for where the listener was registered to receive messages from. + :param handler: The handler for processing requests. + :return: Returns status of unregistering the RpcListener. + """ + pass diff --git a/uprotocol/communication/simplenotifier.py b/uprotocol/communication/simplenotifier.py new file mode 100644 index 0000000..0add6f5 --- /dev/null +++ b/uprotocol/communication/simplenotifier.py @@ -0,0 +1,78 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from typing import Optional + +from uprotocol.communication.notifier import Notifier +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.ulistener import UListener +from uprotocol.transport.utransport import UTransport +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class SimpleNotifier(Notifier): + """ + The following is an example implementation of the Notifier interface that + wraps the UTransport for implementing the notification pattern to send + notifications and register to receive notification events. + + *NOTE:* Developers are not required to use these APIs, they can implement their own + or directly use the UTransport to send notifications and register listeners. + """ + + def __init__(self, transport: UTransport): + """ + Constructor for the DefaultNotifier. + + :param transport: the transport to use for sending the notifications + """ + if transport is None: + raise ValueError(UTransport.TRANSPORT_NULL_ERROR) + elif not isinstance(transport, UTransport): + raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + self.transport = transport + + def notify(self, topic: UUri, destination: UUri, payload: Optional[UPayload] = None) -> UStatus: + """ + Send a notification to a given topic. + + :param topic: The topic to send the notification to. + :param destination: The destination to send the notification to. + :param payload: The payload to send with the notification. + :return: Returns the UStatus with the status of the notification. + """ + builder = UMessageBuilder.notification(topic, destination) + return self.transport.send(builder.build() if payload is None else builder.build_from_upayload(payload)) + + def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Register a listener for a notification topic. + + :param topic: The topic to register the listener to. + :param listener: The listener to be called when a message is received on the topic. + :return: Returns the UStatus with the status of the listener registration. + """ + return self.transport.register_listener(topic, listener, self.transport.get_source()) + + def unregister_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Unregister a listener from a notification topic. + + :param topic: The topic to unregister the listener from. + :param listener: The listener to be unregistered from the topic. + :return: Returns the UStatus with the status of the listener that was unregistered. + """ + return self.transport.unregister_listener(topic, listener, self.transport.get_source()) diff --git a/uprotocol/communication/simplepublisher.py b/uprotocol/communication/simplepublisher.py new file mode 100644 index 0000000..01d63ee --- /dev/null +++ b/uprotocol/communication/simplepublisher.py @@ -0,0 +1,48 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from uprotocol.communication.publisher import Publisher +from uprotocol.communication.upayload import UPayload +from uprotocol.transport.builder.umessagebuilder import UMessageBuilder +from uprotocol.transport.utransport import UTransport +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class SimplePublisher(Publisher): + def __init__(self, transport: UTransport): + """ + Constructor for SimplePublisher. + + :param transport: The transport instance to use for sending notifications. + """ + if transport is None: + raise ValueError(UTransport.TRANSPORT_NULL_ERROR) + elif not isinstance(transport, UTransport): + raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + self.transport = transport + + def publish(self, topic: UUri, payload: UPayload) -> UStatus: + """ + Publishes a message to a topic using the provided payload. + + :param topic: The topic to publish the message to. + :param payload: The payload to be published. + :return: An instance of UStatus indicating the status of the publish operation. + """ + if topic is None: + raise ValueError("Publish topic missing") + + message = UMessageBuilder.publish(topic).build_from_upayload(payload) + return self.transport.send(message) diff --git a/uprotocol/communication/subscriber.py b/uprotocol/communication/subscriber.py new file mode 100644 index 0000000..5f0040d --- /dev/null +++ b/uprotocol/communication/subscriber.py @@ -0,0 +1,66 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from abc import ABC, abstractmethod + +from uprotocol.communication.calloptions import CallOptions +from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + SubscriptionResponse, +) +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class Subscriber(ABC): + """ + Communication Layer (uP-L2) Subscriber interface. + + This interface provides APIs to subscribe and unsubscribe to a given topic. + """ + + @abstractmethod + async def subscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> SubscriptionResponse: + """ + Subscribe to a given topic asynchronously. + + :param topic: The topic to subscribe to. + :param listener: The listener to be called when a message is received on the topic. + :param options: The call options for the subscription. + :return: Returns the SubscriptionResponse upon successful subscription + """ + pass + + @abstractmethod + async def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> UStatus: + """ + Unsubscribe to a given topic asynchronously. + + :param topic: The topic to unsubscribe to. + :param listener: The listener to be called when a message is received on the topic. + :param options: The call options for the subscription. + :return: Returns UStatus with the result from the unsubscribe request. + """ + pass + + @abstractmethod + async def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: + """ + Unregister a listener from a topic asynchronously. + + :param topic: The topic to subscribe to. + :param listener: The listener to be called when a message is received on the topic. + :return: Returns UStatus with the status of the listener unregister request. + """ + pass diff --git a/uprotocol/communication/uclient.py b/uprotocol/communication/uclient.py new file mode 100644 index 0000000..ac20926 --- /dev/null +++ b/uprotocol/communication/uclient.py @@ -0,0 +1,82 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from typing import Optional + +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient +from uprotocol.communication.inmemoryrpcserver import InMemoryRpcServer +from uprotocol.communication.inmemorysubscriber import InMemorySubscriber +from uprotocol.communication.notifier import Notifier +from uprotocol.communication.publisher import Publisher +from uprotocol.communication.rpcclient import RpcClient +from uprotocol.communication.rpcserver import RpcServer +from uprotocol.communication.simplenotifier import SimpleNotifier +from uprotocol.communication.simplepublisher import SimplePublisher +from uprotocol.communication.subscriber import Subscriber +from uprotocol.communication.upayload import UPayload +from uprotocol.core.usubscription.v3.usubscription_pb2 import ( + SubscriptionResponse, +) +from uprotocol.transport.ulistener import UListener +from uprotocol.transport.utransport import UTransport +from uprotocol.v1.uri_pb2 import UUri +from uprotocol.v1.ustatus_pb2 import UStatus + + +class UClient(RpcServer, Subscriber, Notifier, Publisher, RpcClient): + def __init__(self, transport: UTransport): + self.transport = transport + if transport is None: + raise ValueError(UTransport.TRANSPORT_NULL_ERROR) + elif not isinstance(transport, UTransport): + raise ValueError(UTransport.TRANSPORT_NOT_INSTANCE_ERROR) + + self.rpcServer = InMemoryRpcServer(self.transport) + self.publisher = SimplePublisher(self.transport) + self.notifier = SimpleNotifier(self.transport) + self.rpcClient = InMemoryRpcClient(self.transport) + self.subscriber = InMemorySubscriber(self.transport, self.rpcClient) + + async def subscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> SubscriptionResponse: + return await self.subscriber.subscribe(topic, listener, options) + + def unsubscribe(self, topic: UUri, listener: UListener, options: CallOptions) -> UStatus: + return self.subscriber.unsubscribe(topic, listener, options) + + def unregister_listener(self, topic: UUri, listener: UListener) -> UStatus: + return self.subscriber.unregister_listener(topic, listener) + + def notify(self, topic: UUri, destination: UUri, payload: UPayload) -> UStatus: + return self.notifier.notify(topic, destination, payload) + + def register_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + return self.notifier.register_notification_listener(topic, listener) + + def unregister_notification_listener(self, topic: UUri, listener: UListener) -> UStatus: + return self.notifier.unregister_notification_listener(topic, listener) + + def publish(self, topic: UUri, payload: UPayload) -> UStatus: + return self.publisher.publish(topic, payload) + + def register_request_handler(self, method: UUri, handler): + return self.rpcServer.register_request_handler(method, handler) + + def unregister_request_handler(self, method: UUri, handler): + return self.rpcServer.unregister_request_handler(method, handler) + + async def invoke_method( + self, method_uri: UUri, request_payload: UPayload, options: Optional[CallOptions] = None + ) -> UPayload: + return await self.rpcClient.invoke_method(method_uri, request_payload, options) diff --git a/uprotocol/communication/upayload.py b/uprotocol/communication/upayload.py new file mode 100644 index 0000000..fef5d15 --- /dev/null +++ b/uprotocol/communication/upayload.py @@ -0,0 +1,99 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from dataclasses import dataclass, field +from typing import Optional, Type + +import google.protobuf.any_pb2 as any_pb2 +import google.protobuf.message as message + +from uprotocol.v1.uattributes_pb2 import ( + UPayloadFormat, +) + + +@dataclass(frozen=True) +class UPayload: + data: bytes = field(default_factory=bytes) + format: UPayloadFormat = UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED + + # Define EMPTY as a class-level constant + EMPTY: Optional['UPayload'] = None + + @staticmethod + def is_empty(payload: Optional['UPayload']) -> bool: + return payload is None or (payload.data == b'' and payload.format == UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED) + + @staticmethod + def pack_to_any(message: message.Message) -> 'UPayload': + if message is None: + return UPayload.EMPTY + any_message = any_pb2.Any() + any_message.Pack(message) + serialized_data = any_message.SerializeToString() + return UPayload(data=serialized_data, format=UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY) + + @staticmethod + def pack(message: message.Message) -> 'UPayload': + if message is None: + return UPayload.EMPTY + return UPayload(message.SerializeToString(), UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF) + + @staticmethod + def pack_from_data_and_format(data: bytes, format: UPayloadFormat) -> 'UPayload': + return UPayload(data, format) + + @staticmethod + def unpack(payload: Optional['UPayload'], clazz: Type[message.Message]) -> Optional[message.Message]: + if payload is None: + return None + return UPayload.unpack_data_format(payload.data, payload.format, clazz) + + @staticmethod + def unpack_data_format( + data: bytes, format: UPayloadFormat, clazz: Type[message.Message] + ) -> Optional[message.Message]: + format = format if format is not None else UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED + if data is None or len(data) == 0: + return None + try: + if format == UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY: + message = clazz() + any_message = any_pb2.Any() + any_message.ParseFromString(data) + any_message.Unpack(message) + return message + elif format == UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF: + message = clazz() + message.ParseFromString(data) + return message + else: + return None + except Exception: + return None + + +# Initialize EMPTY outside the class definition +UPayload.EMPTY = UPayload(data=bytes(), format=UPayloadFormat.UPAYLOAD_FORMAT_UNSPECIFIED) + +# Example usage: +if __name__ == "__main__": + from google.protobuf.wrappers_pb2 import Int32Value # Import Int32Value from Google protobuf wrappers + + # Create an instance of Int32Value + int_value = Int32Value(value=42) + + packed_int = UPayload.pack(int_value) + unpacked_int = UPayload.unpack(packed_int, Int32Value) + print("Unpacked Int32Value:", unpacked_int) diff --git a/uprotocol/communication/ustatuserror.py b/uprotocol/communication/ustatuserror.py new file mode 100644 index 0000000..4f7042b --- /dev/null +++ b/uprotocol/communication/ustatuserror.py @@ -0,0 +1,44 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +from typing import Optional + +from uprotocol.v1.ucode_pb2 import UCode +from uprotocol.v1.ustatus_pb2 import UStatus + + +class UStatusError(Exception): + def __init__(self, status: UStatus, cause: Optional[Exception] = None): + message = "" + if status is not None: + message = status.message + super().__init__(message, cause) + self.status = status if status is not None else UStatus(code=UCode.UNKNOWN) + self.cause = cause + + @classmethod + def from_code_message(cls, code: UCode, message: str, cause: Optional[Exception] = None): + return cls(UStatus(code=code, message=message), cause) + + def get_status(self) -> UStatus: + return self.status + + def get_code(self) -> UCode: + return self.status.code + + def get_message(self) -> str: + return self.status.message + + def get_cause(self) -> Exception: + return self.cause diff --git a/uprotocol/transport/builder/umessagebuilder.py b/uprotocol/transport/builder/umessagebuilder.py index be0e167..56e2d49 100644 --- a/uprotocol/transport/builder/umessagebuilder.py +++ b/uprotocol/transport/builder/umessagebuilder.py @@ -12,14 +12,11 @@ SPDX-License-Identifier: Apache-2.0 """ -from google.protobuf.any_pb2 import Any -from google.protobuf.message import Message - +from uprotocol.communication.upayload import UPayload from uprotocol.uuid.factory.uuidfactory import Factories from uprotocol.v1.uattributes_pb2 import ( UAttributes, UMessageType, - UPayloadFormat, UPriority, ) from uprotocol.v1.ucode_pb2 import UCode @@ -154,6 +151,7 @@ def __init__(self, source: UUri, id_val: UUID, type_val: UMessageType): @param source Source address of the message. @param id_val Unique identifier for the message. @param type_val Message type such as Publish a state change, + RPC request or RPC response. """ self.source = source @@ -242,7 +240,7 @@ def with_sink(self, sink: UUri): self.sink = sink return self - def _build_static(self): + def build(self): """Construct the UMessage from the builder. @return Returns a constructed @@ -276,29 +274,11 @@ def _build_static(self): message_builder.payload = self.payload return message_builder - def build(self, arg1=None, arg2=None): - if arg1 is None and arg2 is None: - return self._build_static() - elif isinstance(arg1, Any) and arg2 is None: - if arg1 is None: - raise ValueError("Any cannot be null.") - self.format = UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY - self.payload = arg1.SerializeToString() - return self._build_static() - elif isinstance(arg1, Message) and arg2 is None: - if arg1 is None: - raise ValueError("Protobuf Message cannot be null.") - self.format = UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF - self.payload = arg1.SerializeToString() - return self._build_static() - elif isinstance(arg2, bytes): - if arg1 is None: - raise ValueError("Format cannot be null.") - if arg2 is None: - raise ValueError("Payload cannot be null.") - self.format = arg1 - self.payload = arg2 - return self._build_static() + def build_from_upayload(self, payload: UPayload): + if payload is not None: + self.payload = payload.data + self.format = payload.format + return self.build() def _calculate_priority(self): if self.type in [ diff --git a/uprotocol/transport/utransport.py b/uprotocol/transport/utransport.py index 3587004..8526d1e 100644 --- a/uprotocol/transport/utransport.py +++ b/uprotocol/transport/utransport.py @@ -13,7 +13,6 @@ """ from abc import ABC, abstractmethod -from typing import Optional from uprotocol.transport.ulistener import UListener from uprotocol.v1.umessage_pb2 import UMessage @@ -29,6 +28,7 @@ class UTransport(ABC): """ TRANSPORT_NULL_ERROR = "Transport cannot be null" + TRANSPORT_NOT_INSTANCE_ERROR = "Transport must be an instance of UTransport" @abstractmethod def send(self, message: UMessage) -> UStatus: @@ -39,7 +39,7 @@ def send(self, message: UMessage) -> UStatus: pass @abstractmethod - def register_listener(self, source_filter: UUri, sink_filter: Optional[UUri], listener: UListener) -> UStatus: + def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: """Register UListener for UUri source and sink filters to be called when a message is received. @@ -48,6 +48,7 @@ def register_listener(self, source_filter: UUri, sink_filter: Optional[UUri], li @param sink_filter The UAttributes sink address pattern that the message to receive needs to match or None to match messages that do not contain any sink address. @param listener The UListener that will execute when the message is + received on the given UUri. @return Returns UStatus with UCode.OK if the listener is registered correctly, otherwise it returns with the appropriate failure. @@ -55,7 +56,7 @@ def register_listener(self, source_filter: UUri, sink_filter: Optional[UUri], li pass @abstractmethod - def unregister_listener(self, source_filter: UUri, sink_filter: Optional[UUri], listener: UListener) -> UStatus: + def unregister_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus: """Unregister UListener for UUri source and sink filters. Messages arriving at this topic will no longer be processed by this listener. diff --git a/uprotocol/transport/validator/uattributesvalidator.py b/uprotocol/transport/validator/uattributesvalidator.py index c80b14f..c59e855 100644 --- a/uprotocol/transport/validator/uattributesvalidator.py +++ b/uprotocol/transport/validator/uattributesvalidator.py @@ -233,6 +233,7 @@ def validate_sink(self, attributes_value: UAttributes) -> ValidationResult: Validate the sink UriPart for Publish events. Publish should not have a sink. @param attributes_value UAttributes object containing the sink to validate. + @return Returns a ValidationResult that is success or failed with a failure message. """ return ( @@ -301,6 +302,7 @@ def validate_priority(self, attributes_value: UAttributes) -> ValidationResult: Validate the priority value to ensure it is one of the known CS values @param attributes_value Attributes object containing the Priority to validate. + @return Returns a {@link ValidationResult} that is success or failed with a failure message. """ return ( @@ -372,6 +374,7 @@ def validate_priority(self, attributes_value: UAttributes) -> ValidationResult: Validate the priority value to ensure it is one of the known CS values @param attributes_value Attributes object containing the Priority to validate. + @return Returns a ValidationResult that is success or failed with a failure message. """ diff --git a/uprotocol/uri/factory/uri_factory.py b/uprotocol/uri/factory/uri_factory.py index ed8f6c6..ab3e8ad 100644 --- a/uprotocol/uri/factory/uri_factory.py +++ b/uprotocol/uri/factory/uri_factory.py @@ -26,6 +26,13 @@ class UriFactory: URI Factory that builds URIs from protos """ + ANY = UUri( + authority_name="*", + ue_id=0xFFFF, + ue_version_major=0xFF, + resource_id=0xFFFF, + ) + @staticmethod def from_proto( service_descriptor: Optional[ServiceDescriptor], resource_id: int, authority_name: Optional[str] @@ -36,6 +43,7 @@ def from_proto( @param resource_id The resource id. @param authority_name The authority name. @return Returns a URI for a protobuf generated code + Service Descriptor. """ if service_descriptor is None: @@ -53,20 +61,8 @@ def from_proto( uuri.resource_id = resource_id if id_val is not None: uuri.ue_id = id_val + if authority_name is not None: uuri.authority_name = authority_name return uuri - - @staticmethod - def any_func() -> UUri: - """ - Returns a URI with all fields set to 0. - @return Returns a URI with all fields set to 0. - """ - return UUri( - authority_name="*", - ue_id=0xFFFF, - ue_version_major=0xFF, - resource_id=0xFFFF, - ) diff --git a/uprotocol/uri/serializer/uriserializer.py b/uprotocol/uri/serializer/uriserializer.py index 09e47b9..d0d749c 100644 --- a/uprotocol/uri/serializer/uriserializer.py +++ b/uprotocol/uri/serializer/uriserializer.py @@ -92,6 +92,7 @@ def _build_uri(is_local, uri_parts, number_of_parts_in_uri): if uri_parts[2].strip() == "": return UUri() auth_name, ue_id, ue_version, ur_id = UriSerializer._build_remote_uri(uri_parts, number_of_parts_in_uri) + return UUri( authority_name=auth_name, ue_id=ue_id, @@ -119,6 +120,7 @@ def deserialize(uri: Optional[str]) -> UUri: try: new_uri = UriSerializer._build_uri(is_local, uri_parts, number_of_parts_in_uri) + except ValueError: return UUri() diff --git a/uprotocol/uuid/factory/uuidfactory.py b/uprotocol/uuid/factory/uuidfactory.py index bac99bf..215d26f 100644 --- a/uprotocol/uuid/factory/uuidfactory.py +++ b/uprotocol/uuid/factory/uuidfactory.py @@ -13,7 +13,7 @@ """ import random -from datetime import datetime +from datetime import datetime, timezone from uprotocol.uuid.factory import uuid6 from uprotocol.uuid.factory.uuidutils import UUIDUtils @@ -37,24 +37,23 @@ def _create(self, instant) -> UUID: return UUID(msb=msb, lsb=lsb) -class UUIDv8Factory(UUIDFactory): - MAX_COUNT = 0xFFF - _lsb = (random.getrandbits(63) & 0x3FFFFFFFFFFFFFFF) | 0x8000000000000000 - UUIDV8_VERSION = 8 - _msb = UUIDV8_VERSION << 12 - +class UUIDv7Factory(UUIDFactory): def _create(self, instant) -> UUID: - time = int(instant.timestamp() * 1000) if instant else int(datetime.now().timestamp() * 1000) + if instant is None: + instant = datetime.now(timezone.utc) + time = int(instant.timestamp() * 1000) # milliseconds since epoch - if time == (self._msb >> 16): - if (self._msb & 0xFFF) < self.MAX_COUNT: - self._msb += 1 - else: - self._msb = (time << 16) | (8 << 12) + rand_a = random.getrandbits(12) # 12 bits for random part + rand_b = random.getrandbits(62) # 62 bits for random part - return UUID(msb=self._msb, lsb=self._lsb) + # Construct the MSB (most significant bits) + msb = (time << 16) | (7 << 12) | rand_a # version 7 in the 12th bit + + # Construct the LSB (least significant bits) + lsb = rand_b | (1 << 63) # set the variant to '1' + return UUID(msb=msb, lsb=lsb) class Factories: UUIDV6 = UUIDv6Factory() - UPROTOCOL = UUIDv8Factory() + UPROTOCOL = UUIDv7Factory() diff --git a/uprotocol/uuid/factory/uuidutils.py b/uprotocol/uuid/factory/uuidutils.py index 729282e..bc59788 100644 --- a/uprotocol/uuid/factory/uuidutils.py +++ b/uprotocol/uuid/factory/uuidutils.py @@ -29,7 +29,7 @@ class Version(Enum): VERSION_UNKNOWN = 0 VERSION_RANDOM_BASED = 4 VERSION_TIME_ORDERED = 6 - VERSION_UPROTOCOL = 8 + VERSION_UPROTOCOL = 7 @staticmethod def get_version(value: int): @@ -80,7 +80,7 @@ def get_variant(uuid_obj: UUID) -> Optional[str]: @staticmethod def is_uprotocol(uuid_obj: UUID) -> bool: """ - Verify if version is a formal UUIDv8 uProtocol ID.

+ Verify if version is a formal UUIDv7 uProtocol ID.

@param uuid_obj:UUID object @return:true if is a uProtocol UUID or false if uuid passed is null or the UUID is not uProtocol format. @@ -109,9 +109,9 @@ def is_uuidv6(uuid_obj: UUID) -> bool: @staticmethod def is_uuid(uuid_obj: UUID) -> bool: """ - Verify uuid is either v6 or v8

+ Verify uuid is either v6 or v7

@param uuid_obj: UUID object - @return:true if is UUID version 6 or 8 + @return:true if is UUID version 6 or 7 """ return UUIDUtils.is_uprotocol(uuid_obj) or UUIDUtils.is_uuidv6(uuid_obj) if uuid_obj is not None else False diff --git a/uprotocol/uuid/validator/uuidvalidator.py b/uprotocol/uuid/validator/uuidvalidator.py index c92ef2a..2dad8c0 100644 --- a/uprotocol/uuid/validator/uuidvalidator.py +++ b/uprotocol/uuid/validator/uuidvalidator.py @@ -95,13 +95,13 @@ def validate_variant(self, uuid: UUID) -> ValidationResult: ) -class UUIDv8Validator(UuidValidator): +class UUIDv7Validator(UuidValidator): def validate_version(self, uuid: UUID) -> ValidationResult: version = UUIDUtils.get_version(uuid) return ( ValidationResult.success() if version and version == Version.VERSION_UPROTOCOL - else (ValidationResult.failure("Invalid UUIDv8 Version")) + else (ValidationResult.failure("Invalid UUIDv7 Version")) ) def validate_variant(self, uuid: UUID) -> ValidationResult: @@ -111,7 +111,7 @@ def validate_variant(self, uuid: UUID) -> ValidationResult: class Validators(Enum): UNKNOWN = InvalidValidator() # Use a default validator instance UUIDV6 = UUIDv6Validator() # Use a default validator instance - UPROTOCOL = UUIDv8Validator() # Use a default validator instance + UPROTOCOL = UUIDv7Validator() # Use a default validator instance def validator(self): return self.value