From 3af1e607e76f99e22d1fa1816662caa2f3f61533 Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Fri, 1 Oct 2021 19:39:00 -0700 Subject: [PATCH 01/12] initial --- .../serializer/avroserializer/aio/__init__.py | 30 +++ .../_schema_registry_avro_serializer_async.py | 173 ++++++++++++++++++ .../async_samples/avro_serializer_async.py | 93 ++++++++++ .../eventhub_receive_integration.py | 69 +++++++ .../eventhub_send_integration.py | 80 ++++++++ .../{ => sync_samples}/avro_serializer.py | 0 .../eventhub_receive_integration.py | 0 .../eventhub_send_integration.py | 0 .../tests/test_avro_serializer_async.py | 88 +++++++++ 9 files changed, 533 insertions(+) create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/__init__.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py rename sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/{ => sync_samples}/avro_serializer.py (100%) rename sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/{ => sync_samples}/eventhub_receive_integration.py (100%) rename sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/{ => sync_samples}/eventhub_send_integration.py (100%) create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/__init__.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/__init__.py new file mode 100644 index 000000000000..14d743e38828 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/__init__.py @@ -0,0 +1,30 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +from ._schema_registry_avro_serializer_async import AvroSerializer + +__all__ = [ + "AvroSerializer" +] diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py new file mode 100644 index 000000000000..48905626a6ca --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py @@ -0,0 +1,173 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +try: + from functools import lru_cache +except ImportError: + from backports.functools_lru_cache import lru_cache +from io import BytesIO +from typing import Any, Dict, Mapping +import avro + +from .._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX +from .._avro_serializer import AvroObjectSerializer + + +class AvroSerializer(object): + """ + AvroSerializer provides the ability to serialize and deserialize data according + to the given avro schema. It would automatically register, get and cache the schema. + + :keyword client: Required. The schema registry client + which is used to register schema and retrieve schema from the service. + :paramtype client: ~azure.schemaregistry.SchemaRegistryClient + :keyword str group_name: Required. Schema group under which schema should be registered. + :keyword bool auto_register_schemas: When true, register new schemas passed to serialize. + Otherwise, and by default, fail if it has not been pre-registered in the registry. + + """ + + def __init__(self, **kwargs): + # type: (Any) -> None + try: + self._schema_group = kwargs.pop("group_name") + self._schema_registry_client = kwargs.pop("client") # type: "SchemaRegistryClient" + except KeyError as e: + raise TypeError("'{}' is a required keyword.".format(e.args[0])) + self._avro_serializer = AvroObjectSerializer(codec=kwargs.get("codec")) + self._auto_register_schemas = kwargs.get("auto_register_schemas", False) + self._auto_register_schema_func = ( + self._schema_registry_client.register_schema + if self._auto_register_schemas + else self._schema_registry_client.get_schema_id + ) + + async def __aenter__(self): + # type: () -> SchemaRegistryAvroSerializer + await self._schema_registry_client.__aenter__() + return self + + async def __aexit__(self, *exc_details): + # type: (Any) -> None + await self._schema_registry_client.__exit__(*exc_details) + + async def close(self): + # type: () -> None + """This method is to close the sockets opened by the client. + It need not be used when using with a context manager. + """ + await self._schema_registry_client.close() + + #@lru_cache(maxsize=128) + async def _get_schema_id(self, schema_name, schema_str, **kwargs): + # type: (str, str, Any) -> str + """ + Get schema id from local cache with the given schema. + If there is no item in the local cache, get schema id from the service and cache it. + + :param schema_name: Name of the schema + :type schema_name: str + :param schema: Schema object + :type schema: avro.schema.Schema + :return: Schema Id + :rtype: str + """ + schema_properties = await self._auto_register_schema_func( + self._schema_group, schema_name, "Avro", schema_str, **kwargs + ) + return schema_properties.schema_id + + #@lru_cache(maxsize=128) + async def _get_schema(self, schema_id, **kwargs): + # type: (str, Any) -> str + """ + Get schema content from local cache with the given schema id. + If there is no item in the local cache, get schema from the service and cache it. + + :param str schema_id: Schema id + :return: Schema content + """ + schema = await self._schema_registry_client.get_schema( + schema_id, **kwargs + ) + return schema.schema_content + + @classmethod + @lru_cache(maxsize=128) + def _parse_schema(cls, schema): + return avro.schema.parse(schema) + + async def serialize(self, value, **kwargs): + # type: (Mapping[str, Any], Any) -> bytes + """ + Encode data with the given schema. The returns bytes are consisted of: The first 4 bytes + denoting record format identifier. The following 32 bytes denoting schema id returned by schema registry + service. The remaining bytes are the real data payload. + + :param value: The data to be encoded. + :type value: Mapping[str, Any] + :keyword schema: Required. The schema used to encode the data. + :paramtype schema: str + :rtype: bytes + """ + try: + raw_input_schema = kwargs.pop("schema") + except KeyError as e: + raise TypeError("'{}' is a required keyword.".format(e.args[0])) + + cached_schema = AvroSerializer._parse_schema(raw_input_schema) + record_format_identifier = b"\0\0\0\0" + schema_id = await self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs) + data_bytes = self._avro_serializer.serialize(value, cached_schema) + + stream = BytesIO() + + stream.write(record_format_identifier) + stream.write(schema_id.encode("utf-8")) + stream.write(data_bytes) + stream.flush() + + payload = stream.getvalue() + stream.close() + return payload + + async def deserialize(self, value, **kwargs): + # type: (bytes, Any) -> Dict[str, Any] + """ + Decode bytes data. + + :param bytes value: The bytes data needs to be decoded. + :rtype: Dict[str, Any] + """ + # record_format_identifier = data[0:4] # The first 4 bytes are retained for future record format identifier. + schema_id = value[ + SCHEMA_ID_START_INDEX : (SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH) + ].decode("utf-8") + schema_content = await self._get_schema(schema_id, **kwargs) + + dict_value = self._avro_serializer.deserialize( + value[DATA_START_INDEX:], schema_content + ) + return dict_value diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py new file mode 100644 index 000000000000..555112c0bc9c --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py @@ -0,0 +1,93 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +import os +import asyncio + +from azure.identity.aio import ClientSecretCredential +from azure.schemaregistry.aio import SchemaRegistryClient +from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer + +TENANT_ID=os.environ['AZURE_TENANT_ID'] +CLIENT_ID=os.environ['AZURE_CLIENT_ID'] +CLIENT_SECRET=os.environ['AZURE_CLIENT_SECRET'] + +SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE=os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE'] +GROUP_NAME=os.environ['SCHEMAREGISTRY_GROUP'] +SCHEMA_STRING = """ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] +}""" + + +token_credential = ClientSecretCredential( + tenant_id=TENANT_ID, + client_id=CLIENT_ID, + client_secret=CLIENT_SECRET +) + + +async def serialize(serializer): + dict_data_ben = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} + dict_data_alice = {"name": u"Alice", "favorite_number": 15, "favorite_color": u"green"} + + # Schema would be automatically registered into Schema Registry and cached locally. + payload_ben = await serializer.serialize(dict_data_ben, schema=SCHEMA_STRING) + # The second call won't trigger a service call. + payload_alice = await serializer.serialize(dict_data_alice, schema=SCHEMA_STRING) + + print('Encoded bytes are: ', payload_ben) + print('Encoded bytes are: ', payload_alice) + return [payload_ben, payload_alice] + + +async def deserialize(serializer, bytes_payload): + # serializer.deserialize would extract the schema id from the payload, + # retrieve schema from Schema Registry and cache the schema locally. + # If the schema id is the local cache, the call won't trigger a service call. + dict_data = await serializer.deserialize(bytes_payload) + + print('Deserialized data is: ', dict_data) + return dict_data + + +async def main(): + schema_registry = SchemaRegistryClient(endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential) + serializer = AvroSerializer(client=schema_registry, group_name=GROUP_NAME, auto_register_schemas=True) + async with schema_registry: + bytes_data_ben, bytes_data_alice = await serialize(serializer) + dict_data_ben = await deserialize(serializer, bytes_data_ben) + dict_data_alice = await deserialize(serializer, bytes_data_alice) + await schema_registry.close() + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py new file mode 100644 index 000000000000..9521ce4a4228 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Examples to show receiving events from EventHub with AvroSerializer integrated for data deserialization. +""" + +# pylint: disable=C0111 +import os +import asyncio +from azure.eventhub.aio import EventHubConsumerClient +from azure.identity.aio import DefaultAzureCredential +from azure.schemaregistry.aio import SchemaRegistryClient +from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer + +EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] +EVENTHUB_NAME = os.environ['EVENT_HUB_NAME'] + +SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE'] +GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP'] + + +# create an EventHubConsumerClient instance +eventhub_consumer = EventHubConsumerClient.from_connection_string( + conn_str=EVENTHUB_CONNECTION_STR, + consumer_group='$Default', + eventhub_name=EVENTHUB_NAME, +) +# create a AvroSerializer instance +# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace' +avro_serializer = AvroSerializer( + client=SchemaRegistryClient( + endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, + credential=DefaultAzureCredential() + ), + group_name=GROUP_NAME, + auto_register_schemas=True +) + +async def on_event(partition_context, event): + print("Received event from partition: {}.".format(partition_context.partition_id)) + + bytes_payload = b"".join(b for b in event.body) + print('The received bytes of the EventData is {}.'.format(bytes_payload)) + + # Use the deserialize method to convert bytes to dict object. + # The deserialize method would extract the schema id from the payload, and automatically retrieve the Avro Schema + # from the Schema Registry Service. The schema would be cached locally for future usage. + deserialized_data = await avro_serializer.deserialize(bytes_payload) + print('The dict data after deserialization is {}'.format(deserialized_data)) + + +async def main(): + try: + async with eventhub_consumer, avro_serializer: + await eventhub_consumer.receive( + on_event=on_event, + starting_position="-1", # "-1" is from the beginning of the partition. + ) + except KeyboardInterrupt: + print('Stopped receiving.') + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py new file mode 100644 index 000000000000..220ad5dca3e9 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Examples to show sending event to EventHub with AvroSerializer integrated for data serialization. +""" + +# pylint: disable=C0111 + +import os +import asyncio +from azure.eventhub import EventData +from azure.eventhub.aio import EventHubProducerClient +from azure.identity.aio import DefaultAzureCredential +from azure.schemaregistry.aio import SchemaRegistryClient +from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer + +EVENTHUB_CONNECTION_STR = os.environ['EVENT_HUB_CONN_STR'] +EVENTHUB_NAME = os.environ['EVENT_HUB_NAME'] + +SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE = os.environ['SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE'] +GROUP_NAME = os.environ['SCHEMAREGISTRY_GROUP'] + +SCHEMA_STRING = """ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]} + ] +}""" + +# create an EventHubProducerClient instance +eventhub_producer = EventHubProducerClient.from_connection_string( + conn_str=EVENTHUB_CONNECTION_STR, + eventhub_name=EVENTHUB_NAME +) + +# create a AvroSerializer instance +# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace' +avro_serializer = AvroSerializer( + client=SchemaRegistryClient( + endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, + credential=DefaultAzureCredential() + ), + group_name=GROUP_NAME, + auto_register_schemas=True +) + +async def send_event_data_batch(producer, serializer): + event_data_batch = await producer.create_batch() + + dict_data = {"name": "Bob", "favorite_number": 7, "favorite_color": "red"} + # Use the serialize method to convert dict object to bytes with the given avro schema. + # The serialize method would automatically register the schema into the Schema Registry Service and + # schema would be cached locally for future usage. + payload_bytes = await serializer.serialize(value=dict_data, schema=SCHEMA_STRING) + print('The bytes of serialized dict data is {}.'.format(payload_bytes)) + + event_data = EventData(body=payload_bytes) # pass the bytes data to the body of an EventData + event_data_batch.add(event_data) + await producer.send_batch(event_data_batch) + print('Send is done.') + + +async def main(): + + async with eventhub_producer: + await send_event_data_batch(eventhub_producer, avro_serializer) + await avro_serializer.close() + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/avro_serializer.py similarity index 100% rename from sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/avro_serializer.py rename to sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/avro_serializer.py diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_receive_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_receive_integration.py similarity index 100% rename from sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_receive_integration.py rename to sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_receive_integration.py diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_send_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_send_integration.py similarity index 100% rename from sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_send_integration.py rename to sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_send_integration.py diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py new file mode 100644 index 000000000000..350ef13e4842 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py @@ -0,0 +1,88 @@ +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the ""Software""), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. +# +# -------------------------------------------------------------------------- +import functools +import pytest +import uuid +import avro +import avro.io +from io import BytesIO +import asyncio + +from azure.schemaregistry.aio import SchemaRegistryClient +from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer +from azure.schemaregistry.serializer.avroserializer._avro_serializer import AvroObjectSerializer +from azure.identity.aio import ClientSecretCredential +from azure.core.exceptions import ClientAuthenticationError, ServiceRequestError, HttpResponseError + +from devtools_testutils import AzureTestCase, PowerShellPreparer + +SchemaRegistryPowerShellPreparer = functools.partial(PowerShellPreparer, "schemaregistry", schemaregistry_fully_qualified_namespace="fake_resource.servicebus.windows.net/", schemaregistry_group="fakegroup") + +class AvroSerializerAsyncTests(AzureTestCase): + + @pytest.mark.asyncio + @SchemaRegistryPowerShellPreparer() + async def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistry_fully_qualified_namespace, schemaregistry_group, **kwargs): + # TODO: AFTER RELEASING azure-schemaregistry=1.0.0b3, UPDATE 'endpoint' to 'fully_qualified_namespace' + credential = self.get_credential(SchemaRegistryClient, is_async=True) + sr_client = self.create_client_from_credential(SchemaRegistryClient, credential, endpoint=schemaregistry_fully_qualified_namespace, is_async=True) + sr_avro_serializer = AvroSerializer(client=sr_client, group_name=schemaregistry_group, auto_register_schemas=True) + + schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}""" + schema = avro.schema.parse(schema_str) + + dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} + encoded_data = await sr_avro_serializer.serialize(dict_data, schema=schema_str) + + assert encoded_data[0:4] == b'\0\0\0\0' + schema_id = await sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id + assert encoded_data[4:36] == schema_id.encode("utf-8") + + decoded_data = await sr_avro_serializer.deserialize(encoded_data) + assert decoded_data["name"] == u"Ben" + assert decoded_data["favorite_number"] == 7 + assert decoded_data["favorite_color"] == u"red" + + await sr_avro_serializer.close() + + @pytest.mark.asyncio + @SchemaRegistryPowerShellPreparer() + async def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregistry_fully_qualified_namespace, schemaregistry_group, **kwargs): + # TODO: AFTER RELEASING azure-schemaregistry=1.0.0b3, UPDATE 'endpoint' to 'fully_qualified_namespace' + credential = self.get_credential(SchemaRegistryClient, is_async=True) + sr_client = self.create_client_from_credential(SchemaRegistryClient, credential, endpoint=schemaregistry_fully_qualified_namespace, is_async=True) + sr_avro_serializer = AvroSerializer(client=sr_client, group_name=schemaregistry_group) + + schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}""" + schema = avro.schema.parse(schema_str) + + dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} + encoded_data = await sr_avro_serializer.serialize(dict_data, schema=schema_str) + + assert encoded_data[0:4] == b'\0\0\0\0' + schema_id = await sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id + assert encoded_data[4:36] == schema_id.encode("utf-8") + + decoded_data = await sr_avro_serializer.deserialize(encoded_data) + assert decoded_data["name"] == u"Ben" + assert decoded_data["favorite_number"] == 7 + assert decoded_data["favorite_color"] == u"red" + + await sr_avro_serializer.close() From 07e38044734d3173658e15e2223959f714f11b6d Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Fri, 1 Oct 2021 19:40:18 -0700 Subject: [PATCH 02/12] changelog --- .../azure-schemaregistry-avroserializer/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md b/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md index 40839e33cad9..7bd78d682ab0 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md @@ -6,6 +6,7 @@ - `auto_register_schemas` keyword argument has been added to `AvroSerializer`, which will allow for automatically registering schemas passed in to the `serialize`. - `value` parameter in `serialize` on `AvroSerializer` takes type `Mapping` rather than `Dict`. +- Async version of `SchemaRegistryAvroSerializer` has been added under `azure.schemaregistry.serializer.avroserializer.aio`. ### Breaking Changes From 6728ac2b08edf7348a5c9b730f7057aa097e1a4a Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Mon, 4 Oct 2021 11:42:44 -0700 Subject: [PATCH 03/12] fix failing tests --- .../samples/README.md | 6 +- .../async_samples/avro_serializer_async.py | 9 +-- .../eventhub_send_integration.py | 4 +- ...serializer_with_auto_register_schemas.yaml | 8 +-- ...ializer_without_auto_register_schemas.yaml | 8 +-- ...serializer_with_auto_register_schemas.yaml | 39 +++++++++++ ...ializer_without_auto_register_schemas.yaml | 39 +++++++++++ .../tests/test_avro_serializer_async.py | 68 +++++++++---------- 8 files changed, 130 insertions(+), 51 deletions(-) create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md index efb9eca6a478..817d74dfb2ff 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md @@ -50,7 +50,7 @@ Check out the [API reference documentation][api_reference] to learn more about what you can do with the Azure Schema Registry Avro Serializer library. -[avro_serializer_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/avro_serializer.py -[eventhub_send_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_send_integration.py -[eventhub_receive_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/eventhub_receive_integration.py +[avro_serializer_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/avro_serializer.py +[eventhub_send_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_send_integration.py +[eventhub_receive_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_receive_integration.py [api_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-schemaregistry-avroserializer/latest/index.html diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py index 555112c0bc9c..825b0002a6c4 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py @@ -82,11 +82,12 @@ async def deserialize(serializer, bytes_payload): async def main(): schema_registry = SchemaRegistryClient(endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential) serializer = AvroSerializer(client=schema_registry, group_name=GROUP_NAME, auto_register_schemas=True) - async with schema_registry: - bytes_data_ben, bytes_data_alice = await serialize(serializer) - dict_data_ben = await deserialize(serializer, bytes_data_ben) - dict_data_alice = await deserialize(serializer, bytes_data_alice) + bytes_data_ben, bytes_data_alice = await serialize(serializer) + dict_data_ben = await deserialize(serializer, bytes_data_ben) + dict_data_alice = await deserialize(serializer, bytes_data_alice) await schema_registry.close() + await serializer.close() + await token_credential.close() if __name__ == "__main__": loop = asyncio.get_event_loop() diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py index 220ad5dca3e9..06fc3bdb2fac 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py @@ -71,9 +71,9 @@ async def send_event_data_batch(producer, serializer): async def main(): - async with eventhub_producer: - await send_event_data_batch(eventhub_producer, avro_serializer) + await send_event_data_batch(eventhub_producer, avro_serializer) await avro_serializer.close() + await eventhub_producer.close() if __name__ == "__main__": loop = asyncio.get_event_loop() diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml index 0e70a2f2a6b4..488f4b45b36f 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -23,12 +23,12 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04 response: body: - string: '{"id":"7b4eff1c25d9438a975ff7a3d985a5c6"}' + string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' headers: content-type: - application/json date: - - Fri, 01 Oct 2021 22:19:06 GMT + - Mon, 04 Oct 2021 18:41:59 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04 server: @@ -38,9 +38,9 @@ interactions: transfer-encoding: - chunked x-schema-id: - - 7b4eff1c25d9438a975ff7a3d985a5c6 + - ad9a2f239f2b40e5b058f494bea52bca x-schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/7b4eff1c25d9438a975ff7a3d985a5c6?api-version=2017-04 + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2017-04 x-schema-type: - Avro x-schema-version: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml index ad11417efc03..a0bce1aef222 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -23,12 +23,12 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04 response: body: - string: '{"id":"7b4eff1c25d9438a975ff7a3d985a5c6"}' + string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' headers: content-type: - application/json date: - - Fri, 01 Oct 2021 22:19:07 GMT + - Mon, 04 Oct 2021 18:42:00 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04 server: @@ -38,9 +38,9 @@ interactions: transfer-encoding: - chunked x-schema-id: - - 7b4eff1c25d9438a975ff7a3d985a5c6 + - ad9a2f239f2b40e5b058f494bea52bca x-schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/7b4eff1c25d9438a975ff7a3d985a5c6?api-version=2017-04 + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2017-04 x-schema-type: - Avro x-schema-version: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml new file mode 100644 index 000000000000..807991aaa029 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -0,0 +1,39 @@ +interactions: +- request: + body: '"{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"example.avro\", + \"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\", + \"null\"], \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"], + \"name\": \"favorite_color\"}]}"' + headers: + Accept: + - application/json + Content-Length: + - '265' + Content-Type: + - application/json + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b1 Python/3.9.0 (Windows-10-10.0.19041-SP0) + X-Schema-Type: + - Avro + method: PUT + uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04 + response: + body: + string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' + headers: + content-type: application/json + date: Mon, 04 Oct 2021 18:42:01 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04 + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + x-schema-id: ad9a2f239f2b40e5b058f494bea52bca + x-schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2017-04 + x-schema-type: Avro + x-schema-version: '1' + x-schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2017-04 + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/swathip-test-schema/schemas/example.avro.User?api-version=2017-04 +version: 1 diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml new file mode 100644 index 000000000000..00e8dbb14dc5 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -0,0 +1,39 @@ +interactions: +- request: + body: '"{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"example.avro\", + \"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\", + \"null\"], \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"], + \"name\": \"favorite_color\"}]}"' + headers: + Accept: + - application/json + Content-Length: + - '265' + Content-Type: + - application/json + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b1 Python/3.9.0 (Windows-10-10.0.19041-SP0) + X-Schema-Type: + - Avro + method: PUT + uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04 + response: + body: + string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' + headers: + content-type: application/json + date: Mon, 04 Oct 2021 18:42:02 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04 + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + x-schema-id: ad9a2f239f2b40e5b058f494bea52bca + x-schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2017-04 + x-schema-type: Avro + x-schema-version: '1' + x-schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2017-04 + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/swathip-test-schema/schemas/example.avro.User?api-version=2017-04 +version: 1 diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py index 350ef13e4842..fd8294787f7a 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py @@ -24,10 +24,10 @@ import avro.io from io import BytesIO import asyncio +import pytest from azure.schemaregistry.aio import SchemaRegistryClient from azure.schemaregistry.serializer.avroserializer.aio import AvroSerializer -from azure.schemaregistry.serializer.avroserializer._avro_serializer import AvroObjectSerializer from azure.identity.aio import ClientSecretCredential from azure.core.exceptions import ClientAuthenticationError, ServiceRequestError, HttpResponseError @@ -37,52 +37,52 @@ class AvroSerializerAsyncTests(AzureTestCase): + def create_client(self, fully_qualified_namespace): + credential = self.get_credential(SchemaRegistryClient, is_async=True) + return self.create_client_from_credential(SchemaRegistryClient, credential, endpoint=fully_qualified_namespace, is_async=True) + @pytest.mark.asyncio @SchemaRegistryPowerShellPreparer() async def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemaregistry_fully_qualified_namespace, schemaregistry_group, **kwargs): - # TODO: AFTER RELEASING azure-schemaregistry=1.0.0b3, UPDATE 'endpoint' to 'fully_qualified_namespace' - credential = self.get_credential(SchemaRegistryClient, is_async=True) - sr_client = self.create_client_from_credential(SchemaRegistryClient, credential, endpoint=schemaregistry_fully_qualified_namespace, is_async=True) + sr_client = self.create_client(schemaregistry_fully_qualified_namespace) sr_avro_serializer = AvroSerializer(client=sr_client, group_name=schemaregistry_group, auto_register_schemas=True) - schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}""" - schema = avro.schema.parse(schema_str) + async with sr_client: + schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}""" + schema = avro.schema.parse(schema_str) - dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} - encoded_data = await sr_avro_serializer.serialize(dict_data, schema=schema_str) + dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} + encoded_data = await sr_avro_serializer.serialize(dict_data, schema=schema_str) - assert encoded_data[0:4] == b'\0\0\0\0' - schema_id = await sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id - assert encoded_data[4:36] == schema_id.encode("utf-8") + assert encoded_data[0:4] == b'\0\0\0\0' + schema_properties = await sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)) + schema_id = schema_properties.schema_id + assert encoded_data[4:36] == schema_id.encode("utf-8") - decoded_data = await sr_avro_serializer.deserialize(encoded_data) - assert decoded_data["name"] == u"Ben" - assert decoded_data["favorite_number"] == 7 - assert decoded_data["favorite_color"] == u"red" - - await sr_avro_serializer.close() + decoded_data = await sr_avro_serializer.deserialize(encoded_data) + assert decoded_data["name"] == u"Ben" + assert decoded_data["favorite_number"] == 7 + assert decoded_data["favorite_color"] == u"red" @pytest.mark.asyncio @SchemaRegistryPowerShellPreparer() async def test_basic_sr_avro_serializer_without_auto_register_schemas(self, schemaregistry_fully_qualified_namespace, schemaregistry_group, **kwargs): - # TODO: AFTER RELEASING azure-schemaregistry=1.0.0b3, UPDATE 'endpoint' to 'fully_qualified_namespace' - credential = self.get_credential(SchemaRegistryClient, is_async=True) - sr_client = self.create_client_from_credential(SchemaRegistryClient, credential, endpoint=schemaregistry_fully_qualified_namespace, is_async=True) - sr_avro_serializer = AvroSerializer(client=sr_client, group_name=schemaregistry_group) - - schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}""" - schema = avro.schema.parse(schema_str) + sr_client = self.create_client(schemaregistry_fully_qualified_namespace) + sr_avro_serializer = AvroSerializer(client=sr_client, group_name=schemaregistry_group, auto_register_schemas=True) - dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} - encoded_data = await sr_avro_serializer.serialize(dict_data, schema=schema_str) + async with sr_client: + schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}""" + schema = avro.schema.parse(schema_str) - assert encoded_data[0:4] == b'\0\0\0\0' - schema_id = await sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)).schema_id - assert encoded_data[4:36] == schema_id.encode("utf-8") + dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} + encoded_data = await sr_avro_serializer.serialize(dict_data, schema=schema_str) - decoded_data = await sr_avro_serializer.deserialize(encoded_data) - assert decoded_data["name"] == u"Ben" - assert decoded_data["favorite_number"] == 7 - assert decoded_data["favorite_color"] == u"red" + assert encoded_data[0:4] == b'\0\0\0\0' + schema_properties = await sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)) + schema_id = schema_properties.schema_id + assert encoded_data[4:36] == schema_id.encode("utf-8") - await sr_avro_serializer.close() + decoded_data = await sr_avro_serializer.deserialize(encoded_data) + assert decoded_data["name"] == u"Ben" + assert decoded_data["favorite_number"] == 7 + assert decoded_data["favorite_color"] == u"red" From 735a68180fb81406835212df45d750f0be80698d Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Thu, 14 Oct 2021 15:28:35 -0700 Subject: [PATCH 04/12] async lru --- .../avroserializer/aio/_async_lru.py | 223 ++++++++++++++++++ .../_schema_registry_avro_serializer_async.py | 5 +- .../dev_requirements.txt | 3 +- ...serializer_with_auto_register_schemas.yaml | 8 +- ...ializer_without_auto_register_schemas.yaml | 8 +- ...serializer_with_auto_register_schemas.yaml | 8 +- ...ializer_without_auto_register_schemas.yaml | 8 +- 7 files changed, 244 insertions(+), 19 deletions(-) create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py new file mode 100644 index 000000000000..946ebcf71db7 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py @@ -0,0 +1,223 @@ +# -------------------------------------------------------------------------- +# The MIT License +# +# Copyright (c) 2018 aio-libs team https://github.com/aio-libs/ +# Copyright (c) 2017 Ocean S. A. https://ocean.io/ +# Copyright (c) 2016-2017 WikiBusiness Corporation http://wikibusiness.org/ +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. +# -------------------------------------------------------------------------- +# Copying over `aio-libs/async_lru` for the following reasons: +# 1. There has not been an official release of `async_lru` in 2 years. +# 2. The last update to the library was a year ago, so it seems the library is +# not being actively maintained. + +import asyncio +from collections import OrderedDict +from functools import _CacheInfo, _make_key, partial, wraps + + +__version__ = "1.0.2" + +__all__ = ("alru_cache",) + + +def unpartial(fn): + while hasattr(fn, "func"): + fn = fn.func + + return fn + + +def _done_callback(fut, task): + if task.cancelled(): + fut.cancel() + return + + exc = task.exception() + if exc is not None: + fut.set_exception(exc) + return + + fut.set_result(task.result()) + + +def _cache_invalidate(wrapped, typed, *args, **kwargs): + key = _make_key(args, kwargs, typed) + + exists = key in wrapped._cache + + if exists: + wrapped._cache.pop(key) + + return exists + + +def _cache_clear(wrapped): + wrapped.hits = wrapped.misses = 0 + wrapped._cache = OrderedDict() + wrapped.tasks = set() + + +def _open(wrapped): + if not wrapped.closed: + raise RuntimeError("alru_cache is not closed") + + was_closed = ( + wrapped.hits == wrapped.misses == len(wrapped.tasks) == len(wrapped._cache) == 0 + ) + + if not was_closed: + raise RuntimeError("alru_cache was not closed correctly") + + wrapped.closed = False + + +def _close(wrapped, *, cancel=False, return_exceptions=True): + if wrapped.closed: + raise RuntimeError("alru_cache is closed") + + wrapped.closed = True + + if cancel: + for task in wrapped.tasks: + if not task.done(): # not sure is it possible + task.cancel() + + return _wait_closed(wrapped, return_exceptions=return_exceptions) + + +async def _wait_closed(wrapped, *, return_exceptions): + wait_closed = asyncio.gather(*wrapped.tasks, return_exceptions=return_exceptions) + + wait_closed.add_done_callback(partial(_close_waited, wrapped)) + + ret = await wait_closed + + # hack to get _close_waited callback to be executed + await asyncio.sleep(0) + + return ret + + +def _close_waited(wrapped, _): + wrapped.cache_clear() + + +def _cache_info(wrapped, maxsize): + return _CacheInfo( + wrapped.hits, + wrapped.misses, + maxsize, + len(wrapped._cache), + ) + + +def __cache_touch(wrapped, key): + try: + wrapped._cache.move_to_end(key) + except KeyError: # not sure is it possible + pass + + +def _cache_hit(wrapped, key): + wrapped.hits += 1 + __cache_touch(wrapped, key) + + +def _cache_miss(wrapped, key): + wrapped.misses += 1 + __cache_touch(wrapped, key) + + +def alru_cache( + fn=None, + maxsize=128, + typed=False, + *, + cache_exceptions=True, +): + def wrapper(fn): + _origin = unpartial(fn) + + if not asyncio.iscoroutinefunction(_origin): + raise RuntimeError("Coroutine function is required, got {}".format(fn)) + + # functools.partialmethod support + if hasattr(fn, "_make_unbound_method"): + fn = fn._make_unbound_method() + + @wraps(fn) + async def wrapped(*fn_args, **fn_kwargs): + if wrapped.closed: + raise RuntimeError("alru_cache is closed for {}".format(wrapped)) + + loop = asyncio.get_event_loop() + + key = _make_key(fn_args, fn_kwargs, typed) + + fut = wrapped._cache.get(key) + + if fut is not None: + if not fut.done(): + _cache_hit(wrapped, key) + return await asyncio.shield(fut) + + exc = fut._exception + + if exc is None or cache_exceptions: + _cache_hit(wrapped, key) + return fut.result() + + # exception here and cache_exceptions == False + wrapped._cache.pop(key) + + fut = loop.create_future() + task = loop.create_task(fn(*fn_args, **fn_kwargs)) + task.add_done_callback(partial(_done_callback, fut)) + + wrapped.tasks.add(task) + task.add_done_callback(wrapped.tasks.remove) + + wrapped._cache[key] = fut + + if maxsize is not None and len(wrapped._cache) > maxsize: + wrapped._cache.popitem(last=False) + + _cache_miss(wrapped, key) + return await asyncio.shield(fut) + + _cache_clear(wrapped) + wrapped._origin = _origin + wrapped.closed = False + wrapped.cache_info = partial(_cache_info, wrapped, maxsize) + wrapped.cache_clear = partial(_cache_clear, wrapped) + wrapped.invalidate = partial(_cache_invalidate, wrapped, typed) + wrapped.close = partial(_close, wrapped) + wrapped.open = partial(_open, wrapped) + + return wrapped + + if fn is None: + return wrapper + + if callable(fn) or hasattr(fn, "_make_unbound_method"): + return wrapper(fn) + + raise NotImplementedError("{} decorating is not supported".format(fn)) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py index 48905626a6ca..718bae298f9d 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py @@ -31,6 +31,7 @@ from typing import Any, Dict, Mapping import avro +from ._async_lru import alru_cache from .._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX from .._avro_serializer import AvroObjectSerializer @@ -80,7 +81,7 @@ async def close(self): """ await self._schema_registry_client.close() - #@lru_cache(maxsize=128) + @alru_cache(maxsize=128) async def _get_schema_id(self, schema_name, schema_str, **kwargs): # type: (str, str, Any) -> str """ @@ -99,7 +100,7 @@ async def _get_schema_id(self, schema_name, schema_str, **kwargs): ) return schema_properties.schema_id - #@lru_cache(maxsize=128) + @alru_cache(maxsize=128) async def _get_schema(self, schema_id, **kwargs): # type: (str, Any) -> str """ diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt b/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt index c2d5098aa720..df622180d27e 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt @@ -1,4 +1,5 @@ -e ../../../tools/azure-devtools -e ../../../tools/azure-sdk-tools -e ../../identity/azure-identity -../azure-schemaregistry \ No newline at end of file +../azure-schemaregistry +aiohttp>=3.0; python_version >= '3.5' \ No newline at end of file diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml index 488f4b45b36f..a97e0bfb5ed7 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -23,12 +23,12 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04 response: body: - string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' + string: '{"id":"3ad352630be04edea2d5db26d4f75eec"}' headers: content-type: - application/json date: - - Mon, 04 Oct 2021 18:41:59 GMT + - Thu, 14 Oct 2021 22:27:55 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04 server: @@ -38,9 +38,9 @@ interactions: transfer-encoding: - chunked x-schema-id: - - ad9a2f239f2b40e5b058f494bea52bca + - 3ad352630be04edea2d5db26d4f75eec x-schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2017-04 + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/3ad352630be04edea2d5db26d4f75eec?api-version=2017-04 x-schema-type: - Avro x-schema-version: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml index a0bce1aef222..0dfb70c5df81 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -23,12 +23,12 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04 response: body: - string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' + string: '{"id":"3ad352630be04edea2d5db26d4f75eec"}' headers: content-type: - application/json date: - - Mon, 04 Oct 2021 18:42:00 GMT + - Thu, 14 Oct 2021 22:27:57 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04 server: @@ -38,9 +38,9 @@ interactions: transfer-encoding: - chunked x-schema-id: - - ad9a2f239f2b40e5b058f494bea52bca + - 3ad352630be04edea2d5db26d4f75eec x-schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2017-04 + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/3ad352630be04edea2d5db26d4f75eec?api-version=2017-04 x-schema-type: - Avro x-schema-version: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml index 807991aaa029..4cf53f504332 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -19,16 +19,16 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04 response: body: - string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' + string: '{"id":"3ad352630be04edea2d5db26d4f75eec"}' headers: content-type: application/json - date: Mon, 04 Oct 2021 18:42:01 GMT + date: Thu, 14 Oct 2021 22:27:58 GMT location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04 server: Microsoft-HTTPAPI/2.0 strict-transport-security: max-age=31536000 transfer-encoding: chunked - x-schema-id: ad9a2f239f2b40e5b058f494bea52bca - x-schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2017-04 + x-schema-id: 3ad352630be04edea2d5db26d4f75eec + x-schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/3ad352630be04edea2d5db26d4f75eec?api-version=2017-04 x-schema-type: Avro x-schema-version: '1' x-schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2017-04 diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml index 00e8dbb14dc5..518d053a4800 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -19,16 +19,16 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2017-04 response: body: - string: '{"id":"ad9a2f239f2b40e5b058f494bea52bca"}' + string: '{"id":"3ad352630be04edea2d5db26d4f75eec"}' headers: content-type: application/json - date: Mon, 04 Oct 2021 18:42:02 GMT + date: Thu, 14 Oct 2021 22:27:59 GMT location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2017-04 server: Microsoft-HTTPAPI/2.0 strict-transport-security: max-age=31536000 transfer-encoding: chunked - x-schema-id: ad9a2f239f2b40e5b058f494bea52bca - x-schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/ad9a2f239f2b40e5b058f494bea52bca?api-version=2017-04 + x-schema-id: 3ad352630be04edea2d5db26d4f75eec + x-schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/3ad352630be04edea2d5db26d4f75eec?api-version=2017-04 x-schema-type: Avro x-schema-version: '1' x-schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2017-04 From 7e2df3a0e72e25acb332302251dd4b6551ecd91e Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Mon, 18 Oct 2021 15:50:53 -0700 Subject: [PATCH 05/12] recordings --- .../avroserializer/_avro_serializer.py | 1 + .../_schema_registry_avro_serializer_async.py | 16 ++-- ...serializer_with_auto_register_schemas.yaml | 29 +++--- ...ializer_without_auto_register_schemas.yaml | 29 +++--- ...serializer_with_auto_register_schemas.yaml | 90 +++++++++++++++--- ...ializer_without_auto_register_schemas.yaml | 94 ++++++++++++++++--- .../tests/test_avro_serializer_async.py | 10 +- 7 files changed, 197 insertions(+), 72 deletions(-) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py index 0f40857e6e93..a12b966cdc7f 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py @@ -29,6 +29,7 @@ from backports.functools_lru_cache import lru_cache from typing import BinaryIO, Union, TypeVar from io import BytesIO +import json import avro from avro.io import DatumWriter, DatumReader, BinaryDecoder, BinaryEncoder diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py index 718bae298f9d..134361b211dc 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py @@ -62,7 +62,7 @@ def __init__(self, **kwargs): self._auto_register_schema_func = ( self._schema_registry_client.register_schema if self._auto_register_schemas - else self._schema_registry_client.get_schema_id + else self._schema_registry_client.get_schema_properties ) async def __aenter__(self): @@ -96,24 +96,24 @@ async def _get_schema_id(self, schema_name, schema_str, **kwargs): :rtype: str """ schema_properties = await self._auto_register_schema_func( - self._schema_group, schema_name, "Avro", schema_str, **kwargs + self._schema_group, schema_name, schema_str, "Avro", **kwargs ) - return schema_properties.schema_id + return schema_properties.id @alru_cache(maxsize=128) async def _get_schema(self, schema_id, **kwargs): # type: (str, Any) -> str """ - Get schema content from local cache with the given schema id. + Get schema definition from local cache with the given schema id. If there is no item in the local cache, get schema from the service and cache it. :param str schema_id: Schema id - :return: Schema content + :return: Schema definition """ schema = await self._schema_registry_client.get_schema( schema_id, **kwargs ) - return schema.schema_content + return schema.schema_definition @classmethod @lru_cache(maxsize=128) @@ -166,9 +166,9 @@ async def deserialize(self, value, **kwargs): schema_id = value[ SCHEMA_ID_START_INDEX : (SCHEMA_ID_START_INDEX + SCHEMA_ID_LENGTH) ].decode("utf-8") - schema_content = await self._get_schema(schema_id, **kwargs) + schema_definition = await self._get_schema(schema_id, **kwargs) dict_value = self._avro_serializer.deserialize( - value[DATA_START_INDEX:], schema_content + value[DATA_START_INDEX:], schema_definition ) return dict_value diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml index 4690e278cfbc..8a98a36f3121 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -22,18 +22,18 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview response: body: - string: '{"id":"3ad352630be04edea2d5db26d4f75eec"}' + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: - application/json date: - - Mon, 18 Oct 2021 18:20:36 GMT + - Mon, 18 Oct 2021 22:49:08 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - 3ad352630be04edea2d5db26d4f75eec + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/3ad352630be04edea2d5db26d4f75eec?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: @@ -72,18 +72,18 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview response: body: - string: '{"id":"3ad352630be04edea2d5db26d4f75eec"}' + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: - application/json date: - - Mon, 18 Oct 2021 18:20:37 GMT + - Mon, 18 Oct 2021 22:49:08 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - 3ad352630be04edea2d5db26d4f75eec + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/3ad352630be04edea2d5db26d4f75eec?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: @@ -111,24 +111,21 @@ interactions: User-Agent: - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) method: GET - uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/3ad352630be04edea2d5db26d4f75eec?api-version=2020-09-01-preview + uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview response: body: - string: '"{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"example.avro\", - \"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\", - \"null\"], \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"], - \"name\": \"favorite_color\"}]}"' + string: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"type":"string","name":"name"},{"type":["int","null"],"name":"favorite_number"},{"type":["string","null"],"name":"favorite_color"}]}' headers: content-type: - application/json date: - - Mon, 18 Oct 2021 18:20:37 GMT + - Mon, 18 Oct 2021 22:49:09 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - 3ad352630be04edea2d5db26d4f75eec + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/3ad352630be04edea2d5db26d4f75eec?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml index 5d399d2cd26f..909d9e7a8a1d 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -22,18 +22,18 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview response: body: - string: '{"id":"3ad352630be04edea2d5db26d4f75eec"}' + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: - application/json date: - - Mon, 18 Oct 2021 18:20:39 GMT + - Mon, 18 Oct 2021 22:49:11 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - 3ad352630be04edea2d5db26d4f75eec + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/3ad352630be04edea2d5db26d4f75eec?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: @@ -72,18 +72,18 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview response: body: - string: '{"id":"3ad352630be04edea2d5db26d4f75eec"}' + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: - application/json date: - - Mon, 18 Oct 2021 18:20:40 GMT + - Mon, 18 Oct 2021 22:49:11 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - 3ad352630be04edea2d5db26d4f75eec + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/3ad352630be04edea2d5db26d4f75eec?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: @@ -111,24 +111,21 @@ interactions: User-Agent: - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) method: GET - uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/3ad352630be04edea2d5db26d4f75eec?api-version=2020-09-01-preview + uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview response: body: - string: '"{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"example.avro\", - \"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\", - \"null\"], \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"], - \"name\": \"favorite_color\"}]}"' + string: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"type":"string","name":"name"},{"type":["int","null"],"name":"favorite_number"},{"type":["string","null"],"name":"favorite_color"}]}' headers: content-type: - application/json date: - - Mon, 18 Oct 2021 18:20:40 GMT + - Mon, 18 Oct 2021 22:49:11 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: - - 3ad352630be04edea2d5db26d4f75eec + - a9619ab12fb748ab9ec800c13850107e schema-id-location: - - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/3ad352630be04edea2d5db26d4f75eec?api-version=2020-09-01-preview + - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview schema-version: - '1' schema-versions-location: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml index 6f85a80d68fb..e4ee468df71b 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -1,13 +1,13 @@ interactions: - request: body: '{"type": "record", "name": "User", "namespace": "example.avro", "fields": - [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, - {"type": ["string", "null"], "name": "favorite_color"}]}' + [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, + {"type": ["string", "null"], "name": "favorite_color"}]}' headers: Accept: - application/json Content-Length: - - '4' + - '221' Content-Type: - application/json Serialization-Type: @@ -18,21 +18,85 @@ interactions: uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview response: body: - string: '{"Code":400,"Detail":"Invalid schema type for PUT request. ''{\"type\": - \"record\", \"name\": \"user\", \"namespace\": \"example.avro\", \"fields\": - [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\", \"null\"], - \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"], \"name\": - \"favorite_color\"}]}'' is not supported. TrackingId:08e8c01c-43be-484a-b254-8818c1d64472_G1, - SystemTracker:swathip-test-eventhubs.servicebus.windows.net:$schemagroups\/fakegroup\/schemas\/example.avro.User, - Timestamp:2021-10-18T18:20:41"}' + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: application/json - date: Mon, 18 Oct 2021 18:20:41 GMT + date: Mon, 18 Oct 2021 22:49:12 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro server: Microsoft-HTTPAPI/2.0 strict-transport-security: max-age=31536000 transfer-encoding: chunked status: - code: 400 - message: Bad Request + code: 200 + message: OK url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/swathip-test-schema/schemas/example.avro.User?api-version=2020-09-01-preview +- request: + body: '{"type": "record", "name": "User", "namespace": "example.avro", "fields": + [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, + {"type": ["string", "null"], "name": "favorite_color"}]}' + headers: + Accept: + - application/json + Content-Length: + - '221' + Content-Type: + - application/json + Serialization-Type: + - Avro + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) + method: POST + uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview + response: + body: + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' + headers: + content-type: application/json + date: Mon, 18 Oct 2021 22:49:12 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/swathip-test-schema/schemas/example.avro.User?api-version=2020-09-01-preview +- request: + body: null + headers: + Accept: + - text/plain; charset=utf-8 + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) + method: GET + uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + response: + body: + string: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"type":"string","name":"name"},{"type":["int","null"],"name":"favorite_number"},{"type":["string","null"],"name":"favorite_color"}]}' + headers: + content-type: application/json + date: Mon, 18 Oct 2021 22:49:13 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview version: 1 diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml index 1527ad766be9..cf6d0ab32901 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -1,38 +1,102 @@ interactions: - request: - body: Avro + body: '{"type": "record", "name": "User", "namespace": "example.avro", "fields": + [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, + {"type": ["string", "null"], "name": "favorite_color"}]}' headers: Accept: - application/json Content-Length: - - '4' + - '221' Content-Type: - application/json Serialization-Type: - - '{"type": "record", "name": "User", "namespace": "example.avro", "fields": - [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, - {"type": ["string", "null"], "name": "favorite_color"}]}' + - Avro User-Agent: - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) method: PUT uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview response: body: - string: '{"Code":400,"Detail":"Invalid schema type for PUT request. ''{\"type\": - \"record\", \"name\": \"user\", \"namespace\": \"example.avro\", \"fields\": - [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\", \"null\"], - \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"], \"name\": - \"favorite_color\"}]}'' is not supported. TrackingId:79842f92-d637-44b2-af7f-4ca6cbc4bc3d_G28, - SystemTracker:swathip-test-eventhubs.servicebus.windows.net:$schemagroups\/fakegroup\/schemas\/example.avro.User, - Timestamp:2021-10-18T18:20:42"}' + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: application/json - date: Mon, 18 Oct 2021 18:20:42 GMT + date: Mon, 18 Oct 2021 22:49:14 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro server: Microsoft-HTTPAPI/2.0 strict-transport-security: max-age=31536000 transfer-encoding: chunked status: - code: 400 - message: Bad Request + code: 200 + message: OK url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/swathip-test-schema/schemas/example.avro.User?api-version=2020-09-01-preview +- request: + body: '{"type": "record", "name": "User", "namespace": "example.avro", "fields": + [{"type": "string", "name": "name"}, {"type": ["int", "null"], "name": "favorite_number"}, + {"type": ["string", "null"], "name": "favorite_color"}]}' + headers: + Accept: + - application/json + Content-Length: + - '221' + Content-Type: + - application/json + Serialization-Type: + - Avro + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) + method: POST + uri: https://fake_resource.servicebus.windows.net/$schemagroups/fakegroup/schemas/example.avro.User?api-version=2020-09-01-preview + response: + body: + string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' + headers: + content-type: application/json + date: Mon, 18 Oct 2021 22:49:15 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/swathip-test-schema/schemas/example.avro.User?api-version=2020-09-01-preview +- request: + body: null + headers: + Accept: + - text/plain; charset=utf-8 + User-Agent: + - azsdk-python-azureschemaregistry/1.0.0b3 Python/3.9.0 (Windows-10-10.0.19041-SP0) + method: GET + uri: https://fake_resource.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + response: + body: + string: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"type":"string","name":"name"},{"type":["int","null"],"name":"favorite_number"},{"type":["string","null"],"name":"favorite_color"}]}' + headers: + content-type: application/json + date: Mon, 18 Oct 2021 22:49:15 GMT + location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview + schema-id: a9619ab12fb748ab9ec800c13850107e + schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview + schema-version: '1' + schema-versions-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/swathip-test-schema/schemas/example.avro.User/versions?api-version=2020-09-01-preview + serialization-type: Avro + server: Microsoft-HTTPAPI/2.0 + strict-transport-security: max-age=31536000 + transfer-encoding: chunked + status: + code: 200 + message: OK + url: https://swathip-test-eventhubs.servicebus.windows.net/$schemagroups/getSchemaById/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview version: 1 diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py index 24ffda234bc5..87601be2870a 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py @@ -49,14 +49,15 @@ async def test_basic_sr_avro_serializer_with_auto_register_schemas(self, schemar async with sr_client: schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}""" + schema_str = "{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"example.avro\", \"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\", \"null\"], \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"], \"name\": \"favorite_color\"}]}" schema = avro.schema.parse(schema_str) dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} encoded_data = await sr_avro_serializer.serialize(dict_data, schema=schema_str) assert encoded_data[0:4] == b'\0\0\0\0' - schema_properties = await sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)) - schema_id = schema_properties.schema_id + schema_properties = await sr_client.get_schema_properties(schemaregistry_group, schema.fullname, str(schema), "Avro") + schema_id = schema_properties.id assert encoded_data[4:36] == schema_id.encode("utf-8") decoded_data = await sr_avro_serializer.deserialize(encoded_data) @@ -72,14 +73,15 @@ async def test_basic_sr_avro_serializer_without_auto_register_schemas(self, sche async with sr_client: schema_str = """{"namespace":"example.avro","type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"favorite_number","type":["int","null"]},{"name":"favorite_color","type":["string","null"]}]}""" + schema_str = "{\"type\": \"record\", \"name\": \"User\", \"namespace\": \"example.avro\", \"fields\": [{\"type\": \"string\", \"name\": \"name\"}, {\"type\": [\"int\", \"null\"], \"name\": \"favorite_number\"}, {\"type\": [\"string\", \"null\"], \"name\": \"favorite_color\"}]}" schema = avro.schema.parse(schema_str) dict_data = {"name": u"Ben", "favorite_number": 7, "favorite_color": u"red"} encoded_data = await sr_avro_serializer.serialize(dict_data, schema=schema_str) assert encoded_data[0:4] == b'\0\0\0\0' - schema_properties = await sr_client.get_schema_id(schemaregistry_group, schema.fullname, "Avro", str(schema)) - schema_id = schema_properties.schema_id + schema_properties = await sr_client.get_schema_properties(schemaregistry_group, schema.fullname, str(schema), "Avro") + schema_id = schema_properties.id assert encoded_data[4:36] == schema_id.encode("utf-8") decoded_data = await sr_avro_serializer.deserialize(encoded_data) From 9838d03754dc1826e85916ae623d2abd4ad3ae14 Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Mon, 18 Oct 2021 16:00:10 -0700 Subject: [PATCH 06/12] samples --- .../serializer/avroserializer/_avro_serializer.py | 1 - .../samples/async_samples/avro_serializer_async.py | 2 +- .../samples/async_samples/eventhub_receive_integration.py | 3 +-- .../samples/async_samples/eventhub_send_integration.py | 3 +-- 4 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py index a12b966cdc7f..0f40857e6e93 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_avro_serializer.py @@ -29,7 +29,6 @@ from backports.functools_lru_cache import lru_cache from typing import BinaryIO, Union, TypeVar from io import BytesIO -import json import avro from avro.io import DatumWriter, DatumReader, BinaryDecoder, BinaryEncoder diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py index 825b0002a6c4..77e433de55ae 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py @@ -80,7 +80,7 @@ async def deserialize(serializer, bytes_payload): async def main(): - schema_registry = SchemaRegistryClient(endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential) + schema_registry = SchemaRegistryClient(fully_qualified_namespace=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=token_credential) serializer = AvroSerializer(client=schema_registry, group_name=GROUP_NAME, auto_register_schemas=True) bytes_data_ben, bytes_data_alice = await serialize(serializer) dict_data_ben = await deserialize(serializer, bytes_data_ben) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py index 9521ce4a4228..be564ac144ed 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py @@ -31,10 +31,9 @@ eventhub_name=EVENTHUB_NAME, ) # create a AvroSerializer instance -# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace' avro_serializer = AvroSerializer( client=SchemaRegistryClient( - endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, + fully_qualified_namespace=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=DefaultAzureCredential() ), group_name=GROUP_NAME, diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py index 06fc3bdb2fac..1f01a7b554f7 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py @@ -43,10 +43,9 @@ ) # create a AvroSerializer instance -# TODO: after 'azure-schemaregistry==1.0.0b3' is released, update 'endpoint' to 'fully_qualified_namespace' avro_serializer = AvroSerializer( client=SchemaRegistryClient( - endpoint=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, + fully_qualified_namespace=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, credential=DefaultAzureCredential() ), group_name=GROUP_NAME, From daeafd91b02527bba547462669d49e696b373633 Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Tue, 19 Oct 2021 10:19:17 -0700 Subject: [PATCH 07/12] pylint --- .../serializer/avroserializer/aio/_async_lru.py | 6 ++++++ .../tests/test_avro_serializer_async.py | 1 - 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py index 946ebcf71db7..e9ad9703ddac 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py @@ -59,6 +59,7 @@ def _done_callback(fut, task): def _cache_invalidate(wrapped, typed, *args, **kwargs): + # pylint: disable=protected-access key = _make_key(args, kwargs, typed) exists = key in wrapped._cache @@ -70,6 +71,7 @@ def _cache_invalidate(wrapped, typed, *args, **kwargs): def _cache_clear(wrapped): + # pylint: disable=protected-access wrapped.hits = wrapped.misses = 0 wrapped._cache = OrderedDict() wrapped.tasks = set() @@ -79,6 +81,7 @@ def _open(wrapped): if not wrapped.closed: raise RuntimeError("alru_cache is not closed") + # pylint: disable=protected-access was_closed = ( wrapped.hits == wrapped.misses == len(wrapped.tasks) == len(wrapped._cache) == 0 ) @@ -121,6 +124,7 @@ def _close_waited(wrapped, _): def _cache_info(wrapped, maxsize): + # pylint: disable=protected-access return _CacheInfo( wrapped.hits, wrapped.misses, @@ -130,6 +134,7 @@ def _cache_info(wrapped, maxsize): def __cache_touch(wrapped, key): + # pylint: disable=protected-access try: wrapped._cache.move_to_end(key) except KeyError: # not sure is it possible @@ -154,6 +159,7 @@ def alru_cache( cache_exceptions=True, ): def wrapper(fn): + # pylint: disable=protected-access _origin = unpartial(fn) if not asyncio.iscoroutinefunction(_origin): diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py index 87601be2870a..9d79e5f94395 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py @@ -23,7 +23,6 @@ import avro import avro.io from io import BytesIO -import asyncio import pytest from azure.schemaregistry.aio import SchemaRegistryClient From 64df20626dce151aa024f68c2202651db865fbb2 Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Wed, 20 Oct 2021 12:26:51 -0700 Subject: [PATCH 08/12] adams comments --- .../CHANGELOG.md | 2 +- .../_schema_registry_avro_serializer.py | 13 +++------- .../serializer/avroserializer/_utils.py | 13 ++++++++++ .../avroserializer/aio/_async_lru.py | 3 ++- .../_schema_registry_avro_serializer_async.py | 25 ++++++------------- .../dev_requirements.txt | 1 - .../async_samples/avro_serializer_async.py | 1 - .../eventhub_receive_integration.py | 7 +++++- .../eventhub_send_integration.py | 5 ++-- .../tests/conftest.py | 16 ++++++++++++ ...serializer_with_auto_register_schemas.yaml | 6 ++--- ...ializer_without_auto_register_schemas.yaml | 6 ++--- ...serializer_with_auto_register_schemas.yaml | 6 ++--- ...ializer_without_auto_register_schemas.yaml | 6 ++--- 14 files changed, 65 insertions(+), 45 deletions(-) create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_utils.py create mode 100644 sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md b/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md index f94a6e84527a..ea56671d39ce 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/CHANGELOG.md @@ -4,7 +4,7 @@ ### Features Added -- Async version of `SchemaRegistryAvroSerializer` has been added under `azure.schemaregistry.serializer.avroserializer.aio`. +- Async version of `AvroSerializer` has been added under `azure.schemaregistry.serializer.avroserializer.aio`. ### Breaking Changes diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py index 58613a1f9a52..c923adad8eab 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py @@ -33,6 +33,7 @@ from ._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX from ._avro_serializer import AvroObjectSerializer +from ._utils import parse_schema class AvroSerializer(object): @@ -45,7 +46,7 @@ class AvroSerializer(object): :paramtype client: ~azure.schemaregistry.SchemaRegistryClient :keyword str group_name: Required. Schema group under which schema should be registered. :keyword bool auto_register_schemas: When true, register new schemas passed to serialize. - Otherwise, and by default, fail if it has not been pre-registered in the registry. + Otherwise, and by default, serialization will fail if the schema has not been pre-registered in the registry. """ @@ -89,8 +90,7 @@ def _get_schema_id(self, schema_name, schema_str, **kwargs): :param schema_name: Name of the schema :type schema_name: str - :param schema: Schema object - :type schema: avro.schema.Schema + :param str schema_str: Schema string :return: Schema Id :rtype: str """ @@ -114,11 +114,6 @@ def _get_schema(self, schema_id, **kwargs): ).schema_definition return schema_str - @classmethod - @lru_cache(maxsize=128) - def _parse_schema(cls, schema): - return avro.schema.parse(schema) - def serialize(self, value, **kwargs): # type: (Mapping[str, Any], Any) -> bytes """ @@ -137,7 +132,7 @@ def serialize(self, value, **kwargs): except KeyError as e: raise TypeError("'{}' is a required keyword.".format(e.args[0])) - cached_schema = AvroSerializer._parse_schema(raw_input_schema) + cached_schema = parse_schema(raw_input_schema) record_format_identifier = b"\0\0\0\0" schema_id = self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs) data_bytes = self._avro_serializer.serialize(value, cached_schema) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_utils.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_utils.py new file mode 100644 index 000000000000..6a89471e996b --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_utils.py @@ -0,0 +1,13 @@ +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- +try: + from functools import lru_cache +except ImportError: + from backports.functools_lru_cache import lru_cache +import avro + +@lru_cache(maxsize=128) +def parse_schema(schema): + return avro.schema.parse(schema) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py index e9ad9703ddac..c51dfdf0c9dd 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_async_lru.py @@ -23,7 +23,8 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. # -------------------------------------------------------------------------- -# Copying over `aio-libs/async_lru` for the following reasons: +# Copying over `async_lru.py`[https://github.com/aio-libs/async-lru/blob/master/async_lru.py] +# from `aio-libs`[https://github.com/aio-libs/async-lru] for the following reasons: # 1. There has not been an official release of `async_lru` in 2 years. # 2. The last update to the library was a year ago, so it seems the library is # not being actively maintained. diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py index 134361b211dc..c1c3c17fbacf 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py @@ -23,10 +23,6 @@ # IN THE SOFTWARE. # # -------------------------------------------------------------------------- -try: - from functools import lru_cache -except ImportError: - from backports.functools_lru_cache import lru_cache from io import BytesIO from typing import Any, Dict, Mapping import avro @@ -34,6 +30,7 @@ from ._async_lru import alru_cache from .._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX from .._avro_serializer import AvroObjectSerializer +from .._utils import parse_schema class AvroSerializer(object): @@ -43,10 +40,10 @@ class AvroSerializer(object): :keyword client: Required. The schema registry client which is used to register schema and retrieve schema from the service. - :paramtype client: ~azure.schemaregistry.SchemaRegistryClient + :paramtype client: ~azure.schemaregistry.aio.SchemaRegistryClient :keyword str group_name: Required. Schema group under which schema should be registered. :keyword bool auto_register_schemas: When true, register new schemas passed to serialize. - Otherwise, and by default, fail if it has not been pre-registered in the registry. + Otherwise, and by default, serialization will fail if the schema has not been pre-registered in the registry. """ @@ -72,7 +69,7 @@ async def __aenter__(self): async def __aexit__(self, *exc_details): # type: (Any) -> None - await self._schema_registry_client.__exit__(*exc_details) + await self._schema_registry_client.__aexit__(*exc_details) async def close(self): # type: () -> None @@ -81,7 +78,7 @@ async def close(self): """ await self._schema_registry_client.close() - @alru_cache(maxsize=128) + @alru_cache(maxsize=128, cache_exceptions=False) async def _get_schema_id(self, schema_name, schema_str, **kwargs): # type: (str, str, Any) -> str """ @@ -90,8 +87,7 @@ async def _get_schema_id(self, schema_name, schema_str, **kwargs): :param schema_name: Name of the schema :type schema_name: str - :param schema: Schema object - :type schema: avro.schema.Schema + :param str schema_str: Schema string :return: Schema Id :rtype: str """ @@ -100,7 +96,7 @@ async def _get_schema_id(self, schema_name, schema_str, **kwargs): ) return schema_properties.id - @alru_cache(maxsize=128) + @alru_cache(maxsize=128, cache_exceptions=False) async def _get_schema(self, schema_id, **kwargs): # type: (str, Any) -> str """ @@ -115,11 +111,6 @@ async def _get_schema(self, schema_id, **kwargs): ) return schema.schema_definition - @classmethod - @lru_cache(maxsize=128) - def _parse_schema(cls, schema): - return avro.schema.parse(schema) - async def serialize(self, value, **kwargs): # type: (Mapping[str, Any], Any) -> bytes """ @@ -138,7 +129,7 @@ async def serialize(self, value, **kwargs): except KeyError as e: raise TypeError("'{}' is a required keyword.".format(e.args[0])) - cached_schema = AvroSerializer._parse_schema(raw_input_schema) + cached_schema = parse_schema(raw_input_schema) record_format_identifier = b"\0\0\0\0" schema_id = await self._get_schema_id(cached_schema.fullname, str(cached_schema), **kwargs) data_bytes = self._avro_serializer.serialize(value, cached_schema) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt b/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt index df622180d27e..d53cebff6287 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/dev_requirements.txt @@ -1,5 +1,4 @@ -e ../../../tools/azure-devtools -e ../../../tools/azure-sdk-tools -e ../../identity/azure-identity -../azure-schemaregistry aiohttp>=3.0; python_version >= '3.5' \ No newline at end of file diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py index 77e433de55ae..20595010e117 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py @@ -85,7 +85,6 @@ async def main(): bytes_data_ben, bytes_data_alice = await serialize(serializer) dict_data_ben = await deserialize(serializer, bytes_data_ben) dict_data_alice = await deserialize(serializer, bytes_data_alice) - await schema_registry.close() await serializer.close() await token_credential.close() diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py index be564ac144ed..df9b21482378 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py @@ -31,10 +31,11 @@ eventhub_name=EVENTHUB_NAME, ) # create a AvroSerializer instance +azure_credential = DefaultAzureCredential() avro_serializer = AvroSerializer( client=SchemaRegistryClient( fully_qualified_namespace=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, - credential=DefaultAzureCredential() + credential=azure_credential ), group_name=GROUP_NAME, auto_register_schemas=True @@ -62,6 +63,10 @@ async def main(): ) except KeyboardInterrupt: print('Stopped receiving.') + finally: + await avro_serializer.close() + await azure_credential.close() + await eventhub_consumer.close() if __name__ == '__main__': loop = asyncio.get_event_loop() diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py index 1f01a7b554f7..4d1dfe687d7a 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py @@ -41,12 +41,12 @@ conn_str=EVENTHUB_CONNECTION_STR, eventhub_name=EVENTHUB_NAME ) - # create a AvroSerializer instance +azure_credential = DefaultAzureCredential() avro_serializer = AvroSerializer( client=SchemaRegistryClient( fully_qualified_namespace=SCHEMAREGISTRY_FULLY_QUALIFIED_NAMESPACE, - credential=DefaultAzureCredential() + credential=azure_credential ), group_name=GROUP_NAME, auto_register_schemas=True @@ -72,6 +72,7 @@ async def main(): await send_event_data_batch(eventhub_producer, avro_serializer) await avro_serializer.close() + await azure_credential.close() await eventhub_producer.close() if __name__ == "__main__": diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py new file mode 100644 index 000000000000..3cf8a1feed70 --- /dev/null +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py @@ -0,0 +1,16 @@ +# ------------------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# ------------------------------------------------------------------------- + +import os +import sys +import uuid + +import pytest + +# Ignore async tests for Python < 3.5 +collect_ignore = [] +if sys.version_info < (3, 5): + collect_ignore.append("*_async.py") diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml index 8a98a36f3121..13f51e4c25c6 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -27,7 +27,7 @@ interactions: content-type: - application/json date: - - Mon, 18 Oct 2021 22:49:08 GMT + - Wed, 20 Oct 2021 19:04:59 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: @@ -77,7 +77,7 @@ interactions: content-type: - application/json date: - - Mon, 18 Oct 2021 22:49:08 GMT + - Wed, 20 Oct 2021 19:05:00 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: @@ -119,7 +119,7 @@ interactions: content-type: - application/json date: - - Mon, 18 Oct 2021 22:49:09 GMT + - Wed, 20 Oct 2021 19:05:00 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml index 909d9e7a8a1d..6ecb68a231d0 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -27,7 +27,7 @@ interactions: content-type: - application/json date: - - Mon, 18 Oct 2021 22:49:11 GMT + - Wed, 20 Oct 2021 19:05:01 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: @@ -77,7 +77,7 @@ interactions: content-type: - application/json date: - - Mon, 18 Oct 2021 22:49:11 GMT + - Wed, 20 Oct 2021 19:05:02 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: @@ -119,7 +119,7 @@ interactions: content-type: - application/json date: - - Mon, 18 Oct 2021 22:49:11 GMT + - Wed, 20 Oct 2021 19:05:02 GMT location: - https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml index e4ee468df71b..068d169df789 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_with_auto_register_schemas.yaml @@ -21,7 +21,7 @@ interactions: string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: application/json - date: Mon, 18 Oct 2021 22:49:12 GMT + date: Wed, 20 Oct 2021 19:05:03 GMT location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: a9619ab12fb748ab9ec800c13850107e schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview @@ -57,7 +57,7 @@ interactions: string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: application/json - date: Mon, 18 Oct 2021 22:49:12 GMT + date: Wed, 20 Oct 2021 19:05:03 GMT location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: a9619ab12fb748ab9ec800c13850107e schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview @@ -85,7 +85,7 @@ interactions: string: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"type":"string","name":"name"},{"type":["int","null"],"name":"favorite_number"},{"type":["string","null"],"name":"favorite_color"}]}' headers: content-type: application/json - date: Mon, 18 Oct 2021 22:49:13 GMT + date: Wed, 20 Oct 2021 19:05:04 GMT location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: a9619ab12fb748ab9ec800c13850107e schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml index cf6d0ab32901..a1c8635eef54 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/recordings/test_avro_serializer_async.test_basic_sr_avro_serializer_without_auto_register_schemas.yaml @@ -21,7 +21,7 @@ interactions: string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: application/json - date: Mon, 18 Oct 2021 22:49:14 GMT + date: Wed, 20 Oct 2021 19:05:05 GMT location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: a9619ab12fb748ab9ec800c13850107e schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview @@ -57,7 +57,7 @@ interactions: string: '{"id":"a9619ab12fb748ab9ec800c13850107e"}' headers: content-type: application/json - date: Mon, 18 Oct 2021 22:49:15 GMT + date: Wed, 20 Oct 2021 19:05:06 GMT location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: a9619ab12fb748ab9ec800c13850107e schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview @@ -85,7 +85,7 @@ interactions: string: '{"type":"record","name":"User","namespace":"example.avro","fields":[{"type":"string","name":"name"},{"type":["int","null"],"name":"favorite_number"},{"type":["string","null"],"name":"favorite_color"}]}' headers: content-type: application/json - date: Mon, 18 Oct 2021 22:49:15 GMT + date: Wed, 20 Oct 2021 19:05:06 GMT location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/fakegroup/schemas/example.avro.User/versions/1?api-version=2020-09-01-preview schema-id: a9619ab12fb748ab9ec800c13850107e schema-id-location: https://swathip-test-eventhubs.servicebus.windows.net:443/$schemagroups/getschemabyid/a9619ab12fb748ab9ec800c13850107e?api-version=2020-09-01-preview From 2b8f770c222aa6cb15736c8002ff6624e37d5d2f Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Wed, 20 Oct 2021 14:06:11 -0700 Subject: [PATCH 09/12] conftest --- .../azure-schemaregistry-avroserializer/tests/conftest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py index 3cf8a1feed70..dc548de50415 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/conftest.py @@ -11,6 +11,6 @@ import pytest # Ignore async tests for Python < 3.5 -collect_ignore = [] +collect_ignore_glob = [] if sys.version_info < (3, 5): - collect_ignore.append("*_async.py") + collect_ignore_glob.append("*_async.py") From 75d36eff8bb07d848750f2eab8186a459126dbb9 Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Wed, 20 Oct 2021 14:21:19 -0700 Subject: [PATCH 10/12] add license in test file --- .../tests/test_avro_serializer.py | 6 ++++++ .../tests/test_avro_serializer_async.py | 6 ++++++ 2 files changed, 12 insertions(+) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py index b925800685a7..03cda0b26ad0 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer.py @@ -1,3 +1,9 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the ""Software""), to # deal in the Software without restriction, including without limitation the diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py index 9d79e5f94395..b06e1363b46c 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/tests/test_avro_serializer_async.py @@ -1,3 +1,9 @@ +# -------------------------------------------------------------------------- +# +# Copyright (c) Microsoft Corporation. All rights reserved. +# +# The MIT License (MIT) +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the ""Software""), to # deal in the Software without restriction, including without limitation the From 20d997c49e7c9efa6c879e8a16db96404fe81533 Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Wed, 20 Oct 2021 14:48:01 -0700 Subject: [PATCH 11/12] lint --- .../avroserializer/_schema_registry_avro_serializer.py | 1 - .../aio/_schema_registry_avro_serializer_async.py | 2 -- 2 files changed, 3 deletions(-) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py index c923adad8eab..387c9007a299 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/_schema_registry_avro_serializer.py @@ -29,7 +29,6 @@ from backports.functools_lru_cache import lru_cache from io import BytesIO from typing import Any, Dict, Mapping -import avro from ._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX from ._avro_serializer import AvroObjectSerializer diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py index c1c3c17fbacf..e0e9f28e43f7 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/azure/schemaregistry/serializer/avroserializer/aio/_schema_registry_avro_serializer_async.py @@ -25,8 +25,6 @@ # -------------------------------------------------------------------------- from io import BytesIO from typing import Any, Dict, Mapping -import avro - from ._async_lru import alru_cache from .._constants import SCHEMA_ID_START_INDEX, SCHEMA_ID_LENGTH, DATA_START_INDEX from .._avro_serializer import AvroObjectSerializer From b242b554e0569bcc5c8ad96d24965ac638dfe1f6 Mon Sep 17 00:00:00 2001 From: Swathi Pillalamarri Date: Thu, 21 Oct 2021 12:05:00 -0700 Subject: [PATCH 12/12] samples --- .../samples/README.md | 9 ++++++--- ...egration.py => eventhub_receive_integration_async.py} | 0 ...integration.py => eventhub_send_integration_async.py} | 0 3 files changed, 6 insertions(+), 3 deletions(-) rename sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/{eventhub_receive_integration.py => eventhub_receive_integration_async.py} (100%) rename sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/{eventhub_send_integration.py => eventhub_send_integration_async.py} (100%) diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md index 817d74dfb2ff..a7cdd44591c0 100644 --- a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md +++ b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/README.md @@ -14,12 +14,12 @@ These are code samples that show common scenario operations with the Schema Regi Several Schema Registry Avro Serializer Python SDK samples are available to you in the SDK's GitHub repository. These samples provide example code for additional scenarios commonly encountered while working with Schema Registry Avro Serializer: -* [avro_serializer.py][avro_serializer_sample] - Examples for common Schema Registry Avro Serializer tasks: +* [avro_serializer.py][avro_serializer_sample] ([async version][avro_serializer_async_sample]) - Examples for common Schema Registry Avro Serializer tasks: * Serialize data according to the given schema * Deserialize data -* [eventhub_send_integration.py][eventhub_send_integration_sample] - Examples for integration with EventHub in sending tasks: +* [eventhub_send_integration.py][eventhub_send_integration_sample] ([async version][eventhub_send_integration_async_sample]) - Examples for integration with EventHub in sending tasks: * Serialize data with the given schema and send `EventData` to Event Hubs. -* [eventhub_receive_integration.py][eventhub_receive_integration_sample] - Examples for integration with EventHub in receiving tasks: +* [eventhub_receive_integration.py][eventhub_receive_integration_sample] ([async version][eventhub_receive_integration_async_sample]) - Examples for integration with EventHub in receiving tasks: * Receive `EventData` from Event Hubs and deserialize the received bytes. ## Prerequisites @@ -53,4 +53,7 @@ what you can do with the Azure Schema Registry Avro Serializer library. [avro_serializer_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/avro_serializer.py [eventhub_send_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_send_integration.py [eventhub_receive_integration_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/sync_samples/eventhub_receive_integration.py +[avro_serializer_async_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/avro_serializer_async.py +[eventhub_send_integration_async_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration_async.py +[eventhub_receive_integration_async_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration_async.py [api_reference]: https://azuresdkdocs.blob.core.windows.net/$web/python/azure-schemaregistry-avroserializer/latest/index.html diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration_async.py similarity index 100% rename from sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration.py rename to sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_receive_integration_async.py diff --git a/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py b/sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration_async.py similarity index 100% rename from sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration.py rename to sdk/schemaregistry/azure-schemaregistry-avroserializer/samples/async_samples/eventhub_send_integration_async.py