Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
from ._models import (
CorsRule,
Metrics,
QueueProperties,
QueueAnalyticsLogging,
QueueProperties
)


Expand Down Expand Up @@ -111,7 +111,7 @@ def __init__(
self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access
self._configure_encryption(kwargs)

def _format_url(self, hostname):
def _format_url(self, hostname: str) -> str:
"""Format the endpoint URL according to the current location
mode hostname.
"""
Expand Down Expand Up @@ -155,8 +155,7 @@ def from_connection_string(
return cls(account_url, credential=credential, **kwargs)

@distributed_trace
def get_service_stats(self, **kwargs):
# type: (Any) -> Dict[str, Any]
def get_service_stats(self, **kwargs: Any) -> Dict[str, Any]:
"""Retrieves statistics related to replication for the Queue service.

It is only available when read-access geo-redundant replication is enabled for
Expand All @@ -182,15 +181,14 @@ def get_service_stats(self, **kwargs):
"""
timeout = kwargs.pop('timeout', None)
try:
stats = self._client.service.get_statistics( # type: ignore
stats = self._client.service.get_statistics(
timeout=timeout, use_location=LocationMode.SECONDARY, **kwargs)
return service_stats_deserialize(stats)
except HttpResponseError as error:
process_storage_error(error)

@distributed_trace
def get_service_properties(self, **kwargs):
# type: (Any) -> Dict[str, Any]
def get_service_properties(self, **kwargs: Any) -> Dict[str, Any]:
"""Gets the properties of a storage account's Queue service, including
Azure Storage Analytics.

Expand All @@ -211,20 +209,19 @@ def get_service_properties(self, **kwargs):
"""
timeout = kwargs.pop('timeout', None)
try:
service_props = self._client.service.get_properties(timeout=timeout, **kwargs) # type: ignore
service_props = self._client.service.get_properties(timeout=timeout, **kwargs)
return service_properties_deserialize(service_props)
except HttpResponseError as error:
process_storage_error(error)

@distributed_trace
def set_service_properties( # type: ignore
self, analytics_logging=None, # type: Optional[QueueAnalyticsLogging]
hour_metrics=None, # type: Optional[Metrics]
minute_metrics=None, # type: Optional[Metrics]
cors=None, # type: Optional[List[CorsRule]]
**kwargs # type: Any
):
# type: (...) -> None
def set_service_properties(
self, analytics_logging: Optional["QueueAnalyticsLogging"] = None,
hour_metrics: Optional["Metrics"] = None,
minute_metrics: Optional["Metrics"] = None,
cors: Optional[List["CorsRule"]] = None,
**kwargs: Any
) -> None:
"""Sets the properties of a storage account's Queue service, including
Azure Storage Analytics.

Expand Down Expand Up @@ -268,17 +265,16 @@ def set_service_properties( # type: ignore
cors=cors
)
try:
return self._client.service.set_properties(props, timeout=timeout, **kwargs) # type: ignore
return self._client.service.set_properties(props, timeout=timeout, **kwargs)
except HttpResponseError as error:
process_storage_error(error)

@distributed_trace
def list_queues(
self, name_starts_with=None, # type: Optional[str]
include_metadata=False, # type: Optional[bool]
**kwargs # type: Any
):
# type: (...) -> ItemPaged[QueueProperties]
self, name_starts_with: Optional[str] = None,
include_metadata: Optional[bool] = False,
**kwargs: Any
) -> ItemPaged["QueueProperties"]:
"""Returns a generator to list the queues under the specified account.

The generator will lazily follow the continuation tokens returned by
Expand Down Expand Up @@ -328,11 +324,10 @@ def list_queues(

@distributed_trace
def create_queue(
self, name, # type: str
metadata=None, # type: Optional[Dict[str, str]]
**kwargs # type: Any
):
# type: (...) -> QueueClient
self, name: str,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> QueueClient:
"""Creates a new queue under the specified account.

If a queue with the same name already exists, the operation fails.
Expand All @@ -342,7 +337,7 @@ def create_queue(
:param metadata:
A dict with name_value pairs to associate with the
queue as metadata. Example: {'Category': 'test'}
:type metadata: dict(str, str)
:type metadata: Dict[str, str]
:keyword int timeout:
The timeout parameter is expressed in seconds.
:rtype: ~azure.storage.queue.QueueClient
Expand All @@ -366,10 +361,9 @@ def create_queue(
@distributed_trace
def delete_queue(
self,
queue, # type: Union[QueueProperties, str]
**kwargs # type: Any
):
# type: (...) -> None
queue: Union["QueueProperties", str],
**kwargs: Any
) -> None:
"""Deletes the specified queue and any messages it contains.

When a queue is successfully deleted, it is immediately marked for deletion
Expand Down Expand Up @@ -402,11 +396,11 @@ def delete_queue(
kwargs.setdefault('merge_span', True)
queue_client.delete_queue(timeout=timeout, **kwargs)

def get_queue_client(self,
queue, # type: Union[QueueProperties, str]
**kwargs # type: Any
):
# type: (...) -> QueueClient
def get_queue_client(
self,
queue: Union["QueueProperties", str],
**kwargs: Any
) -> QueueClient:
"""Get a client to interact with the specified queue.

The queue need not already exist.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
from .._models import (
CorsRule,
Metrics,
QueueProperties,
QueueAnalyticsLogging,
QueueProperties,
)


Expand Down Expand Up @@ -92,19 +92,18 @@ def __init__(
) -> None:
kwargs['retry_policy'] = kwargs.get('retry_policy') or ExponentialRetry(**kwargs)
loop = kwargs.pop('loop', None)
super(QueueServiceClient, self).__init__( # type: ignore
super(QueueServiceClient, self).__init__(
account_url,
credential=credential,
loop=loop,
**kwargs)
self._client = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop) # type: ignore
self._client = AzureQueueStorage(self.url, base_url=self.url, pipeline=self._pipeline, loop=loop)
self._client._config.version = get_api_version(kwargs) # pylint: disable=protected-access
self._loop = loop
self._configure_encryption(kwargs)

@distributed_trace_async
async def get_service_stats(self, **kwargs):
# type: (Optional[Any]) -> Dict[str, Any]
async def get_service_stats(self, **kwargs: Any) -> Dict[str, Any]:
"""Retrieves statistics related to replication for the Queue service.

It is only available when read-access geo-redundant replication is enabled for
Expand All @@ -130,15 +129,14 @@ async def get_service_stats(self, **kwargs):
"""
timeout = kwargs.pop('timeout', None)
try:
stats = await self._client.service.get_statistics( # type: ignore
stats = await self._client.service.get_statistics(
timeout=timeout, use_location=LocationMode.SECONDARY, **kwargs)
return service_stats_deserialize(stats)
except HttpResponseError as error:
process_storage_error(error)

@distributed_trace_async
async def get_service_properties(self, **kwargs):
# type: (Optional[Any]) -> Dict[str, Any]
async def get_service_properties(self, **kwargs: Any) -> Dict[str, Any]:
"""Gets the properties of a storage account's Queue service, including
Azure Storage Analytics.

Expand All @@ -159,20 +157,19 @@ async def get_service_properties(self, **kwargs):
"""
timeout = kwargs.pop('timeout', None)
try:
service_props = await self._client.service.get_properties(timeout=timeout, **kwargs) # type: ignore
service_props = await self._client.service.get_properties(timeout=timeout, **kwargs)
return service_properties_deserialize(service_props)
except HttpResponseError as error:
process_storage_error(error)

@distributed_trace_async
async def set_service_properties( # type: ignore
self, analytics_logging=None, # type: Optional[QueueAnalyticsLogging]
hour_metrics=None, # type: Optional[Metrics]
minute_metrics=None, # type: Optional[Metrics]
cors=None, # type: Optional[List[CorsRule]]
**kwargs
):
# type: (...) -> None
async def set_service_properties(
self, analytics_logging: Optional["QueueAnalyticsLogging"] = None,
hour_metrics: Optional["Metrics"] = None,
minute_metrics: Optional["Metrics"] = None,
cors: Optional[List["CorsRule"]] = None,
**kwargs: Any
) -> None:
"""Sets the properties of a storage account's Queue service, including
Azure Storage Analytics.

Expand Down Expand Up @@ -216,16 +213,16 @@ async def set_service_properties( # type: ignore
cors=cors
)
try:
return await self._client.service.set_properties(props, timeout=timeout, **kwargs) # type: ignore
return await self._client.service.set_properties(props, timeout=timeout, **kwargs)
except HttpResponseError as error:
process_storage_error(error)

@distributed_trace
def list_queues(
self, name_starts_with=None, # type: Optional[str]
include_metadata=False, # type: Optional[bool]
**kwargs
): # type: (...) -> AsyncItemPaged
self, name_starts_with: Optional[str] = None,
include_metadata: Optional[bool] = False,
**kwargs: Any
) -> AsyncItemPaged:
"""Returns a generator to list the queues under the specified account.

The generator will lazily follow the continuation tokens returned by
Expand Down Expand Up @@ -274,12 +271,11 @@ def list_queues(
)

@distributed_trace_async
async def create_queue( # type: ignore
self, name, # type: str
metadata=None, # type: Optional[Dict[str, str]]
**kwargs
):
# type: (...) -> QueueClient
async def create_queue(
self, name: str,
metadata: Optional[Dict[str, str]] = None,
**kwargs: Any
) -> QueueClient:
"""Creates a new queue under the specified account.

If a queue with the same name already exists, the operation fails.
Expand All @@ -289,7 +285,7 @@ async def create_queue( # type: ignore
:param metadata:
A dict with name_value pairs to associate with the
queue as metadata. Example: {'Category': 'test'}
:type metadata: dict(str, str)
:type metadata: Dict[str, str]
:keyword int timeout:
The timeout parameter is expressed in seconds.
:rtype: ~azure.storage.queue.aio.QueueClient
Expand All @@ -311,11 +307,10 @@ async def create_queue( # type: ignore
return queue

@distributed_trace_async
async def delete_queue( # type: ignore
self, queue, # type: Union[QueueProperties, str]
**kwargs
):
# type: (...) -> None
async def delete_queue(
self, queue: Union["QueueProperties", str],
**kwargs: Any
) -> None:
"""Deletes the specified queue and any messages it contains.

When a queue is successfully deleted, it is immediately marked for deletion
Expand Down Expand Up @@ -348,8 +343,11 @@ async def delete_queue( # type: ignore
kwargs.setdefault('merge_span', True)
await queue_client.delete_queue(timeout=timeout, **kwargs)

def get_queue_client(self, queue, **kwargs):
# type: (Union[QueueProperties, str], Optional[Any]) -> QueueClient
def get_queue_client(
self,
queue: Union["QueueProperties", str],
**kwargs: Any
) -> QueueClient:
"""Get a client to interact with the specified queue.

The queue need not already exist.
Expand Down