Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ exclude = [
"dist/",
".venv/",
"__pycache__/",
"uprotocol/cloudevent/cloudevents_pb2.py",
"uprotocol/core/*",
"uprotocol/v1/*",
"uprotocol/uoptions_pb2.py"
Expand Down
65 changes: 29 additions & 36 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

This library implements the https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/languages.adoc[uProtocol Language Specific Library Requirements] for Python defined in https://github.com/eclipse-uprotocol/uprotocol-spec/tree/main[uProtocol Specifications]. The library is organized into packages that are described in <<sdk-packages>> below. Each package contains a README.adoc file that describes the purpose of the package and how to use it.

The module contains the factory methods, serializers, and validators for all data types defined in the specifications, and any data models that either haven't or couldn't be defined in uprotocol-core-api yet (ex. UPayload) This library fits into the big picture of the uProtocol SDK as seen in <<uprotocol-sdk>> below.
The module contains the factory methods, serializers, and validators for all data types defined in the specifications, and any data models that either haven't or couldn't be defined in uprotocol-core-api yet (ex. UPayload) This library fits into the big picture of the uProtocol SDK as seen in the diagram below.

.uProtocol SDK
image:https://raw.githubusercontent.com/eclipse-uprotocol/uprotocol-spec/main/uprotocol_sdk.drawio.svg[#uprotocol-sdk,width=100%,align="center"]
image:https://raw.githubusercontent.com/eclipse-uprotocol/up-spec/main/up_libraries.drawio.svg[#uprotocol-sdk,width=100%,align="center"]


== Getting Started
Expand All @@ -21,7 +21,9 @@ Before proceeding with the setup of this project, ensure that the following prer
----
mvn -version
----
If Maven is properly installed, you should see information about the Maven version and configuration.
If Maven is properly installed, you should see information about the Maven version and configuration. +

NOTE: Ensure you are using Java 17 with your Maven installation before continuing with the next steps. Other versions of Java may not be supported.

=== Importing the Library

Expand All @@ -31,7 +33,7 @@ To set up SDK, follow the steps below:
+
[source]
----
git clone https://github.com/eclipse-uprotocol/uprotocol-python.git
git clone https://github.com/eclipse-uprotocol/up-python.git
----

. Execute the `pull_and_compile_protos.py` script using the following commands:
Expand All @@ -44,7 +46,7 @@ python pull_and_compile_protos.py
This script automates the following tasks:

1. **Cloning and Compilation of Protos:**
Clones the `up-core-api` protos from the specified repository URL, compiles them, and generates Python protofiles in the protos folder.
Clones the `up-core-api` protos from the `up-spec` repository, compiles them, and generates Python protofiles in the protos folder.

. Install up-python
+
Expand All @@ -53,50 +55,30 @@ This script automates the following tasks:
python -m pip install ../
----

*This will install the up-python, making its classes and modules available for import in your python code.*
*This will install up-python, making its classes and modules available for import in your python code.*

=== Using the Library

The Library is broken up into different packages that are described in <<sdk-packages>> below. Each package contains a README.adoc file that describes the purpose of the package and how to use it. Packages are organized into the following directories:

.Package Folders
[#pkg-folders,width=100%,cols="20%,80%",options="header"]
|===

| Folder | Purpose

| `*builder*` or `*factory*`
| Contains factory methods for creating uProtocol data types

| `*serializer*`
| Contains serializers to convert the objects into byte or string form representation of said object

| `*validator*`
| Contains validators to validate the data types and report errors if the objects are missing or incorrect

|===


.SDK Packages
[#sdk-packages,width=100%,cols="20%,80%",options="header"]
|===

| Package | Purpose

| link:uprotocol/uri/README.adoc[`*uuri*`]
| Uniform Resource Identifier (RFC3986), how uProtocol addresses things (devices, software, methods, topics, etc...) on the network
| link:uprotocol/uri/README.adoc[`*uri*`]
| Uniform Resource Identifier (RFC3986), how uProtocol addresses things (devices, software, methods, topics, etc...) on the network.

| link:uprotocol/uuid/README.adoc[`*uuid*`]
| Identifier used to uniquely identify (and timestamp) messages that are sent
| Identifier used to uniquely identify (and timestamp) messages that are sent.

| link:uprotocol/rpc/README.adoc[`*rpc*`]
| Interface to build client and service stubs for uServices. This interface is then implemented by https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/up-l2/rpcclient.adoc[RpcClient interface]
| link:uprotocol/communication/README.adoc[`*communication*`]
| Interface to build entities that use UTransport APIs to communicate with other entities. This is described in further detail on the up-spec page about https://github.com/eclipse-uprotocol/up-spec/tree/main/up-l2[L2 APIs].

| link:uprotocol/transport/README.adoc[`*utransport*`]
| Interface and data model declaration used for bidirectional point-2-point communication between uEs. This interface is then implemented by https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/ulink.adoc[ulink] libraries for a given underlining transport (ex. Binder, MQTT, Zenoh, SOME/IP, DDS, HTTP, etc…​)

| link:uprotocol/cloudevent/README.adoc[`*cloudevent*`]
| Common way to represent uProtocol messages using CloudEvent data model used by some transports (ex. MQTT, HTTP, etc…​)
| link:uprotocol/transport/README.adoc[`*transport*`]
| Interface and data model declaration used for bidirectional point-2-point communication between uEs. This interface is then implemented by client libraries (described https://github.com/eclipse-uprotocol/up-spec/blob/main/up-l1/README.adoc[here]) for a given underlying transport (ex. Binder, MQTT, Zenoh, SOME/IP, DDS, HTTP, etc…​)

|===

Expand All @@ -110,7 +92,18 @@ Clean up by running the command:

=== Running the Tests

Requires coverage to be installed first, that can be done by running `pip install coverage`
- Execute below command from up-python directory

[source]
----
python -m coverage run --source uprotocol/ -m pytest
----
=== Generate coverage report

[source]
----
python -m coverage report
python -m coverage html
----
This generates the HTML report to htmlcov\index.html

then you run:
`python -m coverage run --source tests/ -m unittest discover`
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@ cloudevents = "*"
gitpython = ">=3.1.41"
googleapis-common-protos = ">=1.56.4"
protobuf = "4.24.2"

[tool.poetry.dev-dependencies]
pytest = ">=6.2.5"
pytest-asyncio = ">=0.15.1"
coverage = ">=6.5.0"
pytest-timeout = ">=1.4.2"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
138 changes: 51 additions & 87 deletions tests/test_communication/mock_utransport.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@
SPDX-License-Identifier: Apache-2.0
"""

import asyncio
import threading
from abc import ABC
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List
from typing import List

from uprotocol.communication.upayload import UPayload
from uprotocol.core.usubscription.v3.usubscription_pb2 import (
SubscriptionRequest,
SubscriptionResponse,
SubscriptionStatus,
UnsubscribeResponse,
)
from uprotocol.transport.builder.umessagebuilder import UMessageBuilder
from uprotocol.transport.ulistener import UListener
from uprotocol.transport.utransport import UTransport
from uprotocol.transport.validator.uattributesvalidator import UAttributesValidator
from uprotocol.uri.factory.uri_factory import UriFactory
from uprotocol.uri.serializer.uriserializer import UriSerializer
from uprotocol.uri.validator.urivalidator import UriValidator
from uprotocol.v1.uattributes_pb2 import (
UMessageType,
)
Expand All @@ -36,108 +38,70 @@
from uprotocol.validation.validationresult import ValidationResult


# ToDo Change the implementation of transport APIs to use the URI match pattern and save listeners
# against the source and sink filter tuple.
class MockUTransport(UTransport):
def get_source(self) -> UUri:
return self.source

def __init__(self, source=None):
super().__init__()
self.source = source if source else UUri(authority_name="Neelam", ue_id=4, ue_version_major=1)
self.listeners: Dict[str, List[UListener]] = {}
self.listeners: List[UListener] = []
self.lock = threading.Lock()

def build_response(self, request: UMessage):
payload = UPayload.pack_from_data_and_format(request.payload, request.attributes.payload_format)

return UMessageBuilder.response_for_request(request.attributes).build_from_upayload(payload)

def close(self):
self.listeners.clear()

async def register_listener(self, source_filter: UUri, listener: UListener, sink_filter: UUri = None) -> UStatus:
with self.lock:
if sink_filter is not None: # method uri
topic = UriSerializer().serialize(sink_filter)
else:
topic = UriSerializer().serialize(source_filter)

if topic not in self.listeners:
self.listeners[topic] = []
self.listeners[topic].append(listener)
return UStatus(code=UCode.OK)

async def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus:
with self.lock:
if sink is not None: # method uri
topic = UriSerializer().serialize(sink)
else:
topic = UriSerializer().serialize(source)

if topic in self.listeners and listener in self.listeners[topic]:
self.listeners[topic].remove(listener)
if not self.listeners[topic]: # If the list is empty, remove the key
del self.listeners[topic]
code = UCode.OK
self.executor = ThreadPoolExecutor()

def build_response(self, request: UMessage) -> UMessage:
if request.attributes.sink.ue_id == 0:
if request.attributes.sink.resource_id == 1:
try:
subscription_request = SubscriptionRequest.parse(request.payload)
sub_response = SubscriptionResponse(
topic=subscription_request.topic,
status=SubscriptionStatus(state=SubscriptionStatus.State.SUBSCRIBED),
)
return UMessageBuilder.response_for_request(request.attributes).build_from_upayload(
UPayload.pack(sub_response)
)
except Exception:
return UMessageBuilder.response_for_request(request.attributes).build_from_upayload(
UPayload.pack(UnsubscribeResponse())
)
else:
code = UCode.INVALID_ARGUMENT
result = UStatus(code=code)
return result
return UMessageBuilder.response_for_request(request.attributes).build_from_upayload(
UPayload.pack(UnsubscribeResponse())
)
return UMessageBuilder.response_for_request(request.attributes).build_from_upayload(
UPayload.pack_from_data_and_format(request.payload, request.attributes.payload_format)
)

async def send(self, message: UMessage) -> UStatus:
validator = UAttributesValidator.get_validator(message.attributes)
with self.lock:
if message is None or validator.validate(message.attributes) != ValidationResult.success():
return UStatus(code=UCode.INVALID_ARGUMENT, message="Invalid message attributes")

# Use a ThreadPoolExecutor with max_workers=1
executor = ThreadPoolExecutor(max_workers=1)
if message is None or validator.validate(message.attributes) != ValidationResult.success():
return UStatus(code=UCode.INVALID_ARGUMENT, message="Invalid message attributes")

try:
# Submit _notify_listeners to the executor
future = executor.submit(self._notify_listeners, message)
if message.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST:
response = self.build_response(message)
self._notify_listeners(response)

# Await completion of the Future
await asyncio.wrap_future(future)
return UStatus(code=UCode.OK)

finally:
# Clean up the executor
executor.shutdown()
def _notify_listeners(self, response: UMessage):
for listener in self.listeners:
listener.on_receive(response)

async def register_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus:
self.listeners.append(listener)
return UStatus(code=UCode.OK)

def _notify_listeners(self, umsg):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_PUBLISH:
for key, listeners in self.listeners.items():
uri = UriSerializer().deserialize(key)
if not (UriValidator.is_rpc_method(uri) or UriValidator.is_rpc_response(uri)):
for listener in listeners:
loop.call_soon_threadsafe(listener.on_receive, umsg)
async def unregister_listener(self, source: UUri, listener: UListener, sink: UUri = None) -> UStatus:
if listener in self.listeners:
self.listeners.remove(listener)
return UStatus(code=UCode.OK)
return UStatus(code=UCode.NOT_FOUND)

else:
if umsg.attributes.type == UMessageType.UMESSAGE_TYPE_REQUEST:
serialized_uri = UriSerializer().serialize(umsg.attributes.sink)
if serialized_uri not in self.listeners:
# no listener registered for method uri, send dummy response.
# This case will only come for request type
# as for response type, there will always be response handler as it is in up client
serialized_uri = UriSerializer().serialize(UriFactory.ANY)
umsg = self.build_response(umsg)
else:
# this is for response type,handle response
serialized_uri = UriSerializer().serialize(UriFactory.ANY)

if serialized_uri in self.listeners:
for listener in self.listeners[serialized_uri]:
loop.call_soon_threadsafe(listener.on_receive, umsg)
break # as there will be only one listener for method uri
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
def close(self):
self.listeners.clear()
self.executor.shutdown()


class TimeoutUTransport(MockUTransport, ABC):
Expand Down
11 changes: 11 additions & 0 deletions tests/test_communication/test_inmemoryrpcclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from uprotocol.communication.inmemoryrpcclient import InMemoryRpcClient
from uprotocol.communication.upayload import UPayload
from uprotocol.communication.ustatuserror import UStatusError
from uprotocol.transport.utransport import UTransport
from uprotocol.v1.uattributes_pb2 import UPriority
from uprotocol.v1.ucode_pb2 import UCode
from uprotocol.v1.uri_pb2 import UUri
Expand All @@ -30,6 +31,16 @@ class TestInMemoryRpcClient(unittest.IsolatedAsyncioTestCase):
def create_method_uri():
return UUri(authority_name="neelam", ue_id=10, ue_version_major=1, resource_id=3)

def test_constructor_transport_none(self):
with self.assertRaises(ValueError) as context:
InMemoryRpcClient(None)
self.assertEqual(str(context.exception), UTransport.TRANSPORT_NULL_ERROR)

def test_constructor_transport_not_instance(self):
with self.assertRaises(ValueError) as context:
InMemoryRpcClient("Invalid Transport")
self.assertEqual(str(context.exception), UTransport.TRANSPORT_NOT_INSTANCE_ERROR)

async def test_invoke_method_with_payload(self):
payload = UPayload.pack_to_any(UUri())
rpc_client = InMemoryRpcClient(MockUTransport())
Expand Down
Loading