From 505e708cf7790ae9953eba28da170d367f2e5360 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Thu, 14 Dec 2023 14:42:41 -0500 Subject: [PATCH 1/3] =?UTF-8?q?Add=20methods=20for=20CloudEvent=20?= =?UTF-8?q?=E2=86=94=20UMessage=20conversion?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- uprotocol/cloudevent/factory/ucloudevent.py | 131 +++++++++++++++++++- uprotocol/proto/umessage_pb2.py | 29 +++++ 2 files changed, 156 insertions(+), 4 deletions(-) create mode 100644 uprotocol/proto/umessage_pb2.py diff --git a/uprotocol/cloudevent/factory/ucloudevent.py b/uprotocol/cloudevent/factory/ucloudevent.py index c85ff02..29fb840 100644 --- a/uprotocol/cloudevent/factory/ucloudevent.py +++ b/uprotocol/cloudevent/factory/ucloudevent.py @@ -30,10 +30,15 @@ from cloudevents.http import CloudEvent from google.protobuf import any_pb2 from google.protobuf.message import DecodeError + +from uprotocol.cloudevent.factory.cloudeventfactory import CloudEventFactory from uprotocol.proto.ustatus_pb2 import UCode -from uprotocol.proto.uattributes_pb2 import UMessageType +from uprotocol.proto.uattributes_pb2 import UMessageType, UPriority, UAttributes +from uprotocol.uri.serializer.longuriserializer import LongUriSerializer from uprotocol.uuid.factory.uuidutils import UUIDUtils from uprotocol.uuid.serializer.longuuidserializer import LongUuidSerializer +from uprotocol.proto.upayload_pb2 import UPayloadFormat, UPayload +from uprotocol.proto.umessage_pb2 import UMessage class UCloudEvent: @@ -295,7 +300,7 @@ def unpack(ce: CloudEvent, clazz): @return: Returns a {@link Message} payload of the class type that is provided. """ try: - any_obj=UCloudEvent.get_payload(ce) + any_obj = UCloudEvent.get_payload(ce) value = clazz() value.ParseFromString(any_obj.value) return value @@ -349,9 +354,127 @@ def extract_integer_value_from_attributes(attr_name, ce: CloudEvent) -> int: @staticmethod def get_event_type(type): return {UMessageType.UMESSAGE_TYPE_PUBLISH: "pub.v1", UMessageType.UMESSAGE_TYPE_REQUEST: "req.v1", - UMessageType.UMESSAGE_TYPE_RESPONSE: "res.v1"}.get(type, "") + UMessageType.UMESSAGE_TYPE_RESPONSE: "res.v1"}.get(type, "") @staticmethod def get_message_type(ce_type): return {"pub.v1": UMessageType.UMESSAGE_TYPE_PUBLISH, "req.v1": UMessageType.UMESSAGE_TYPE_REQUEST, - "res.v1": UMessageType.UMESSAGE_TYPE_RESPONSE}.get(ce_type, UMessageType.UMESSAGE_TYPE_UNSPECIFIED) + "res.v1": UMessageType.UMESSAGE_TYPE_RESPONSE}.get(ce_type, UMessageType.UMESSAGE_TYPE_UNSPECIFIED) + + @staticmethod + def get_content_type_from_upayload_format(payload_format: UPayloadFormat): + """ + Retrieves the content type string based on the provided UPayloadFormat enumeration.

+ @param payload_format The UPayloadFormat enumeration representing the payload format. + @return The corresponding content type string based on the payload format. + """ + return { + UPayloadFormat.UPAYLOAD_FORMAT_JSON: "application/json", + UPayloadFormat.UPAYLOAD_FORMAT_RAW: "application/octet-stream", + UPayloadFormat.UPAYLOAD_FORMAT_TEXT: "text/plain", + UPayloadFormat.UPAYLOAD_FORMAT_SOMEIP: "application/x-someip", + UPayloadFormat.UPAYLOAD_FORMAT_SOMEIP_TLV: "application/x-someip_tlv", + }.get(payload_format, "") + + @staticmethod + def get_upayload_format_from_content_type(contenttype: str): + """ + Retrieves the payload format enumeration based on the provided content type.

