Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

* Session receivers are now created via their own top level functions, e.g. `get_queue_sesison_receiver` and `get_subscription_session_receiver`. Non session receivers no longer take session_id as a paramter.
* `ServiceBusSender.send()` no longer takes a timeout parameter, as it should be redundant with retry options provided when creating the client.
* Exception imports have been removed from module `azure.servicebus`. Import from `azure.servicebus.exceptions` instead.

## 7.0.0b1 (2020-04-06)

Expand Down
29 changes: 0 additions & 29 deletions sdk/servicebus/azure-servicebus/azure/servicebus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,6 @@
from ._common.message import Message, BatchMessage, PeekMessage, ReceivedMessage
from ._common.constants import ReceiveSettleMode, NEXT_AVAILABLE
from ._common.utils import AutoLockRenew
from .exceptions import (
ServiceBusError,
ServiceBusResourceNotFound,
ServiceBusConnectionError,
ServiceBusAuthorizationError,
InvalidHandlerState,
NoActiveSession,
MessageAlreadySettled,
MessageSettleFailed,
MessageSendFailed,
MessageLockExpired,
SessionLockExpired,
AutoLockRenewFailed,
AutoLockRenewTimeout
)


TransportType = constants.TransportType

Expand All @@ -43,19 +27,6 @@
'ReceivedMessage',
'ReceiveSettleMode',
'NEXT_AVAILABLE',
'ServiceBusError',
'ServiceBusResourceNotFound',
'ServiceBusConnectionError',
'ServiceBusAuthorizationError',
'InvalidHandlerState',
'NoActiveSession',
'MessageAlreadySettled',
'MessageSettleFailed',
'MessageSendFailed',
'MessageLockExpired',
'SessionLockExpired',
'AutoLockRenewFailed',
'AutoLockRenewTimeout',
'ServiceBusClient',
'ServiceBusReceiver',
'ServiceBusSessionReceiver',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,11 +291,11 @@ def from_connection_string(
return cls(**constructor_args)

def send(self, message):
# type: (Union[Message, BatchMessage], float) -> None
# type: (Union[Message, BatchMessage]) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.

:param message: The ServiceBus message to be sent.
:type message: ~azure.servicebus.Message
:type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage
:rtype: None
:raises: ~azure.servicebus.common.errors.MessageSendFailed if the message fails to
send or ~azure.servicebus.common.errors.OperationTimeoutError if sending times out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,11 @@ def from_connection_string(
return cls(**constructor_args)

async def send(self, message):
# type: (Message, float) -> None
# type: (Union[Message, BatchMessage]) -> None
"""Sends message and blocks until acknowledgement is received or operation times out.

:param message: The ServiceBus message to be sent.
:type message: ~azure.servicebus.Message
:type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage
:rtype: None
:raises: ~azure.servicebus.common.errors.MessageSendFailed if the message fails to
send or ~azure.servicebus.common.errors.OperationTimeoutError if sending times out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import os
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ReceiveSettleMode

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,26 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import asyncio
import uuid

from azure.servicebus.aio import ServiceBusClient, AutoLockRenew
from azure.servicebus import NoActiveSession, Message
from azure.servicebus import Message
from azure.servicebus.exceptions import NoActiveSession


CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
# Note: This must be a session-enabled queue.
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]


async def message_processing(servicebus_client, queue_name):
while True:
try:
async with servicebus_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver:
renewer = AutoLockRenew()
renewer.register(receiver.session, timeout=None)
renewer.register(receiver.session)
await receiver.session.set_session_state("OPEN")
async for message in receiver:
print("Message: {}".format(message))
Expand All @@ -37,7 +39,7 @@ async def message_processing(servicebus_client, queue_name):
if str(message) == 'shutdown':
await receiver.session.set_session_state("CLOSED")
break
renewer.shutdown()
await renewer.shutdown()
except NoActiveSession:
print("There are no non-empty sessions remaining; exiting. This may present as a UserError in the azure portal.")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# --------------------------------------------------------------------------------------------

"""
Example to show sending message(s) to and receiving messages from a Service Bus Queue with session enabled. asynchronously.
Example to show sending message(s) to and receiving messages from a Service Bus Queue with session enabled asynchronously.
"""

# pylint: disable=C0111
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import os
import uuid
import concurrent

from azure.servicebus import ServiceBusClient, Message, AutoLockRenew
from azure.servicebus import NoActiveSession
from azure.servicebus.exceptions import NoActiveSession

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
# Note: This must be a session-enabled queue.
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]


def message_processing(sb_client, queue_name, messages):
while True:
try:
with sb_client.get_queue_session_receiver(queue_name, idle_timeout=1) as receiver:
renewer = AutoLockRenew()
renewer.register(receiver.session, timeout=None)
renewer.register(receiver.session)
receiver.session.set_session_state("OPEN")
for message in receiver:
messages.append(message)
Expand Down