diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/_queue_client.py b/sdk/storage/azure-storage-queue/azure/storage/queue/_queue_client.py index 6898af943f2c..09e107e126a9 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/_queue_client.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/_queue_client.py @@ -78,11 +78,11 @@ class QueueClient(StorageAccountHostsMixin, StorageEncryptionMixin): :caption: Create the queue client with url and credential. """ def __init__( - self, account_url: str, - queue_name: str, - credential: Optional[Union[str, Dict[str, str], "AzureNamedKeyCredential", "AzureSasCredential", "TokenCredential"]] = None, # pylint: disable=line-too-long - **kwargs: Any - ) -> None: + self, account_url: str, + queue_name: str, + credential: Optional[Union[str, Dict[str, str], "AzureNamedKeyCredential", "AzureSasCredential", "TokenCredential"]] = None, # pylint: disable=line-too-long + **kwargs: Any + ) -> None: try: if not account_url.lower().startswith('http'): account_url = "https://" + account_url @@ -108,7 +108,7 @@ def __init__( self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._configure_encryption(kwargs) - def _format_url(self, hostname): + def _format_url(self, hostname: str) -> str: """Format the endpoint URL according to the current location mode hostname. """ @@ -121,10 +121,10 @@ def _format_url(self, hostname): @classmethod def from_queue_url( - cls, queue_url: str, - credential: Optional[Union[str, Dict[str, str], "AzureNamedKeyCredential", "AzureSasCredential", "TokenCredential"]] = None, # pylint: disable=line-too-long - **kwargs: Any - ) -> Self: + cls, queue_url: str, + credential: Optional[Union[str, Dict[str, str], "AzureNamedKeyCredential", "AzureSasCredential", "TokenCredential"]] = None, # pylint: disable=line-too-long + **kwargs: Any + ) -> Self: """A client to interact with a specific Queue. :param str queue_url: The full URI to the queue, including SAS token if used. @@ -164,11 +164,11 @@ def from_queue_url( @classmethod def from_connection_string( - cls, conn_str: str, - queue_name: str, - credential: Optional[Union[str, Dict[str, str], "AzureNamedKeyCredential", "AzureSasCredential", "TokenCredential"]] = None, # pylint: disable=line-too-long - **kwargs: Any - ) -> Self: + cls, conn_str: str, + queue_name: str, + credential: Optional[Union[str, Dict[str, str], "AzureNamedKeyCredential", "AzureSasCredential", "TokenCredential"]] = None, # pylint: disable=line-too-long + **kwargs: Any + ) -> Self: """Create QueueClient from a Connection String. :param str conn_str: @@ -200,17 +200,20 @@ def from_connection_string( conn_str, credential, 'queue') if 'secondary_hostname' not in kwargs: kwargs['secondary_hostname'] = secondary - return cls(account_url, queue_name=queue_name, credential=credential, **kwargs) # type: ignore + return cls(account_url, queue_name=queue_name, credential=credential, **kwargs) @distributed_trace - def create_queue(self, **kwargs): - # type: (Any) -> None + def create_queue( + self, *, + metadata: Optional[Dict[str, str]] = None, + **kwargs: Any + ) -> None: """Creates a new queue in the storage account. If a queue with the same name already exists, the operation fails with a `ResourceExistsError`. - :keyword dict(str,str) metadata: + :keyword Dict[str,str] metadata: A dict containing name-value pairs to associate with the queue as metadata. Note that metadata names preserve the case with which they were created, but are case-insensitive when set or read. @@ -234,11 +237,10 @@ def create_queue(self, **kwargs): :caption: Create a queue. """ headers = kwargs.pop('headers', {}) - metadata = kwargs.pop('metadata', None) timeout = kwargs.pop('timeout', None) - headers.update(add_metadata_headers(metadata)) # type: ignore + headers.update(add_metadata_headers(metadata)) try: - return self._client.queue.create( # type: ignore + return self._client.queue.create( metadata=metadata, timeout=timeout, headers=headers, @@ -248,8 +250,7 @@ def create_queue(self, **kwargs): process_storage_error(error) @distributed_trace - def delete_queue(self, **kwargs): - # type: (Any) -> None + def delete_queue(self, **kwargs: Any) -> None: """Deletes the specified queue and any messages it contains. When a queue is successfully deleted, it is immediately marked for deletion @@ -284,8 +285,7 @@ def delete_queue(self, **kwargs): process_storage_error(error) @distributed_trace - def get_queue_properties(self, **kwargs): - # type: (Any) -> QueueProperties + def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": """Returns all user-defined metadata for the specified queue. The data returned does not include the queue's list of messages. @@ -313,14 +313,13 @@ def get_queue_properties(self, **kwargs): except HttpResponseError as error: process_storage_error(error) response.name = self.queue_name - return response # type: ignore + return response @distributed_trace - def set_queue_metadata(self, - metadata=None, # type: Optional[Dict[str, Any]] - **kwargs # type: Any - ): - # type: (...) -> None + def set_queue_metadata( + self, metadata: Optional[Dict[str, str]] = None, + **kwargs: Any + ) -> None: """Sets user-defined metadata on the specified queue. Metadata is associated with the queue as name-value pairs. @@ -328,7 +327,7 @@ def set_queue_metadata(self, :param metadata: A dict containing name-value pairs to associate with the queue as metadata. - :type metadata: dict(str, str) + :type metadata: Dict[str, str] :keyword int timeout: Sets the server-side timeout for the operation in seconds. For more details see https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations. @@ -347,9 +346,9 @@ def set_queue_metadata(self, """ timeout = kwargs.pop('timeout', None) headers = kwargs.pop('headers', {}) - headers.update(add_metadata_headers(metadata)) # type: ignore + headers.update(add_metadata_headers(metadata)) try: - return self._client.queue.set_metadata( # type: ignore + return self._client.queue.set_metadata( timeout=timeout, headers=headers, cls=return_response_headers, @@ -358,8 +357,7 @@ def set_queue_metadata(self, process_storage_error(error) @distributed_trace - def get_queue_access_policy(self, **kwargs): - # type: (Any) -> Dict[str, AccessPolicy] + def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy]: """Returns details about any stored access policies specified on the queue that may be used with Shared Access Signatures. @@ -370,7 +368,7 @@ def get_queue_access_policy(self, **kwargs): see `here `_. :return: A dictionary of access policies associated with the queue. - :rtype: dict(str, ~azure.storage.queue.AccessPolicy) + :rtype: Dict[str, ~azure.storage.queue.AccessPolicy] """ timeout = kwargs.pop('timeout', None) try: @@ -383,11 +381,10 @@ def get_queue_access_policy(self, **kwargs): return {s.id: s.access_policy or AccessPolicy() for s in identifiers} @distributed_trace - def set_queue_access_policy(self, - signed_identifiers, # type: Dict[str, AccessPolicy] - **kwargs # type: Any - ): - # type: (...) -> None + def set_queue_access_policy( + self, signed_identifiers: Dict[str, AccessPolicy], + **kwargs: Any + ) -> None: """Sets stored access policies for the queue that may be used with Shared Access Signatures. @@ -406,7 +403,7 @@ def set_queue_access_policy(self, SignedIdentifier access policies to associate with the queue. This may contain up to 5 elements. An empty dict will clear the access policies set on the service. - :type signed_identifiers: dict(str, ~azure.storage.queue.AccessPolicy) + :type signed_identifiers: Dict[str, ~azure.storage.queue.AccessPolicy] :keyword int timeout: Sets the server-side timeout for the operation in seconds. For more details see https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations. @@ -434,7 +431,7 @@ def set_queue_access_policy(self, value.start = serialize_iso(value.start) value.expiry = serialize_iso(value.expiry) identifiers.append(SignedIdentifier(id=key, access_policy=value)) - signed_identifiers = identifiers # type: ignore + signed_identifiers = identifiers try: self._client.queue.set_access_policy( queue_acl=signed_identifiers or None, @@ -445,11 +442,12 @@ def set_queue_access_policy(self, @distributed_trace def send_message( - self, - content, # type: Any - **kwargs # type: Any - ): - # type: (...) -> QueueMessage + self, content: Any, + *, + visibility_timeout: Optional[int] = None, + time_to_live: Optional[int] = None, + **kwargs: Any + ) -> "QueueMessage": """Adds a new message to the back of the message queue. The visibility timeout specifies the time that the message will be @@ -463,7 +461,7 @@ def send_message( If the key-encryption-key field is set on the local service object, this method will encrypt the content before uploading. - :param obj content: + :param Any content: Message content. Allowed type is determined by the encode_function set on the service. Default is str. The encoded message can be up to 64KB in size. @@ -499,8 +497,6 @@ def send_message( :dedent: 12 :caption: Send messages. """ - visibility_timeout = kwargs.pop('visibility_timeout', None) - time_to_live = kwargs.pop('time_to_live', None) timeout = kwargs.pop('timeout', None) try: self._config.message_encode_policy.configure( @@ -540,7 +536,11 @@ def send_message( process_storage_error(error) @distributed_trace - def receive_message(self, **kwargs: Any) -> Optional[QueueMessage]: + def receive_message( + self, *, + visibility_timeout: Optional[int] = None, + **kwargs: Any + ) -> Optional[QueueMessage]: """Removes one message from the front of the queue. When the message is retrieved from the queue, the response includes the message @@ -578,7 +578,6 @@ def receive_message(self, **kwargs: Any) -> Optional[QueueMessage]: :dedent: 12 :caption: Receive one message from the queue. """ - visibility_timeout = kwargs.pop('visibility_timeout', None) timeout = kwargs.pop('timeout', None) self._config.message_decode_policy.configure( require_encryption=self.require_encryption, @@ -599,8 +598,13 @@ def receive_message(self, **kwargs: Any) -> Optional[QueueMessage]: process_storage_error(error) @distributed_trace - def receive_messages(self, **kwargs): - # type: (Any) -> ItemPaged[QueueMessage] + def receive_messages( + self, *, + messages_per_page: Optional[int] = None, + visibility_timeout: Optional[int] = None, + max_messages: Optional[int] = None, + **kwargs: Any + ) -> ItemPaged[QueueMessage]: """Removes one or more messages from the front of the queue. When a message is retrieved from the queue, the response includes the message @@ -637,14 +641,14 @@ def receive_messages(self, **kwargs): larger than 7 days. The visibility timeout of a message cannot be set to a value later than the expiry time. visibility_timeout should be set to a value smaller than the time-to-live value. + :keyword int max_messages: + An integer that specifies the maximum number of messages to retrieve from the queue. :keyword int timeout: Sets the server-side timeout for the operation in seconds. For more details see https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations. This value is not tracked or validated on the client. To configure client-side network timesouts see `here `_. - :keyword int max_messages: - An integer that specifies the maximum number of messages to retrieve from the queue. :return: Returns a message iterator of dict-like Message objects. :rtype: ~azure.core.paging.ItemPaged[~azure.storage.queue.QueueMessage] @@ -658,10 +662,7 @@ def receive_messages(self, **kwargs): :dedent: 12 :caption: Receive messages from the queue. """ - messages_per_page = kwargs.pop('messages_per_page', None) - visibility_timeout = kwargs.pop('visibility_timeout', None) timeout = kwargs.pop('timeout', None) - max_messages = kwargs.pop('max_messages', None) self._config.message_decode_policy.configure( require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key, @@ -683,13 +684,14 @@ def receive_messages(self, **kwargs): process_storage_error(error) @distributed_trace - def update_message(self, - message, # type: Any - pop_receipt=None, # type: Optional[str] - content=None, # type: Optional[Any] - **kwargs # type: Any - ): - # type: (...) -> QueueMessage + def update_message( + self, message: Union[str, QueueMessage], + pop_receipt: Optional[str] = None, + content: Optional[Any] = None, + *, + visibility_timeout: Optional[int] = None, + **kwargs: Any + ) -> QueueMessage: """Updates the visibility timeout of a message. You can also use this operation to update the contents of a message. @@ -710,7 +712,7 @@ def update_message(self, :param str pop_receipt: A valid pop receipt value returned from an earlier call to the :func:`~receive_messages` or :func:`~update_message` operation. - :param obj content: + :param Any content: Message content. Allowed type is determined by the encode_function set on the service. Default is str. :keyword int visibility_timeout: @@ -740,7 +742,6 @@ def update_message(self, :dedent: 12 :caption: Update a message. """ - visibility_timeout = kwargs.pop('visibility_timeout', None) timeout = kwargs.pop('timeout', None) try: message_id = message.id @@ -780,7 +781,7 @@ def update_message(self, encoded_message_text = self._config.message_encode_policy(message_text) updated = GenQueueMessage(message_text=encoded_message_text) else: - updated = None # type: ignore + updated = None try: response = self._client.message_id.update( queue_message=updated, @@ -802,11 +803,10 @@ def update_message(self, process_storage_error(error) @distributed_trace - def peek_messages(self, - max_messages=None, # type: Optional[int] - **kwargs # type: Any - ): - # type: (...) -> List[QueueMessage] + def peek_messages( + self, max_messages: Optional[int] = None, + **kwargs: Any + ) -> List[QueueMessage]: """Retrieves one or more messages from the front of the queue, but does not alter the visibility of the message. @@ -867,8 +867,7 @@ def peek_messages(self, process_storage_error(error) @distributed_trace - def clear_messages(self, **kwargs): - # type: (Any) -> None + def clear_messages(self, **kwargs: Any) -> None: """Deletes all messages from the specified queue. :keyword int timeout: @@ -894,12 +893,11 @@ def clear_messages(self, **kwargs): process_storage_error(error) @distributed_trace - def delete_message(self, - message, # type: Any - pop_receipt=None, # type: Optional[str] - **kwargs # type: Any - ): - # type: (...) -> None + def delete_message( + self, message: Union[str, QueueMessage], + pop_receipt: Optional[str] = None, + **kwargs: Any + ) -> None: """Deletes the specified message. Normally after a client retrieves a message with the receive messages operation, diff --git a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py index c165c07bf431..0eaf9010007f 100644 --- a/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py +++ b/sdk/storage/azure-storage-queue/azure/storage/queue/aio/_queue_client_async.py @@ -84,25 +84,28 @@ class QueueClient(AsyncStorageAccountHostsMixin, QueueClientBase, StorageEncrypt """ def __init__( - self, account_url: str, - queue_name: str, - credential: Optional[Union[str, Dict[str, str], "AzureNamedKeyCredential", "AzureSasCredential", "TokenCredential"]] = None, # pylint: disable=line-too-long - **kwargs: Any - ) -> None: + self, account_url: str, + queue_name: str, + credential: Optional[Union[str, Dict[str, str], "AzureNamedKeyCredential", "AzureSasCredential", "TokenCredential"]] = None, # pylint: disable=line-too-long + **kwargs: Any + ) -> None: kwargs["retry_policy"] = kwargs.get("retry_policy") or ExponentialRetry(**kwargs) loop = kwargs.pop('loop', None) super(QueueClient, self).__init__( account_url, queue_name=queue_name, credential=credential, loop=loop, **kwargs ) self._client = AzureQueueStorage(self.url, base_url=self.url, - pipeline=self._pipeline, loop=loop) # type: ignore + pipeline=self._pipeline, loop=loop) self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access self._loop = loop self._configure_encryption(kwargs) @distributed_trace_async - async def create_queue(self, **kwargs): - # type: (Optional[Any]) -> None + async def create_queue( + self, *, + metadata: Optional[Dict[str, str]] = None, + **kwargs: Any + ) -> None: """Creates a new queue in the storage account. If a queue with the same name already exists, the operation fails with @@ -131,20 +134,18 @@ async def create_queue(self, **kwargs): :dedent: 12 :caption: Create a queue. """ - metadata = kwargs.pop('metadata', None) timeout = kwargs.pop('timeout', None) headers = kwargs.pop("headers", {}) - headers.update(add_metadata_headers(metadata)) # type: ignore + headers.update(add_metadata_headers(metadata)) try: - return await self._client.queue.create( # type: ignore + return await self._client.queue.create( metadata=metadata, timeout=timeout, headers=headers, cls=deserialize_queue_creation, **kwargs ) except HttpResponseError as error: process_storage_error(error) @distributed_trace_async - async def delete_queue(self, **kwargs): - # type: (Optional[Any]) -> None + async def delete_queue(self, **kwargs: Any) -> None: """Deletes the specified queue and any messages it contains. When a queue is successfully deleted, it is immediately marked for deletion @@ -179,8 +180,7 @@ async def delete_queue(self, **kwargs): process_storage_error(error) @distributed_trace_async - async def get_queue_properties(self, **kwargs): - # type: (Optional[Any]) -> QueueProperties + async def get_queue_properties(self, **kwargs: Any) -> "QueueProperties": """Returns all user-defined metadata for the specified queue. The data returned does not include the queue's list of messages. @@ -207,11 +207,13 @@ async def get_queue_properties(self, **kwargs): except HttpResponseError as error: process_storage_error(error) response.name = self.queue_name - return response # type: ignore + return response @distributed_trace_async - async def set_queue_metadata(self, metadata=None, **kwargs): - # type: (Optional[Dict[str, Any]], Optional[Any]) -> None + async def set_queue_metadata( + self, metadata: Optional[Dict[str, str]] = None, + **kwargs: Any + ) -> None: """Sets user-defined metadata on the specified queue. Metadata is associated with the queue as name-value pairs. @@ -238,17 +240,16 @@ async def set_queue_metadata(self, metadata=None, **kwargs): """ timeout = kwargs.pop('timeout', None) headers = kwargs.pop("headers", {}) - headers.update(add_metadata_headers(metadata)) # type: ignore + headers.update(add_metadata_headers(metadata)) try: - return await self._client.queue.set_metadata( # type: ignore + return await self._client.queue.set_metadata( timeout=timeout, headers=headers, cls=return_response_headers, **kwargs ) except HttpResponseError as error: process_storage_error(error) @distributed_trace_async - async def get_queue_access_policy(self, **kwargs): - # type: (Optional[Any]) -> Dict[str, Any] + async def get_queue_access_policy(self, **kwargs: Any) -> Dict[str, AccessPolicy]: """Returns details about any stored access policies specified on the queue that may be used with Shared Access Signatures. @@ -271,8 +272,10 @@ async def get_queue_access_policy(self, **kwargs): return {s.id: s.access_policy or AccessPolicy() for s in identifiers} @distributed_trace_async - async def set_queue_access_policy(self, signed_identifiers, **kwargs): - # type: (Dict[str, AccessPolicy], Optional[Any]) -> None + async def set_queue_access_policy( + self, signed_identifiers: Dict[str, AccessPolicy], + **kwargs: Any + ) -> None: """Sets stored access policies for the queue that may be used with Shared Access Signatures. @@ -320,19 +323,20 @@ async def set_queue_access_policy(self, signed_identifiers, **kwargs): value.start = serialize_iso(value.start) value.expiry = serialize_iso(value.expiry) identifiers.append(SignedIdentifier(id=key, access_policy=value)) - signed_identifiers = identifiers # type: ignore + signed_identifiers = identifiers try: await self._client.queue.set_access_policy(queue_acl=signed_identifiers or None, timeout=timeout, **kwargs) except HttpResponseError as error: process_storage_error(error) @distributed_trace_async - async def send_message( # type: ignore - self, - content, # type: Any - **kwargs # type: Optional[Any] - ): - # type: (...) -> QueueMessage + async def send_message( + self, content: Any, + *, + visibility_timeout: Optional[int] = None, + time_to_live: Optional[int] = None, + **kwargs: Any + ) -> "QueueMessage": """Adds a new message to the back of the message queue. The visibility timeout specifies the time that the message will be @@ -346,7 +350,7 @@ async def send_message( # type: ignore If the key-encryption-key field is set on the local service object, this method will encrypt the content before uploading. - :param obj content: + :param Any content: Message content. Allowed type is determined by the encode_function set on the service. Default is str. The encoded message can be up to 64KB in size. @@ -382,8 +386,6 @@ async def send_message( # type: ignore :dedent: 16 :caption: Send messages. """ - visibility_timeout = kwargs.pop('visibility_timeout', None) - time_to_live = kwargs.pop('time_to_live', None) timeout = kwargs.pop('timeout', None) try: self._config.message_encode_policy.configure( @@ -424,7 +426,11 @@ async def send_message( # type: ignore process_storage_error(error) @distributed_trace_async - async def receive_message(self, **kwargs: Any) -> Optional[QueueMessage]: + async def receive_message( + self, *, + visibility_timeout: Optional[int] = None, + **kwargs: Any + ) -> Optional[QueueMessage]: """Removes one message from the front of the queue. When the message is retrieved from the queue, the response includes the message @@ -462,7 +468,6 @@ async def receive_message(self, **kwargs: Any) -> Optional[QueueMessage]: :dedent: 12 :caption: Receive one message from the queue. """ - visibility_timeout = kwargs.pop('visibility_timeout', None) timeout = kwargs.pop('timeout', None) self._config.message_decode_policy.configure( require_encryption=self.require_encryption, @@ -483,8 +488,13 @@ async def receive_message(self, **kwargs: Any) -> Optional[QueueMessage]: process_storage_error(error) @distributed_trace - def receive_messages(self, **kwargs): - # type: (Optional[Any]) -> AsyncItemPaged[QueueMessage] + def receive_messages( + self, *, + messages_per_page: Optional[int] = None, + visibility_timeout: Optional[int] = None, + max_messages: Optional[int] = None, + **kwargs: Any + ) -> AsyncItemPaged[QueueMessage]: """Removes one or more messages from the front of the queue. When a message is retrieved from the queue, the response includes the message @@ -512,14 +522,14 @@ def receive_messages(self, **kwargs): larger than 7 days. The visibility timeout of a message cannot be set to a value later than the expiry time. visibility_timeout should be set to a value smaller than the time-to-live value. + :keyword int max_messages: + An integer that specifies the maximum number of messages to retrieve from the queue. :keyword int timeout: Sets the server-side timeout for the operation in seconds. For more details see https://learn.microsoft.com/en-us/rest/api/storageservices/setting-timeouts-for-queue-service-operations. This value is not tracked or validated on the client. To configure client-side network timesouts see `here `_. - :keyword int max_messages: - An integer that specifies the maximum number of messages to retrieve from the queue. :return: Returns a message iterator of dict-like Message objects. :rtype: ~azure.core.async_paging.AsyncItemPaged[~azure.storage.queue.QueueMessage] @@ -533,10 +543,7 @@ def receive_messages(self, **kwargs): :dedent: 16 :caption: Receive messages from the queue. """ - messages_per_page = kwargs.pop('messages_per_page', None) - visibility_timeout = kwargs.pop('visibility_timeout', None) timeout = kwargs.pop('timeout', None) - max_messages = kwargs.pop('max_messages', None) self._config.message_decode_policy.configure( require_encryption=self.require_encryption, key_encryption_key=self.key_encryption_key, @@ -560,13 +567,13 @@ def receive_messages(self, **kwargs): @distributed_trace_async async def update_message( - self, - message, - pop_receipt=None, - content=None, - **kwargs - ): - # type: (Any, int, Optional[str], Optional[Any], Any) -> QueueMessage + self, message: Union[str, QueueMessage], + pop_receipt: Optional[str] = None, + content: Optional[Any] = None, + *, + visibility_timeout: Optional[int] = None, + **kwargs: Any + ) -> QueueMessage: """Updates the visibility timeout of a message. You can also use this operation to update the contents of a message. @@ -587,7 +594,7 @@ async def update_message( :param str pop_receipt: A valid pop receipt value returned from an earlier call to the :func:`~receive_messages` or :func:`~update_message` operation. - :param obj content: + :param Any content: Message content. Allowed type is determined by the encode_function set on the service. Default is str. :keyword int visibility_timeout: @@ -617,7 +624,6 @@ async def update_message( :dedent: 16 :caption: Update a message. """ - visibility_timeout = kwargs.pop('visibility_timeout', None) timeout = kwargs.pop('timeout', None) try: message_id = message.id @@ -659,7 +665,7 @@ async def update_message( encoded_message_text = self._config.message_encode_policy(message_text) updated = GenQueueMessage(message_text=encoded_message_text) else: - updated = None # type: ignore + updated = None try: response = await self._client.message_id.update( queue_message=updated, @@ -682,8 +688,10 @@ async def update_message( process_storage_error(error) @distributed_trace_async - async def peek_messages(self, max_messages=None, **kwargs): - # type: (Optional[int], Optional[Any]) -> List[QueueMessage] + async def peek_messages( + self, max_messages: Optional[int] = None, + **kwargs: Any + ) -> List[QueueMessage]: """Retrieves one or more messages from the front of the queue, but does not alter the visibility of the message. @@ -743,8 +751,7 @@ async def peek_messages(self, max_messages=None, **kwargs): process_storage_error(error) @distributed_trace_async - async def clear_messages(self, **kwargs): - # type: (Optional[Any]) -> None + async def clear_messages(self, **kwargs: Any) -> None: """Deletes all messages from the specified queue. :keyword int timeout: @@ -770,8 +777,11 @@ async def clear_messages(self, **kwargs): process_storage_error(error) @distributed_trace_async - async def delete_message(self, message, pop_receipt=None, **kwargs): - # type: (Any, Optional[str], Any) -> None + async def delete_message( + self, message: Union[str, QueueMessage], + pop_receipt: Optional[str] = None, + **kwargs: Any + ) -> None: """Deletes the specified message. Normally after a client retrieves a message with the receive messages operation,