+ @param contenttype The content type string representing the format of the payload. + @return The corresponding UPayloadFormat enumeration based on the content type. + """ + if contenttype is None: + return UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF + + content_type_mapping = { + "application/json": UPayloadFormat.UPAYLOAD_FORMAT_JSON, + "application/octet-stream": UPayloadFormat.UPAYLOAD_FORMAT_RAW, + "text/plain": UPayloadFormat.UPAYLOAD_FORMAT_TEXT, + "application/x-someip": UPayloadFormat.UPAYLOAD_FORMAT_SOMEIP, + "application/x-someip_tlv": UPayloadFormat.UPAYLOAD_FORMAT_SOMEIP_TLV, + } + return content_type_mapping.get(contenttype, UPayloadFormat.UPAYLOAD_FORMAT_PROTOBUF) + + @staticmethod + def fromMessage(message: UMessage) -> CloudEvent: + """ + Get the Cloudevent from the UMessage
+ Note: For now, only the value format of UPayload is supported in the SDK.If the UPayload has a reference, it + needs to be copied to CloudEvent. + @param message The UMessage protobuf containing the data + @return returns the cloud event + """ + attributes = message.attributes + data = bytearray() + json_attributes = {"id": LongUuidSerializer.instance().serialize(attributes.id), + "source": LongUriSerializer.serialize(message.source), + "type": UCloudEvent.get_content_type_from_upayload_format(attributes.type)} + contenttype = UCloudEvent.get_content_type_from_upayload_format(message.payload.format) + if contenttype: + json_attributes['datacontenttype'] = CloudEventFactory.PROTOBUF_CONTENT_TYPE + + # IMPORTANT: Currently, ONLY the VALUE format is supported in the SDK! + if message.payload.HasField('value'): + data = message.payload.value.SerializeToString() + if attributes.HasField('ttl'): + json_attributes['ttl'] = attributes.ttl + if attributes.HasField('priority'): + json_attributes['priority'] = UPriority.Name(attributes.priority) + if attributes.HasField('token'): + json_attributes['token'] = attributes.token + if attributes.HasField('sink'): + json_attributes['sink'] = attributes.sink + if attributes.HasField('commstatus'): + json_attributes['commstatus'] = attributes.commstatus + if attributes.HasField('reqid'): + json_attributes['reqid'] = attributes.reqid + if attributes.HasField('plevel'): + json_attributes['plevel'] = attributes.permission_level + + cloud_event = CloudEvent(json_attributes, data) + return cloud_event + + @staticmethod + def toMessage(event: CloudEvent) -> UMessage: + """ + + Get the UMessage from the cloud event + @param event The CloudEvent containing the data. + @return returns the UMessage + """ + if event is None: + raise ValueError("Cloud Event can't be None") + source = LongUriSerializer().deserialize(UCloudEvent.get_source(event)) + payload = UPayload(format=UCloudEvent.get_upayload_format_from_content_type(UCloudEvent.get_data_content_type(event)), + value=UCloudEvent.get_payload(event).SerializeToString()) + attributes= UAttributes(id=LongUuidSerializer.instance().serialize(UCloudEvent.get_id(event)), type=UCloudEvent.get_message_type(UCloudEvent.get_type(event))) + if UCloudEvent.has_communication_status_problem(event): + attributes.commstatus = UCloudEvent.get_communication_status(event) + priority= UCloudEvent.get_priority(event) + if priority is not None: + attributes.priority=priority + + sink=UCloudEvent.get_sink(event) + if sink is not None: + attributes.sink.CopyFrom(LongUriSerializer().deserialize(sink)) + + reqid = UCloudEvent.get_request_id(event) + if reqid is not None: + attributes.reqid = reqid + + ttl = UCloudEvent.get_ttl(event) + if ttl is not None: + attributes.ttl = ttl + + token = UCloudEvent.get_token(event) + if token is not None: + attributes.token = token + + plevel = UCloudEvent.extract_integer_value_from_attributes("plevel", event) + if plevel is not None: + attributes.permission_level = plevel + + return UMessage(attributes=attributes,payload=payload, source=source) + + + diff --git a/uprotocol/proto/umessage_pb2.py b/uprotocol/proto/umessage_pb2.py new file mode 100644 index 0000000..7d85d97 --- /dev/null +++ b/uprotocol/proto/umessage_pb2.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: umessage.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +import uattributes_pb2 as uattributes__pb2 +import upayload_pb2 as upayload__pb2 +import uri_pb2 as uri__pb2 + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0eumessage.proto\x12\x0cuprotocol.v1\x1a\x11uattributes.proto\x1a\x0eupayload.proto\x1a\turi.proto\"\x86\x01\n\x08UMessage\x12\"\n\x06source\x18\x01 \x01(\x0b\x32\x12.uprotocol.v1.UUri\x12-\n\nattributes\x18\x02 \x01(\x0b\x32\x19.uprotocol.v1.UAttributes\x12\'\n\x07payload\x18\x03 \x01(\x0b\x32\x16.uprotocol.v1.UPayloadB+\n\x18org.eclipse.uprotocol.v1B\rUMessageProtoP\x01\x62\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'umessage_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + _globals['DESCRIPTOR']._options = None + _globals['DESCRIPTOR']._serialized_options = b'\n\030org.eclipse.uprotocol.v1B\rUMessageProtoP\001' + _globals['_UMESSAGE']._serialized_start=79 + _globals['_UMESSAGE']._serialized_end=213 +# @@protoc_insertion_point(module_scope) From 7df81254d5d40f3a68187b6dbfa055408e7558b4 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Fri, 15 Dec 2023 14:57:41 -0500 Subject: [PATCH 2/3] Add few test cases for ucloudevent --- .../test_datamodel/test_ucloudevent.py | 265 ++++++++++++++++++ uprotocol/cloudevent/factory/ucloudevent.py | 54 ++-- uprotocol/proto/umessage_pb2.py | 6 +- 3 files changed, 296 insertions(+), 29 deletions(-) create mode 100644 tests/test_cloudevent/test_datamodel/test_ucloudevent.py diff --git a/tests/test_cloudevent/test_datamodel/test_ucloudevent.py b/tests/test_cloudevent/test_datamodel/test_ucloudevent.py new file mode 100644 index 0000000..d5a4821 --- /dev/null +++ b/tests/test_cloudevent/test_datamodel/test_ucloudevent.py @@ -0,0 +1,265 @@ +import time +import unittest + +from google.protobuf import any_pb2 + +from uprotocol.cloudevent.datamodel.ucloudeventattributes import UCloudEventAttributesBuilder +from uprotocol.cloudevent.factory.ucloudevent import UCloudEvent +# ------------------------------------------------------------------------- + +# Copyright (c) 2023 General Motors GTO LLC +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License") you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# SPDX-FileType: SOURCE +# SPDX-FileCopyrightText: 2023 General Motors GTO LLC +# SPDX-License-Identifier: Apache-2.0 + +# ------------------------------------------------------------------------- + +from uprotocol.proto.uri_pb2 import UUri, UEntity, UResource +from uprotocol.uri.serializer.longuriserializer import LongUriSerializer +from uprotocol.proto.cloudevents_pb2 import CloudEvent +from uprotocol.proto.uattributes_pb2 import UMessageType, UPriority +from uprotocol.cloudevent.factory.cloudeventfactory import CloudEventFactory +from uprotocol.proto.ustatus_pb2 import UCode +from uprotocol.uuid.factory.uuidfactory import Factories +from uprotocol.uuid.serializer.longuuidserializer import LongUuidSerializer + + +def build_uri_for_test(): + uri = UUri(entity=UEntity(name="body.access"), + resource=UResource(name="door", instance="front_left", message="Door")) + return LongUriSerializer().serialize(uri) + + +def build_proto_payload_for_test(): + ce_proto = CloudEvent(spec_version="1.0", source="//VCU.MY_CAR_VIN/body.access//door.front_left#Door", id="hello", + type="example.demo", + proto_data=any_pb2.Any()) + + any_obj = any_pb2.Any() + any_obj.Pack(ce_proto) + return any_obj + + +def build_cloud_event_for_test(): + source = build_uri_for_test() + proto_payload = build_proto_payload_for_test() + # additional attributes + u_cloud_event_attributes = UCloudEventAttributesBuilder().with_hash("somehash").with_priority( + UPriority.UPRIORITY_CS1).with_ttl(3).with_token("someOAuthToken").build() + + # build the cloud event + cloud_event = CloudEventFactory.build_base_cloud_event("testme", source, proto_payload.SerializeToString(), + proto_payload.type_url, u_cloud_event_attributes, + UCloudEvent.get_event_type( + UMessageType.UMESSAGE_TYPE_PUBLISH)) + return cloud_event + + +class TestUCloudEvent(unittest.TestCase): + DATA_CONTENT_TYPE = "application/x-protobuf" + + def test_extract_source_from_cloudevent(self): + cloud_event = build_cloud_event_for_test() + source = UCloudEvent.get_source(cloud_event) + self.assertEquals("/body.access//door.front_left#Door", source) + + def test_extract_sink_from_cloudevent_when_sink_exists(self): + sink = "//bo.cloud/petapp/1/rpc.response" + cloud_event = build_cloud_event_for_test() + cloud_event.__setitem__("sink", sink) + self.assertEquals(sink, UCloudEvent.get_sink(cloud_event)) + + def test_extract_sink_from_cloudevent_when_sink_does_not_exist(self): + cloud_event = build_cloud_event_for_test() + self.assertEquals(None, UCloudEvent.get_sink(cloud_event)) + + def test_extract_requestId_from_cloudevent_when_requestId_exists(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__setitem__("reqid", "somereqid") + self.assertEquals("somereqid", UCloudEvent.get_request_id(cloud_event)) + + def test_extract_requestId_from_cloudevent_when_requestId_does_not_exist(self): + cloud_event = build_cloud_event_for_test() + self.assertEquals(None, UCloudEvent.get_request_id(cloud_event)) + + def test_extract_requestId_from_cloudevent_when_requestId_value_is_null(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__setitem__("reqid", None) + self.assertEquals(None, UCloudEvent.get_request_id(cloud_event)) + + def test_extract_hash_from_cloudevent_when_hash_exists(self): + cloud_event = build_cloud_event_for_test() + self.assertEquals("somehash", UCloudEvent.get_hash(cloud_event)) + + def test_extract_hash_from_cloudevent_when_hash_does_not_exist(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__delitem__("hash") + self.assertEquals(None, UCloudEvent.get_hash(cloud_event)) + + def test_extract_priority_from_cloudevent_when_priority_exists(self): + cloud_event = build_cloud_event_for_test() + self.assertEquals(UPriority.Name(UPriority.UPRIORITY_CS1), UCloudEvent.get_priority(cloud_event)) + + def test_extract_priority_from_cloudevent_when_priority_does_not_exist(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__delitem__("priority") + self.assertEquals(None, UCloudEvent.get_priority(cloud_event)) + + def test_extract_ttl_from_cloudevent_when_ttl_exists(self): + cloud_event = build_cloud_event_for_test() + self.assertEquals(3, UCloudEvent.get_ttl(cloud_event)) + + def test_extract_ttl_from_cloudevent_when_ttl_does_not_exists(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__delitem__("ttl") + self.assertEquals(None, UCloudEvent.get_ttl(cloud_event)) + + def test_extract_token_from_cloudevent_when_token_exists(self): + cloud_event = build_cloud_event_for_test() + self.assertEquals("someOAuthToken", UCloudEvent.get_token(cloud_event)) + + def test_extract_token_from_cloudevent_when_token_does_not_exists(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__delitem__("token") + self.assertEquals(None, UCloudEvent.get_token(cloud_event)) + + def test_cloudevent_has_platform_error_when_platform_error_exists(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__setitem__("commstatus", UCode.ABORTED_VALUE) + self.assertEquals(10, UCloudEvent.get_communication_status(cloud_event)) + + def test_cloudevent_has_platform_error_when_platform_error_does_not_exist(self): + cloud_event = build_cloud_event_for_test() + self.assertEquals(UCode.OK, UCloudEvent.get_communication_status(cloud_event)) + + def test_extract_platform_error_from_cloudevent_when_platform_error_exists_in_wrong_format(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__setitem__("commstatus", "boom") + self.assertEquals(UCode.OK, UCloudEvent.get_communication_status(cloud_event)) + + def test_cloudevent_is_not_expired_cd_when_no_ttl_configured(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__delitem__("ttl") + self.assertFalse(UCloudEvent.is_expired_by_cloud_event_creation_date(cloud_event)) + + def test_cloudevent_is_not_expired_cd_when_ttl_is_zero(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__setitem__("ttl", 0) + self.assertFalse(UCloudEvent.is_expired_by_cloud_event_creation_date(cloud_event)) + + def test_cloudevent_is_not_expired_cd_when_ttl_is_minus_one(self): + cloud_event = build_cloud_event_for_test() + cloud_event.__setitem__("ttl", -1) + self.assertFalse(UCloudEvent.is_expired_by_cloud_event_creation_date(cloud_event)) + + def test_cloudevent_is_expired_when_ttl_1_mili(self): + uuid = Factories.UPROTOCOL.create() + str_uuid = LongUuidSerializer.instance().serialize(uuid) + cloud_event = build_cloud_event_for_test() + cloud_event.__setitem__("ttl", 1) + cloud_event.__setitem__('id', str_uuid) + time.sleep(8) + self.assertTrue(UCloudEvent.is_expired(cloud_event)) + + def test_cloudevent_is_expired_for_invalid_uuid(self): + uuid = Factories.UPROTOCOL.create() + cloud_event = build_cloud_event_for_test() + cloud_event.__setitem__("ttl", 50000) + cloud_event.__setitem__('id', "") + self.assertFalse(UCloudEvent.is_expired(cloud_event)) + + def test_cloudevent_has_a_UUIDV8_id(self): + uuid = Factories.UPROTOCOL.create() + str_uuid = LongUuidSerializer.instance().serialize(uuid) + cloud_event = build_cloud_event_for_test() + cloud_event.__setitem__('id', str_uuid) + self.assertTrue(UCloudEvent.is_cloud_event_id(cloud_event)) + + def test_to_message_with_valid_event(self): + # additional attributes + u_cloud_event_attributes = UCloudEventAttributesBuilder().with_priority( + UPriority.UPRIORITY_CS2).with_ttl(3).build() + cloud_event = CloudEventFactory.publish(build_uri_for_test(), build_proto_payload_for_test(), + u_cloud_event_attributes) + u_message = UCloudEvent.toMessage(cloud_event) + self.assertIsNotNone(u_message) + + def test_from_message_with_valid_message(self): + # additional attributes + u_cloud_event_attributes = UCloudEventAttributesBuilder().with_priority( + UPriority.UPRIORITY_CS2).with_ttl(3).build() + cloud_event = CloudEventFactory.publish(build_uri_for_test(), build_proto_payload_for_test(), + u_cloud_event_attributes) + u_message = UCloudEvent.toMessage(cloud_event) + self.assertIsNotNone(u_message) + cloud_event1 = UCloudEvent.fromMessage(u_message) + self.assertIsNotNone(cloud_event1) + self.assertEquals(cloud_event, cloud_event1) + + def test_to_from_message_from_request_cloudevent(self): + # additional attributes + u_cloud_event_attributes = UCloudEventAttributesBuilder().with_priority( + UPriority.UPRIORITY_CS2).with_ttl(3).with_token("someOAuthToken").build() + + cloud_event = CloudEventFactory.request(build_uri_for_test(), "//bo.cloud/petapp/1/rpc.response", + CloudEventFactory.generate_cloud_event_id(), + build_proto_payload_for_test(), + u_cloud_event_attributes) + result = UCloudEvent.toMessage(cloud_event) + self.assertIsNotNone(result) + self.assertEquals(UCloudEvent.get_ttl(cloud_event), result.attributes.ttl) + self.assertEquals(UCloudEvent.get_token(cloud_event), result.attributes.token) + self.assertEquals(UCloudEvent.get_sink(cloud_event), + LongUriSerializer().serialize(result.attributes.sink)) + self.assertEquals(UCloudEvent.get_payload(cloud_event).SerializeToString(), result.payload.value) + self.assertEquals(UCloudEvent.get_source(cloud_event), + LongUriSerializer().serialize(result.source)) + self.assertEquals(UCloudEvent.get_priority(cloud_event), + UPriority.Name(result.attributes.priority)) + + cloud_event1 = UCloudEvent.fromMessage(result) + self.assertEquals(cloud_event, cloud_event1) + + def test_to_from_message_from_request_cloudevent_without_attributes(self): + # additional attributes + u_cloud_event_attributes = UCloudEventAttributesBuilder().build() + + cloud_event = CloudEventFactory.request(build_uri_for_test(), "//bo.cloud/petapp/1/rpc.response", + CloudEventFactory.generate_cloud_event_id(), + build_proto_payload_for_test(), + u_cloud_event_attributes) + result = UCloudEvent.toMessage(cloud_event) + self.assertIsNotNone(result) + self.assertFalse(result.attributes.HasField('ttl')) + self.assertEquals(UCloudEvent.get_sink(cloud_event), + LongUriSerializer().serialize(result.attributes.sink)) + self.assertEquals(UCloudEvent.get_payload(cloud_event).SerializeToString(), result.payload.value) + self.assertEquals(UCloudEvent.get_source(cloud_event), LongUriSerializer().serialize(result.source)) + self.assertEquals(result.attributes.priority, 0) + + cloud_event1 = UCloudEvent.fromMessage(result) + self.assertEquals(cloud_event.get_data(), cloud_event1.get_data()) + self.assertEquals(UCloudEvent.get_source(cloud_event),UCloudEvent.get_source(cloud_event1)) + self.assertEquals(UCloudEvent.get_sink(cloud_event),UCloudEvent.get_sink(cloud_event1)) + self.assertEquals(UCloudEvent.get_specversion(cloud_event),UCloudEvent.get_specversion(cloud_event1)) + self.assertEquals(UCloudEvent.get_priority(cloud_event),UCloudEvent.get_priority(cloud_event1)) + self.assertEquals(UCloudEvent.get_id(cloud_event),UCloudEvent.get_id(cloud_event1)) + self.assertEquals(UCloudEvent.get_type(cloud_event),UCloudEvent.get_type(cloud_event1)) + self.assertEquals(UCloudEvent.get_request_id(cloud_event),UCloudEvent.get_request_id(cloud_event1)) diff --git a/uprotocol/cloudevent/factory/ucloudevent.py b/uprotocol/cloudevent/factory/ucloudevent.py index 29fb840..a6ca1ce 100644 --- a/uprotocol/cloudevent/factory/ucloudevent.py +++ b/uprotocol/cloudevent/factory/ucloudevent.py @@ -1,5 +1,5 @@ # ------------------------------------------------------------------------- - +import time # Copyright (c) 2023 General Motors GTO LLC # # Licensed to the Apache Software Foundation (ASF) under one @@ -31,7 +31,6 @@ from google.protobuf import any_pb2 from google.protobuf.message import DecodeError -from uprotocol.cloudevent.factory.cloudeventfactory import CloudEventFactory from uprotocol.proto.ustatus_pb2 import UCode from uprotocol.proto.uattributes_pb2 import UMessageType, UPriority, UAttributes from uprotocol.uri.serializer.longuriserializer import LongUriSerializer @@ -39,6 +38,7 @@ from uprotocol.uuid.serializer.longuuidserializer import LongUuidSerializer from uprotocol.proto.upayload_pb2 import UPayloadFormat, UPayload from uprotocol.proto.umessage_pb2 import UMessage +from uprotocol.proto.uuid_pb2 import UUID class UCloudEvent: @@ -178,8 +178,11 @@ def get_communication_status(ce: CloudEvent) -> int: @return: Returns a {@link UCode} value that indicates of a platform communication error while delivering this CloudEvent or UCode.OK_VALUE. """ - comm_status = UCloudEvent.extract_string_value_from_attributes("commstatus", ce) - return int(comm_status) if comm_status is not None else UCode.OK + try: + comm_status = UCloudEvent.extract_string_value_from_attributes("commstatus", ce) + return int(comm_status) if comm_status is not None else UCode.OK + except: + return UCode.OK @staticmethod def has_communication_status_problem(ce: CloudEvent) -> bool: @@ -188,7 +191,7 @@ def has_communication_status_problem(ce: CloudEvent) -> bool: @param ce:CloudEvent to be queried for a platform delivery error. @return:returns true if the provided CloudEvent is marked with having a platform delivery problem. """ - return UCloudEvent.get_communication_status(ce) != 0 + return UCloudEvent.get_communication_status(ce) != UCode.OK @staticmethod def add_communication_status(ce: CloudEvent, communication_status) -> CloudEvent: @@ -252,9 +255,9 @@ def is_expired(ce: CloudEvent) -> bool: try: uuid = LongUuidSerializer.instance().deserialize(cloud_event_id) - if uuid is None: + if uuid is None or uuid == UUID(): return False - delta = datetime.utcnow().timestamp() - UUIDUtils.getTime(uuid) + delta = int(round(time.time() * 1000)) - UUIDUtils.getTime(uuid) except ValueError: # Invalid UUID, handle accordingly delta = 0 @@ -407,28 +410,28 @@ def fromMessage(message: UMessage) -> CloudEvent: attributes = message.attributes data = bytearray() json_attributes = {"id": LongUuidSerializer.instance().serialize(attributes.id), - "source": LongUriSerializer.serialize(message.source), - "type": UCloudEvent.get_content_type_from_upayload_format(attributes.type)} + "source": LongUriSerializer().serialize(message.source), + "type": UCloudEvent.get_event_type(attributes.type)} contenttype = UCloudEvent.get_content_type_from_upayload_format(message.payload.format) if contenttype: - json_attributes['datacontenttype'] = CloudEventFactory.PROTOBUF_CONTENT_TYPE + json_attributes['datacontenttype'] = "application/x-protobuf" # IMPORTANT: Currently, ONLY the VALUE format is supported in the SDK! if message.payload.HasField('value'): - data = message.payload.value.SerializeToString() + data = message.payload.value if attributes.HasField('ttl'): json_attributes['ttl'] = attributes.ttl - if attributes.HasField('priority'): + if attributes.priority > 0: json_attributes['priority'] = UPriority.Name(attributes.priority) if attributes.HasField('token'): json_attributes['token'] = attributes.token if attributes.HasField('sink'): - json_attributes['sink'] = attributes.sink + json_attributes['sink'] = LongUriSerializer().serialize(attributes.sink) if attributes.HasField('commstatus'): json_attributes['commstatus'] = attributes.commstatus if attributes.HasField('reqid'): - json_attributes['reqid'] = attributes.reqid - if attributes.HasField('plevel'): + json_attributes['reqid'] = LongUuidSerializer.instance().serialize(attributes.reqid) + if attributes.HasField('permission_level'): json_attributes['plevel'] = attributes.permission_level cloud_event = CloudEvent(json_attributes, data) @@ -445,22 +448,24 @@ def toMessage(event: CloudEvent) -> UMessage: if event is None: raise ValueError("Cloud Event can't be None") source = LongUriSerializer().deserialize(UCloudEvent.get_source(event)) - payload = UPayload(format=UCloudEvent.get_upayload_format_from_content_type(UCloudEvent.get_data_content_type(event)), - value=UCloudEvent.get_payload(event).SerializeToString()) - attributes= UAttributes(id=LongUuidSerializer.instance().serialize(UCloudEvent.get_id(event)), type=UCloudEvent.get_message_type(UCloudEvent.get_type(event))) + payload = UPayload( + format=UCloudEvent.get_upayload_format_from_content_type(UCloudEvent.get_data_content_type(event)), + value=UCloudEvent.get_payload(event).SerializeToString()) + attributes = UAttributes(id=LongUuidSerializer.instance().deserialize(UCloudEvent.get_id(event)), + type=UCloudEvent.get_message_type(UCloudEvent.get_type(event))) if UCloudEvent.has_communication_status_problem(event): attributes.commstatus = UCloudEvent.get_communication_status(event) - priority= UCloudEvent.get_priority(event) + priority = UCloudEvent.get_priority(event) if priority is not None: - attributes.priority=priority + attributes.priority = priority - sink=UCloudEvent.get_sink(event) + sink = UCloudEvent.get_sink(event) if sink is not None: attributes.sink.CopyFrom(LongUriSerializer().deserialize(sink)) reqid = UCloudEvent.get_request_id(event) if reqid is not None: - attributes.reqid = reqid + attributes.reqid.CopyFrom(LongUuidSerializer().deserialize(reqid)) ttl = UCloudEvent.get_ttl(event) if ttl is not None: @@ -474,7 +479,4 @@ def toMessage(event: CloudEvent) -> UMessage: if plevel is not None: attributes.permission_level = plevel - return UMessage(attributes=attributes,payload=payload, source=source) - - - + return UMessage(attributes=attributes, payload=payload, source=source) diff --git a/uprotocol/proto/umessage_pb2.py b/uprotocol/proto/umessage_pb2.py index 7d85d97..f183abe 100644 --- a/uprotocol/proto/umessage_pb2.py +++ b/uprotocol/proto/umessage_pb2.py @@ -11,9 +11,9 @@ _sym_db = _symbol_database.Default() -import uattributes_pb2 as uattributes__pb2 -import upayload_pb2 as upayload__pb2 -import uri_pb2 as uri__pb2 +import uprotocol.proto.uattributes_pb2 as uattributes__pb2 +import uprotocol.proto.upayload_pb2 as upayload__pb2 +import uprotocol.proto.uri_pb2 as uri__pb2 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0eumessage.proto\x12\x0cuprotocol.v1\x1a\x11uattributes.proto\x1a\x0eupayload.proto\x1a\turi.proto\"\x86\x01\n\x08UMessage\x12\"\n\x06source\x18\x01 \x01(\x0b\x32\x12.uprotocol.v1.UUri\x12-\n\nattributes\x18\x02 \x01(\x0b\x32\x19.uprotocol.v1.UAttributes\x12\'\n\x07payload\x18\x03 \x01(\x0b\x32\x16.uprotocol.v1.UPayloadB+\n\x18org.eclipse.uprotocol.v1B\rUMessageProtoP\x01\x62\x06proto3') From 11d173e910aad47f07a06a91bf1ced987bd6d83c Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Fri, 15 Dec 2023 15:08:33 -0500 Subject: [PATCH 3/3] Minor bug fix --- .../test_datamodel/test_ucloudevent.py | 33 +++++++++++++------ 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/tests/test_cloudevent/test_datamodel/test_ucloudevent.py b/tests/test_cloudevent/test_datamodel/test_ucloudevent.py index d5a4821..29201c5 100644 --- a/tests/test_cloudevent/test_datamodel/test_ucloudevent.py +++ b/tests/test_cloudevent/test_datamodel/test_ucloudevent.py @@ -1,12 +1,5 @@ -import time -import unittest - -from google.protobuf import any_pb2 - -from uprotocol.cloudevent.datamodel.ucloudeventattributes import UCloudEventAttributesBuilder -from uprotocol.cloudevent.factory.ucloudevent import UCloudEvent # ------------------------------------------------------------------------- - +# # Copyright (c) 2023 General Motors GTO LLC # # Licensed to the Apache Software Foundation (ASF) under one @@ -28,7 +21,7 @@ # SPDX-FileType: SOURCE # SPDX-FileCopyrightText: 2023 General Motors GTO LLC # SPDX-License-Identifier: Apache-2.0 - +# # ------------------------------------------------------------------------- from uprotocol.proto.uri_pb2 import UUri, UEntity, UResource @@ -39,7 +32,13 @@ from uprotocol.proto.ustatus_pb2 import UCode from uprotocol.uuid.factory.uuidfactory import Factories from uprotocol.uuid.serializer.longuuidserializer import LongUuidSerializer +import time +import unittest +from google.protobuf import any_pb2 + +from uprotocol.cloudevent.datamodel.ucloudeventattributes import UCloudEventAttributesBuilder +from uprotocol.cloudevent.factory.ucloudevent import UCloudEvent def build_uri_for_test(): uri = UUri(entity=UEntity(name="body.access"), @@ -142,7 +141,7 @@ def test_extract_token_from_cloudevent_when_token_does_not_exists(self): def test_cloudevent_has_platform_error_when_platform_error_exists(self): cloud_event = build_cloud_event_for_test() - cloud_event.__setitem__("commstatus", UCode.ABORTED_VALUE) + cloud_event.__setitem__("commstatus", UCode.ABORTED) self.assertEquals(10, UCloudEvent.get_communication_status(cloud_event)) def test_cloudevent_has_platform_error_when_platform_error_does_not_exist(self): @@ -212,6 +211,12 @@ def test_from_message_with_valid_message(self): cloud_event1 = UCloudEvent.fromMessage(u_message) self.assertIsNotNone(cloud_event1) self.assertEquals(cloud_event, cloud_event1) + self.assertEquals(cloud_event.get_data(), cloud_event1.get_data()) + self.assertEquals(UCloudEvent.get_source(cloud_event), UCloudEvent.get_source(cloud_event1)) + self.assertEquals(UCloudEvent.get_specversion(cloud_event), UCloudEvent.get_specversion(cloud_event1)) + self.assertEquals(UCloudEvent.get_priority(cloud_event), UCloudEvent.get_priority(cloud_event1)) + self.assertEquals(UCloudEvent.get_id(cloud_event), UCloudEvent.get_id(cloud_event1)) + self.assertEquals(UCloudEvent.get_type(cloud_event), UCloudEvent.get_type(cloud_event1)) def test_to_from_message_from_request_cloudevent(self): # additional attributes @@ -236,6 +241,14 @@ def test_to_from_message_from_request_cloudevent(self): cloud_event1 = UCloudEvent.fromMessage(result) self.assertEquals(cloud_event, cloud_event1) + self.assertEquals(cloud_event.get_data(), cloud_event1.get_data()) + self.assertEquals(UCloudEvent.get_source(cloud_event), UCloudEvent.get_source(cloud_event1)) + self.assertEquals(UCloudEvent.get_sink(cloud_event), UCloudEvent.get_sink(cloud_event1)) + self.assertEquals(UCloudEvent.get_specversion(cloud_event), UCloudEvent.get_specversion(cloud_event1)) + self.assertEquals(UCloudEvent.get_priority(cloud_event), UCloudEvent.get_priority(cloud_event1)) + self.assertEquals(UCloudEvent.get_id(cloud_event), UCloudEvent.get_id(cloud_event1)) + self.assertEquals(UCloudEvent.get_type(cloud_event), UCloudEvent.get_type(cloud_event1)) + self.assertEquals(UCloudEvent.get_request_id(cloud_event), UCloudEvent.get_request_id(cloud_event1)) def test_to_from_message_from_request_cloudevent_without_attributes(self): # additional attributes