From b8f471f211d55cdac3dc559d1312dc27a1ccdf33 Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Tue, 11 Apr 2023 00:35:58 -0700 Subject: [PATCH 01/11] Batch 1 --- .../storage/queue/aio/_queue_client_async.py | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py index e5f87365399f..2292a2a5cb35 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py @@ -101,7 +101,7 @@ def __init__( self._configure_encryption(kwargs) @distributed_trace_async - async def create_queue( + async def create_queue( # type: ignore self, *, metadata: Optional[Dict[str, str]] = None, **kwargs: Any @@ -145,7 +145,7 @@ async def create_queue( process_storage_error(error) @distributed_trace_async - async def delete_queue(self, **kwargs: Any) -> None: + async def delete_queue(self, **kwargs: Any) -> None: # type: ignore """Deletes the specified queue and any messages it contains. When a queue is successfully deleted, it is immediately marked for deletion @@ -180,7 +180,7 @@ async def delete_queue(self, **kwargs: Any) -> None: process_storage_error(error) @distributed_trace_async - async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": + async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": # type: ignore """Returns all user-defined metadata for the specified queue. The data returned does not include the queue's list of messages. @@ -210,7 +210,7 @@ async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": return response @distributed_trace_async - async def set_queue_metadata( + async def set_queue_metadata( # type: ignore self, metadata: Optional[Dict[str, str]] = None, **kwargs: Any ) -> None: @@ -249,7 +249,7 @@ async def set_queue_metadata( process_storage_error(error) @distributed_trace_async - async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy]: + async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy]: # type: ignore """Returns details about any stored access policies specified on the queue that may be used with Shared Access Signatures. @@ -272,7 +272,7 @@ async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy return {s.id: s.access_policy or AccessPolicy() for s in identifiers} @distributed_trace_async - async def set_queue_access_policy( + async def set_queue_access_policy( # type: ignore self, signed_identifiers: Dict[str, AccessPolicy], **kwargs: Any ) -> None: @@ -329,7 +329,7 @@ async def set_queue_access_policy( process_storage_error(error) @distributed_trace_async - async def send_message( + async def send_message( # type: ignore self, content: Any, *, visibility_timeout: Optional[int] = None, @@ -425,7 +425,7 @@ async def send_message( process_storage_error(error) @distributed_trace_async - async def receive_message( + async def receive_message( # type: ignore self, *, visibility_timeout: Optional[int] = None, **kwargs: Any @@ -487,7 +487,7 @@ async def receive_message( process_storage_error(error) @distributed_trace - def receive_messages( + def receive_messages( # type: ignore self, *, messages_per_page: Optional[int] = None, visibility_timeout: Optional[int] = None, @@ -565,7 +565,7 @@ def receive_messages( process_storage_error(error) @distributed_trace_async - async def update_message( + async def update_message( # type: ignore self, message: Union[str, QueueMessage], pop_receipt: Optional[str] = None, content: Optional[Any] = None, @@ -624,14 +624,16 @@ async def update_message( :caption: Update a message. """ timeout = kwargs.pop('timeout', None) - try: + + receipt: Optional[str] + if isinstance(message, QueueMessage): message_id = message.id message_text = content or message.content receipt = pop_receipt or message.pop_receipt inserted_on = message.inserted_on expires_on = message.expires_on dequeue_count = message.dequeue_count - except AttributeError: + else: message_id = message message_text = content receipt = pop_receipt @@ -687,7 +689,7 @@ async def update_message( process_storage_error(error) @distributed_trace_async - async def peek_messages( + async def peek_messages( # type: ignore self, max_messages: Optional[int] = None, **kwargs: Any ) -> List[QueueMessage]: @@ -750,7 +752,7 @@ async def peek_messages( process_storage_error(error) @distributed_trace_async - async def clear_messages(self, **kwargs: Any) -> None: + async def clear_messages(self, **kwargs: Any) -> None: # type: ignore """Deletes all messages from the specified queue. :keyword int timeout: @@ -776,7 +778,7 @@ async def clear_messages(self, **kwargs: Any) -> None: process_storage_error(error) @distributed_trace_async - async def delete_message( + async def delete_message( # type: ignore self, message: Union[str, QueueMessage], pop_receipt: Optional[str] = None, **kwargs: Any @@ -816,10 +818,12 @@ async def delete_message( :caption: Delete a message. """ timeout = kwargs.pop('timeout', None) - try: + + receipt: Optional[str] + if isinstance(message, QueueMessage): message_id = message.id receipt = pop_receipt or message.pop_receipt - except AttributeError: + else: message_id = message receipt = pop_receipt From bf948eca6f61bfef6aef96a950bab607eefc6628 Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Tue, 11 Apr 2023 16:07:39 -0700 Subject: [PATCH 02/11] Ignores batch 2 --- .../storage/queue/aio/_queue_client_async.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py index 2292a2a5cb35..f1475176eb79 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py @@ -138,7 +138,7 @@ async def create_queue( # type: ignore headers = kwargs.pop("headers", {}) headers.update(add_metadata_headers(metadata)) try: - return await self._client.queue.create( + return await self._client.queue.create( # type: ignore metadata=metadata, timeout=timeout, headers=headers, cls=deserialize_queue_creation, **kwargs ) except HttpResponseError as error: @@ -175,7 +175,7 @@ async def delete_queue(self, **kwargs: Any) -> None: # type: ignore """ timeout = kwargs.pop('timeout', None) try: - await self._client.queue.delete(timeout=timeout, **kwargs) + await self._client.queue.delete(timeout=timeout, **kwargs) # type: ignore except HttpResponseError as error: process_storage_error(error) @@ -201,7 +201,7 @@ async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": # typ """ timeout = kwargs.pop('timeout', None) try: - response = await self._client.queue.get_properties( + response = await self._client.queue.get_properties( # type: ignore timeout=timeout, cls=deserialize_queue_properties, **kwargs ) except HttpResponseError as error: @@ -242,7 +242,7 @@ async def set_queue_metadata( # type: ignore headers = kwargs.pop("headers", {}) headers.update(add_metadata_headers(metadata)) try: - return await self._client.queue.set_metadata( + return await self._client.queue.set_metadata( # type: ignore timeout=timeout, headers=headers, cls=return_response_headers, **kwargs ) except HttpResponseError as error: @@ -264,7 +264,7 @@ async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy """ timeout = kwargs.pop('timeout', None) try: - _, identifiers = await self._client.queue.get_access_policy( + _, identifiers = await self._client.queue.get_access_policy( # type: ignore timeout=timeout, cls=return_headers_and_deserialized, **kwargs ) except HttpResponseError as error: @@ -324,7 +324,7 @@ async def set_queue_access_policy( # type: ignore value.expiry = serialize_iso(value.expiry) identifiers.append(SignedIdentifier(id=key, access_policy=value)) try: - await self._client.queue.set_access_policy(queue_acl=identifiers or None, timeout=timeout, **kwargs) + await self._client.queue.set_access_policy(queue_acl=identifiers or None, timeout=timeout, **kwargs) # type: ignore except HttpResponseError as error: process_storage_error(error) @@ -407,7 +407,7 @@ async def send_message( # type: ignore new_message = GenQueueMessage(message_text=encoded_content) try: - enqueued = await self._client.messages.enqueue( + enqueued = await self._client.messages.enqueue( # type: ignore queue_message=new_message, visibilitytimeout=visibility_timeout, message_time_to_live=time_to_live, @@ -473,7 +473,7 @@ async def receive_message( # type: ignore key_encryption_key=self.key_encryption_key, resolver=self.key_resolver_function) try: - message = await self._client.messages.dequeue( + message = await self._client.messages.dequeue( # type: ignore number_of_messages=1, visibilitytimeout=visibility_timeout, timeout=timeout, @@ -668,7 +668,7 @@ async def update_message( # type: ignore else: updated = None try: - response = await self._client.message_id.update( + response = await self._client.message_id.update( # type: ignore queue_message=updated, visibilitytimeout=visibility_timeout or 0, timeout=timeout, @@ -741,7 +741,7 @@ async def peek_messages( # type: ignore resolver=self.key_resolver_function ) try: - messages = await self._client.messages.peek( + messages = await self._client.messages.peek( # type: ignore number_of_messages=max_messages, timeout=timeout, cls=self.message_decode_policy, **kwargs ) wrapped_messages = [] @@ -773,7 +773,7 @@ async def clear_messages(self, **kwargs: Any) -> None: # type: ignore """ timeout = kwargs.pop('timeout', None) try: - await self._client.messages.clear(timeout=timeout, **kwargs) + await self._client.messages.clear(timeout=timeout, **kwargs) # type: ignore except HttpResponseError as error: process_storage_error(error) @@ -830,7 +830,7 @@ async def delete_message( # type: ignore if receipt is None: raise ValueError("pop_receipt must be present") try: - await self._client.message_id.delete( + await self._client.message_id.delete( # type: ignore pop_receipt=receipt, timeout=timeout, queue_message_id=message_id, **kwargs ) except HttpResponseError as error: From b0fa67bca14ac0720d8fceb30d6cde9930da2a59 Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Tue, 11 Apr 2023 17:19:02 -0700 Subject: [PATCH 03/11] Ignore the world --- .../azure/storage/queue/aio/_queue_client_async.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py index f1475176eb79..9bd8ff9acaac 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py @@ -37,7 +37,7 @@ from .._models import QueueProperties -class QueueClient(AsyncStorageAccountHostsMixin, QueueClientBase, StorageEncryptionMixin): +class QueueClient(AsyncStorageAccountHostsMixin, QueueClientBase, StorageEncryptionMixin): # type: ignore """A client to interact with a specific Queue. :param str account_url: @@ -94,7 +94,7 @@ def __init__( super(QueueClient, self).__init__( account_url, queue_name=queue_name, credential=credential, loop=loop, **kwargs ) - self._client = AzureQueueStorage(self.url, base_url=self.url, + self._client = AzureQueueStorage(self.url, base_url=self.url, # type: ignore pipeline=self._pipeline, loop=loop) self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop From ff090e9f81d39cded6e5ae175b94d8780b66e641 Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Thu, 13 Apr 2023 17:06:21 -0700 Subject: [PATCH 04/11] Second batch of feedback --- .../storage/queue/aio/_queue_client_async.py | 55 +++++++++---------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py index 9bd8ff9acaac..4966c1788210 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py @@ -37,7 +37,7 @@ from .._models import QueueProperties -class QueueClient(AsyncStorageAccountHostsMixin, QueueClientBase, StorageEncryptionMixin): # type: ignore +class QueueClient(AsyncStorageAccountHostsMixin, QueueClientBase, StorageEncryptionMixin): # type: ignore[misc] """A client to interact with a specific Queue. :param str account_url: @@ -94,14 +94,13 @@ def __init__( super(QueueClient, self).__init__( account_url, queue_name=queue_name, credential=credential, loop=loop, **kwargs ) - self._client = AzureQueueStorage(self.url, base_url=self.url, # type: ignore - pipeline=self._pipeline, loop=loop) + self._client = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) # type: ignore self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop self._configure_encryption(kwargs) @distributed_trace_async - async def create_queue( # type: ignore + async def create_queue( # type: ignore[override] self, *, metadata: Optional[Dict[str, str]] = None, **kwargs: Any @@ -138,14 +137,14 @@ async def create_queue( # type: ignore headers = kwargs.pop("headers", {}) headers.update(add_metadata_headers(metadata)) try: - return await self._client.queue.create( # type: ignore + return self._client.queue.create( metadata=metadata, timeout=timeout, headers=headers, cls=deserialize_queue_creation, **kwargs ) except HttpResponseError as error: process_storage_error(error) @distributed_trace_async - async def delete_queue(self, **kwargs: Any) -> None: # type: ignore + async def delete_queue(self, **kwargs: Any) -> None: # type: ignore[override] """Deletes the specified queue and any messages it contains. When a queue is successfully deleted, it is immediately marked for deletion @@ -175,12 +174,12 @@ async def delete_queue(self, **kwargs: Any) -> None: # type: ignore """ timeout = kwargs.pop('timeout', None) try: - await self._client.queue.delete(timeout=timeout, **kwargs) # type: ignore + return self._client.queue.delete(timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @distributed_trace_async - async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": # type: ignore + async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": # type: ignore[override] """Returns all user-defined metadata for the specified queue. The data returned does not include the queue's list of messages. @@ -201,7 +200,7 @@ async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": # typ """ timeout = kwargs.pop('timeout', None) try: - response = await self._client.queue.get_properties( # type: ignore + response = await self._client.queue.get_properties( timeout=timeout, cls=deserialize_queue_properties, **kwargs ) except HttpResponseError as error: @@ -210,7 +209,7 @@ async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": # typ return response @distributed_trace_async - async def set_queue_metadata( # type: ignore + async def set_queue_metadata( # type: ignore[override] self, metadata: Optional[Dict[str, str]] = None, **kwargs: Any ) -> None: @@ -242,14 +241,14 @@ async def set_queue_metadata( # type: ignore headers = kwargs.pop("headers", {}) headers.update(add_metadata_headers(metadata)) try: - return await self._client.queue.set_metadata( # type: ignore + return self._client.queue.set_metadata( timeout=timeout, headers=headers, cls=return_response_headers, **kwargs ) except HttpResponseError as error: process_storage_error(error) @distributed_trace_async - async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy]: # type: ignore + async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy]: # type: ignore[override] """Returns details about any stored access policies specified on the queue that may be used with Shared Access Signatures. @@ -264,7 +263,7 @@ async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy """ timeout = kwargs.pop('timeout', None) try: - _, identifiers = await self._client.queue.get_access_policy( # type: ignore + _, identifiers = await self._client.queue.get_access_policy( timeout=timeout, cls=return_headers_and_deserialized, **kwargs ) except HttpResponseError as error: @@ -272,7 +271,7 @@ async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy return {s.id: s.access_policy or AccessPolicy() for s in identifiers} @distributed_trace_async - async def set_queue_access_policy( # type: ignore + async def set_queue_access_policy( # type: ignore[override] self, signed_identifiers: Dict[str, AccessPolicy], **kwargs: Any ) -> None: @@ -324,12 +323,12 @@ async def set_queue_access_policy( # type: ignore value.expiry = serialize_iso(value.expiry) identifiers.append(SignedIdentifier(id=key, access_policy=value)) try: - await self._client.queue.set_access_policy(queue_acl=identifiers or None, timeout=timeout, **kwargs) # type: ignore + return self._client.queue.set_access_policy(queue_acl=identifiers or None, timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @distributed_trace_async - async def send_message( # type: ignore + async def send_message( # type: ignore[override] self, content: Any, *, visibility_timeout: Optional[int] = None, @@ -407,7 +406,7 @@ async def send_message( # type: ignore new_message = GenQueueMessage(message_text=encoded_content) try: - enqueued = await self._client.messages.enqueue( # type: ignore + enqueued = await self._client.messages.enqueue( queue_message=new_message, visibilitytimeout=visibility_timeout, message_time_to_live=time_to_live, @@ -425,7 +424,7 @@ async def send_message( # type: ignore process_storage_error(error) @distributed_trace_async - async def receive_message( # type: ignore + async def receive_message( # type: ignore[override] self, *, visibility_timeout: Optional[int] = None, **kwargs: Any @@ -473,7 +472,7 @@ async def receive_message( # type: ignore key_encryption_key=self.key_encryption_key, resolver=self.key_resolver_function) try: - message = await self._client.messages.dequeue( # type: ignore + message = await self._client.messages.dequeue( number_of_messages=1, visibilitytimeout=visibility_timeout, timeout=timeout, @@ -487,7 +486,7 @@ async def receive_message( # type: ignore process_storage_error(error) @distributed_trace - def receive_messages( # type: ignore + def receive_messages( # type: ignore[override] self, *, messages_per_page: Optional[int] = None, visibility_timeout: Optional[int] = None, @@ -565,7 +564,7 @@ def receive_messages( # type: ignore process_storage_error(error) @distributed_trace_async - async def update_message( # type: ignore + async def update_message( # type: ignore[override] self, message: Union[str, QueueMessage], pop_receipt: Optional[str] = None, content: Optional[Any] = None, @@ -668,7 +667,7 @@ async def update_message( # type: ignore else: updated = None try: - response = await self._client.message_id.update( # type: ignore + response = await self._client.message_id.update( queue_message=updated, visibilitytimeout=visibility_timeout or 0, timeout=timeout, @@ -689,7 +688,7 @@ async def update_message( # type: ignore process_storage_error(error) @distributed_trace_async - async def peek_messages( # type: ignore + async def peek_messages( # type: ignore[override] self, max_messages: Optional[int] = None, **kwargs: Any ) -> List[QueueMessage]: @@ -741,7 +740,7 @@ async def peek_messages( # type: ignore resolver=self.key_resolver_function ) try: - messages = await self._client.messages.peek( # type: ignore + messages = await self._client.messages.peek( number_of_messages=max_messages, timeout=timeout, cls=self.message_decode_policy, **kwargs ) wrapped_messages = [] @@ -752,7 +751,7 @@ async def peek_messages( # type: ignore process_storage_error(error) @distributed_trace_async - async def clear_messages(self, **kwargs: Any) -> None: # type: ignore + async def clear_messages(self, **kwargs: Any) -> None: # type: ignore[override] """Deletes all messages from the specified queue. :keyword int timeout: @@ -773,12 +772,12 @@ async def clear_messages(self, **kwargs: Any) -> None: # type: ignore """ timeout = kwargs.pop('timeout', None) try: - await self._client.messages.clear(timeout=timeout, **kwargs) # type: ignore + return self._client.messages.clear(timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @distributed_trace_async - async def delete_message( # type: ignore + async def delete_message( # type: ignore[override] self, message: Union[str, QueueMessage], pop_receipt: Optional[str] = None, **kwargs: Any @@ -830,7 +829,7 @@ async def delete_message( # type: ignore if receipt is None: raise ValueError("pop_receipt must be present") try: - await self._client.message_id.delete( # type: ignore + return self._client.message_id.delete( pop_receipt=receipt, timeout=timeout, queue_message_id=message_id, **kwargs ) except HttpResponseError as error: From 6f2626e57cbc77d6da048f304404cf8f82a3f7de Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Fri, 14 Apr 2023 15:06:58 -0700 Subject: [PATCH 05/11] Change client's name for async, less ignores! --- .../storage/queue/aio/_queue_client_async.py | 38 +++++++++---------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py index 4966c1788210..7d938d554ae3 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py @@ -8,7 +8,7 @@ import functools import warnings from typing import ( - Any, Dict, List, Optional, Union, + Any, cast, Dict, List, Optional, Union, Tuple, TYPE_CHECKING) from azure.core.async_paging import AsyncItemPaged @@ -94,8 +94,8 @@ def __init__( super(QueueClient, self).__init__( account_url, queue_name=queue_name, credential=credential, loop=loop, **kwargs ) - self._client = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) # type: ignore - self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access + self._client_async = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) # type: ignore + self._client_async._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop self._configure_encryption(kwargs) @@ -137,7 +137,7 @@ async def create_queue( # type: ignore[override] headers = kwargs.pop("headers", {}) headers.update(add_metadata_headers(metadata)) try: - return self._client.queue.create( + return await self._client_async.queue.create( metadata=metadata, timeout=timeout, headers=headers, cls=deserialize_queue_creation, **kwargs ) except HttpResponseError as error: @@ -174,7 +174,7 @@ async def delete_queue(self, **kwargs: Any) -> None: # type: ignore[override] """ timeout = kwargs.pop('timeout', None) try: - return self._client.queue.delete(timeout=timeout, **kwargs) + await self._client_async.queue.delete(timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @@ -200,9 +200,9 @@ async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": # typ """ timeout = kwargs.pop('timeout', None) try: - response = await self._client.queue.get_properties( + response = cast("QueueProperties", await (self._client_async.queue.get_properties( timeout=timeout, cls=deserialize_queue_properties, **kwargs - ) + ))) except HttpResponseError as error: process_storage_error(error) response.name = self.queue_name @@ -241,7 +241,7 @@ async def set_queue_metadata( # type: ignore[override] headers = kwargs.pop("headers", {}) headers.update(add_metadata_headers(metadata)) try: - return self._client.queue.set_metadata( + await self._client_async.queue.set_metadata( timeout=timeout, headers=headers, cls=return_response_headers, **kwargs ) except HttpResponseError as error: @@ -263,9 +263,9 @@ async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy """ timeout = kwargs.pop('timeout', None) try: - _, identifiers = await self._client.queue.get_access_policy( + _, identifiers = cast(Tuple[Dict, List], await self._client_async.queue.get_access_policy( timeout=timeout, cls=return_headers_and_deserialized, **kwargs - ) + )) except HttpResponseError as error: process_storage_error(error) return {s.id: s.access_policy or AccessPolicy() for s in identifiers} @@ -323,7 +323,7 @@ async def set_queue_access_policy( # type: ignore[override] value.expiry = serialize_iso(value.expiry) identifiers.append(SignedIdentifier(id=key, access_policy=value)) try: - return self._client.queue.set_access_policy(queue_acl=identifiers or None, timeout=timeout, **kwargs) + await self._client_async.queue.set_access_policy(queue_acl=identifiers or None, timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @@ -406,7 +406,7 @@ async def send_message( # type: ignore[override] new_message = GenQueueMessage(message_text=encoded_content) try: - enqueued = await self._client.messages.enqueue( + enqueued = await self._client_async.messages.enqueue( queue_message=new_message, visibilitytimeout=visibility_timeout, message_time_to_live=time_to_live, @@ -472,7 +472,7 @@ async def receive_message( # type: ignore[override] key_encryption_key=self.key_encryption_key, resolver=self.key_resolver_function) try: - message = await self._client.messages.dequeue( + message = await self._client_async.messages.dequeue( number_of_messages=1, visibilitytimeout=visibility_timeout, timeout=timeout, @@ -549,7 +549,7 @@ def receive_messages( # type: ignore[override] ) try: command = functools.partial( - self._client.messages.dequeue, + self._client_async.messages.dequeue, visibilitytimeout=visibility_timeout, timeout=timeout, cls=self.message_decode_policy, @@ -667,7 +667,7 @@ async def update_message( # type: ignore[override] else: updated = None try: - response = await self._client.message_id.update( + response = cast(QueueMessage, await self._client_async.message_id.update( queue_message=updated, visibilitytimeout=visibility_timeout or 0, timeout=timeout, @@ -675,7 +675,7 @@ async def update_message( # type: ignore[override] cls=return_response_headers, queue_message_id=message_id, **kwargs - ) + )) new_message = QueueMessage(content=message_text) new_message.id = message_id new_message.inserted_on = inserted_on @@ -740,7 +740,7 @@ async def peek_messages( # type: ignore[override] resolver=self.key_resolver_function ) try: - messages = await self._client.messages.peek( + messages = await self._client_async.messages.peek( number_of_messages=max_messages, timeout=timeout, cls=self.message_decode_policy, **kwargs ) wrapped_messages = [] @@ -772,7 +772,7 @@ async def clear_messages(self, **kwargs: Any) -> None: # type: ignore[override] """ timeout = kwargs.pop('timeout', None) try: - return self._client.messages.clear(timeout=timeout, **kwargs) + await self._client_async.messages.clear(timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @@ -829,7 +829,7 @@ async def delete_message( # type: ignore[override] if receipt is None: raise ValueError("pop_receipt must be present") try: - return self._client.message_id.delete( + await self._client_async.message_id.delete( pop_receipt=receipt, timeout=timeout, queue_message_id=message_id, **kwargs ) except HttpResponseError as error: From f7b17524645131bf27f7bfc727ea3f67020dcee5 Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Fri, 28 Apr 2023 13:11:39 -0700 Subject: [PATCH 06/11] Remove extraneous ignore --- .../azure/storage/queue/aio/_queue_client_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py index 7d938d554ae3..f32cffa40904 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py @@ -94,7 +94,7 @@ def __init__( super(QueueClient, self).__init__( account_url, queue_name=queue_name, credential=credential, loop=loop, **kwargs ) - self._client_async = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) # type: ignore + self._client_async = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) self._client_async._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop self._configure_encryption(kwargs) From 60141bee0df7a0966044bcc31cddca2ee83da239 Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Mon, 1 May 2023 12:17:37 -0700 Subject: [PATCH 07/11] Change to self._client_async, change Pipeline to AsyncPipeline --- .../azure/storage/queue/_shared/base_client_async.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py index f6dc3a9c747c..bb26372dbc50 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py @@ -39,7 +39,6 @@ from .response_handlers import process_storage_error, PartialBatchErrorException if TYPE_CHECKING: - from azure.core.pipeline import Pipeline from azure.core.pipeline.transport import HttpRequest from azure.core.configuration import Configuration _LOGGER = logging.getLogger(__name__) @@ -54,20 +53,20 @@ def __exit__(self, *args): pass async def __aenter__(self): - await self._client.__aenter__() + await self._client_async.__aenter__() return self async def __aexit__(self, *args): - await self._client.__aexit__(*args) + await self._client_async.__aexit__(*args) async def close(self): """ This method is to close the sockets opened by the client. It need not be used when using with a context manager. """ - await self._client.close() + await self._client_async.close() def _create_pipeline(self, credential, **kwargs): - # type: (Any, **Any) -> Tuple[Configuration, Pipeline] + # type: (Any, **Any) -> Tuple[Configuration, AsyncPipeline] self._credential_policy = None if hasattr(credential, 'get_token'): self._credential_policy = AsyncBearerTokenCredentialPolicy(credential, STORAGE_OAUTH_SCOPE) @@ -119,7 +118,7 @@ async def _batch_send( """ # Pop it here, so requests doesn't feel bad about additional kwarg raise_on_any_failure = kwargs.pop("raise_on_any_failure", True) - request = self._client._client.post( # pylint: disable=protected-access + request = self._client_async._client.post( # pylint: disable=protected-access url=( f'{self.scheme}://{self.primary_hostname}/' f"{kwargs.pop('path', '')}?{kwargs.pop('restype', '')}" From 2513d2bf6b6c345881efa96f28dfab4cb8213cc9 Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Mon, 1 May 2023 16:05:59 -0700 Subject: [PATCH 08/11] Add this client_async pattern to QueueServiceClient --- .../storage/queue/aio/_queue_service_client_async.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py index 22bac2bdcf92..c559a2af6932 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py @@ -97,8 +97,8 @@ def __init__( credential=credential, loop=loop, **kwargs) - self._client = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) - self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access + self._client_async = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) + self._client_async._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop self._configure_encryption(kwargs) @@ -129,7 +129,7 @@ async def get_service_stats(self, **kwargs: Any) -> Dict[str, Any]: """ timeout = kwargs.pop('timeout', None) try: - stats = await self._client.service.get_statistics( + stats = await self._client_async.service.get_statistics( timeout=timeout, use_location=LocationMode.SECONDARY, **kwargs) return service_stats_deserialize(stats) except HttpResponseError as error: @@ -157,7 +157,7 @@ async def get_service_properties(self, **kwargs: Any) -> Dict[str, Any]: """ timeout = kwargs.pop('timeout', None) try: - service_props = await self._client.service.get_properties(timeout=timeout, **kwargs) + service_props = await self._client_async.service.get_properties(timeout=timeout, **kwargs) return service_properties_deserialize(service_props) except HttpResponseError as error: process_storage_error(error) @@ -213,7 +213,7 @@ async def set_service_properties( cors=cors ) try: - return await self._client.service.set_properties(props, timeout=timeout, **kwargs) + return await self._client_async.service.set_properties(props, timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @@ -260,7 +260,7 @@ def list_queues( timeout = kwargs.pop('timeout', None) include = ['metadata'] if include_metadata else None command = functools.partial( - self._client.service.list_queues_segment, + self._client_async.service.list_queues_segment, prefix=name_starts_with, include=include, timeout=timeout, From 50aaf92fb4b155536cc353acf104287537ff65eb Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Tue, 2 May 2023 15:03:59 -0700 Subject: [PATCH 09/11] Null out client from super call, add ignore --- .../azure/storage/queue/aio/_queue_client_async.py | 2 ++ .../azure/storage/queue/aio/_queue_service_client_async.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py index f32cffa40904..5713fd50d53d 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py @@ -94,6 +94,8 @@ def __init__( super(QueueClient, self).__init__( account_url, queue_name=queue_name, credential=credential, loop=loop, **kwargs ) + # Null out sync client in favor of async client below + self._client = None # type: ignore[assignment] self._client_async = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) self._client_async._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py index c559a2af6932..d849d7e70d56 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py @@ -97,6 +97,8 @@ def __init__( credential=credential, loop=loop, **kwargs) + # Null out sync client in favor of async client below + self._client = None # type: ignore[assignment] self._client_async = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) self._client_async._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop From fea5d6ba180b4836f135fddf6c0d66870dc7e91c Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Tue, 2 May 2023 17:21:39 -0700 Subject: [PATCH 10/11] Revert async client change --- .../queue/_shared/base_client_async.py | 8 ++--- .../storage/queue/aio/_queue_client_async.py | 30 +++++++++---------- .../queue/aio/_queue_service_client_async.py | 12 ++++---- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py index bb26372dbc50..11fef6ea29b8 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/_shared/base_client_async.py @@ -53,17 +53,17 @@ def __exit__(self, *args): pass async def __aenter__(self): - await self._client_async.__aenter__() + await self._client.__aenter__() return self async def __aexit__(self, *args): - await self._client_async.__aexit__(*args) + await self._client.__aexit__(*args) async def close(self): """ This method is to close the sockets opened by the client. It need not be used when using with a context manager. """ - await self._client_async.close() + await self._client.close() def _create_pipeline(self, credential, **kwargs): # type: (Any, **Any) -> Tuple[Configuration, AsyncPipeline] @@ -118,7 +118,7 @@ async def _batch_send( """ # Pop it here, so requests doesn't feel bad about additional kwarg raise_on_any_failure = kwargs.pop("raise_on_any_failure", True) - request = self._client_async._client.post( # pylint: disable=protected-access + request = self._client._client.post( # pylint: disable=protected-access url=( f'{self.scheme}://{self.primary_hostname}/' f"{kwargs.pop('path', '')}?{kwargs.pop('restype', '')}" diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py index 5713fd50d53d..4451dafe06b4 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py @@ -96,8 +96,8 @@ def __init__( ) # Null out sync client in favor of async client below self._client = None # type: ignore[assignment] - self._client_async = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) - self._client_async._config.version = get_api_version(kwargs) # pylint: disable=protected-access + self._client = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) + self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop self._configure_encryption(kwargs) @@ -139,7 +139,7 @@ async def create_queue( # type: ignore[override] headers = kwargs.pop("headers", {}) headers.update(add_metadata_headers(metadata)) try: - return await self._client_async.queue.create( + return await self._client.queue.create( metadata=metadata, timeout=timeout, headers=headers, cls=deserialize_queue_creation, **kwargs ) except HttpResponseError as error: @@ -176,7 +176,7 @@ async def delete_queue(self, **kwargs: Any) -> None: # type: ignore[override] """ timeout = kwargs.pop('timeout', None) try: - await self._client_async.queue.delete(timeout=timeout, **kwargs) + await self._client.queue.delete(timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @@ -202,7 +202,7 @@ async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": # typ """ timeout = kwargs.pop('timeout', None) try: - response = cast("QueueProperties", await (self._client_async.queue.get_properties( + response = cast("QueueProperties", await (self._client.queue.get_properties( timeout=timeout, cls=deserialize_queue_properties, **kwargs ))) except HttpResponseError as error: @@ -243,7 +243,7 @@ async def set_queue_metadata( # type: ignore[override] headers = kwargs.pop("headers", {}) headers.update(add_metadata_headers(metadata)) try: - await self._client_async.queue.set_metadata( + await self._client.queue.set_metadata( timeout=timeout, headers=headers, cls=return_response_headers, **kwargs ) except HttpResponseError as error: @@ -265,7 +265,7 @@ async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy """ timeout = kwargs.pop('timeout', None) try: - _, identifiers = cast(Tuple[Dict, List], await self._client_async.queue.get_access_policy( + _, identifiers = cast(Tuple[Dict, List], await self._client.queue.get_access_policy( timeout=timeout, cls=return_headers_and_deserialized, **kwargs )) except HttpResponseError as error: @@ -325,7 +325,7 @@ async def set_queue_access_policy( # type: ignore[override] value.expiry = serialize_iso(value.expiry) identifiers.append(SignedIdentifier(id=key, access_policy=value)) try: - await self._client_async.queue.set_access_policy(queue_acl=identifiers or None, timeout=timeout, **kwargs) + await self._client.queue.set_access_policy(queue_acl=identifiers or None, timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @@ -408,7 +408,7 @@ async def send_message( # type: ignore[override] new_message = GenQueueMessage(message_text=encoded_content) try: - enqueued = await self._client_async.messages.enqueue( + enqueued = await self._client.messages.enqueue( queue_message=new_message, visibilitytimeout=visibility_timeout, message_time_to_live=time_to_live, @@ -474,7 +474,7 @@ async def receive_message( # type: ignore[override] key_encryption_key=self.key_encryption_key, resolver=self.key_resolver_function) try: - message = await self._client_async.messages.dequeue( + message = await self._client.messages.dequeue( number_of_messages=1, visibilitytimeout=visibility_timeout, timeout=timeout, @@ -551,7 +551,7 @@ def receive_messages( # type: ignore[override] ) try: command = functools.partial( - self._client_async.messages.dequeue, + self._client.messages.dequeue, visibilitytimeout=visibility_timeout, timeout=timeout, cls=self.message_decode_policy, @@ -669,7 +669,7 @@ async def update_message( # type: ignore[override] else: updated = None try: - response = cast(QueueMessage, await self._client_async.message_id.update( + response = cast(QueueMessage, await self._client.message_id.update( queue_message=updated, visibilitytimeout=visibility_timeout or 0, timeout=timeout, @@ -742,7 +742,7 @@ async def peek_messages( # type: ignore[override] resolver=self.key_resolver_function ) try: - messages = await self._client_async.messages.peek( + messages = await self._client.messages.peek( number_of_messages=max_messages, timeout=timeout, cls=self.message_decode_policy, **kwargs ) wrapped_messages = [] @@ -774,7 +774,7 @@ async def clear_messages(self, **kwargs: Any) -> None: # type: ignore[override] """ timeout = kwargs.pop('timeout', None) try: - await self._client_async.messages.clear(timeout=timeout, **kwargs) + await self._client.messages.clear(timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @@ -831,7 +831,7 @@ async def delete_message( # type: ignore[override] if receipt is None: raise ValueError("pop_receipt must be present") try: - await self._client_async.message_id.delete( + await self._client.message_id.delete( pop_receipt=receipt, timeout=timeout, queue_message_id=message_id, **kwargs ) except HttpResponseError as error: diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py index d849d7e70d56..1cf3aa48c76d 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py @@ -99,8 +99,8 @@ def __init__( **kwargs) # Null out sync client in favor of async client below self._client = None # type: ignore[assignment] - self._client_async = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) - self._client_async._config.version = get_api_version(kwargs) # pylint: disable=protected-access + self._client = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) + self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop self._configure_encryption(kwargs) @@ -131,7 +131,7 @@ async def get_service_stats(self, **kwargs: Any) -> Dict[str, Any]: """ timeout = kwargs.pop('timeout', None) try: - stats = await self._client_async.service.get_statistics( + stats = await self._client.service.get_statistics( timeout=timeout, use_location=LocationMode.SECONDARY, **kwargs) return service_stats_deserialize(stats) except HttpResponseError as error: @@ -159,7 +159,7 @@ async def get_service_properties(self, **kwargs: Any) -> Dict[str, Any]: """ timeout = kwargs.pop('timeout', None) try: - service_props = await self._client_async.service.get_properties(timeout=timeout, **kwargs) + service_props = await self._client.service.get_properties(timeout=timeout, **kwargs) return service_properties_deserialize(service_props) except HttpResponseError as error: process_storage_error(error) @@ -215,7 +215,7 @@ async def set_service_properties( cors=cors ) try: - return await self._client_async.service.set_properties(props, timeout=timeout, **kwargs) + return await self._client.service.set_properties(props, timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @@ -262,7 +262,7 @@ def list_queues( timeout = kwargs.pop('timeout', None) include = ['metadata'] if include_metadata else None command = functools.partial( - self._client_async.service.list_queues_segment, + self._client.service.list_queues_segment, prefix=name_starts_with, include=include, timeout=timeout, From 0aa30a1b1f20cd6849c96e40ba5f687c169de783 Mon Sep 17 00:00:00 2001 From: Vincent Tran Date: Tue, 2 May 2023 17:24:41 -0700 Subject: [PATCH 11/11] Remove null-ing client --- .../azure/storage/queue/aio/_queue_client_async.py | 2 -- .../azure/storage/queue/aio/_queue_service_client_async.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py index 4451dafe06b4..840b2c89ffd5 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py @@ -94,8 +94,6 @@ def __init__( super(QueueClient, self).__init__( account_url, queue_name=queue_name, credential=credential, loop=loop, **kwargs ) - # Null out sync client in favor of async client below - self._client = None # type: ignore[assignment] self._client = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py index 1cf3aa48c76d..22bac2bdcf92 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_service_client_async.py @@ -97,8 +97,6 @@ def __init__( credential=credential, loop=loop, **kwargs) - # Null out sync client in favor of async client below - self._client = None # type: ignore[assignment] self._client = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop