From 36594f5394fda3ad6c850def7559218bf4979a2e Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Thu, 23 Apr 2020 16:48:27 -0700 Subject: [PATCH 1/8] make schedule a property on the message --- .../azure/servicebus/_common/message.py | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 926142970537..57eafe94f426 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -240,27 +240,39 @@ def time_to_live(self, value): self.header.time_to_live = int(value) * 1000 @property - def body(self): - # type: () -> Union[bytes, Generator[bytes]] - """The body of the Message. + def scheduled_enqueue_time_utc(self): + # type: () -> Optional[datetime.datetime] + """Get or set the utc scheduled enqueue time to the message. + This property can be used for scheduling when sending a message through `ServiceBusSender.send` method. + Or you could use `ServiceBusSender.schedule` method to schedule messages which would return scheduled messages' + sequence numbers that can be used for future cancellation. - :rtype: bytes or generator[bytes] + :rtype: ~datetime.datetime """ - return self.message.get_data() + if self.message.annotations: + timestamp = self.message.annotations.get(_X_OPT_SCHEDULED_ENQUEUE_TIME) + if timestamp: + in_seconds = timestamp/1000.0 + return utc_from_timestamp(in_seconds) + return None - def schedule(self, schedule_time_utc): + @scheduled_enqueue_time_utc.setter + def scheduled_enqueue_time_utc(self, value): # type: (datetime.datetime) -> None - """Add a specific utc enqueue time to the message. - - :param schedule_time_utc: The scheduled utc time to enqueue the message. - :type schedule_time_utc: ~datetime.datetime - :rtype: None - """ if not self.properties.message_id: self.properties.message_id = str(uuid.uuid4()) if not self.message.annotations: self.message.annotations = {} - self.message.annotations[types.AMQPSymbol(_X_OPT_SCHEDULED_ENQUEUE_TIME)] = schedule_time_utc + self.message.annotations[types.AMQPSymbol(_X_OPT_SCHEDULED_ENQUEUE_TIME)] = value + + @property + def body(self): + # type: () -> Union[bytes, Generator[bytes]] + """The body of the Message. + + :rtype: bytes or generator[bytes] + """ + return self.message.get_data() class BatchMessage(object): @@ -399,20 +411,6 @@ def enqueued_time_utc(self): return utc_from_timestamp(in_seconds) return None - @property - def scheduled_enqueue_time_utc(self): - # type: () -> Optional[datetime.datetime] - """ - - :rtype: ~datetime.datetime - """ - if self.message.annotations: - timestamp = self.message.annotations.get(_X_OPT_SCHEDULED_ENQUEUE_TIME) - if timestamp: - in_seconds = timestamp/1000.0 - return utc_from_timestamp(in_seconds) - return None - @property def sequence_number(self): # type: () -> Optional[int] From 2d800eca00b50c63ea23f80cd0421c9ec629f399 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Fri, 24 Apr 2020 17:44:30 -0700 Subject: [PATCH 2/8] make send api public --- .../azure-servicebus/azure/servicebus/_servicebus_sender.py | 6 +++--- .../azure/servicebus/aio/_servicebus_sender_async.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index ef9c2f1cce82..9231b6d1d9f1 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -65,7 +65,7 @@ def _set_msg_timeout(self, timeout=None, last_exception=None): def _build_schedule_request(cls, schedule_time_utc, *messages): request_body = {MGMT_REQUEST_MESSAGES: []} for message in messages: - message.schedule(schedule_time_utc) + message.scheduled_enqueue_time_utc = schedule_time_utc message_data = {} message_data[MGMT_REQUEST_MESSAGE_ID] = message.properties.message_id if message.properties.group_id: @@ -188,7 +188,7 @@ def _send(self, message, timeout=None, last_exception=None): except Exception as e: raise MessageSendFailed(e) - def _schedule(self, message, schedule_time_utc): + def schedule(self, message, schedule_time_utc): # type: (Union[Message, BatchMessage], datetime.datetime) -> List[int] """Send Message or BatchMessage to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. @@ -219,7 +219,7 @@ def _schedule(self, message, schedule_time_utc): mgmt_handlers.schedule_op ) - def _cancel_scheduled_messages(self, sequence_numbers): + def cancel_scheduled_messages(self, sequence_numbers): # type: (Union[int, List[int]]) -> None """ Cancel one or more messages that have previously been scheduled and are still pending. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 601441175a36..5febe1839c8e 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -139,7 +139,7 @@ async def _send(self, message, timeout=None, last_exception=None): except Exception as e: raise MessageSendFailed(e) - async def _schedule(self, message, schedule_time_utc): + async def schedule(self, message, schedule_time_utc): # type: (Union[Message, BatchMessage], datetime.datetime) -> List[int] """Send Message or BatchMessage to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. @@ -170,7 +170,7 @@ async def _schedule(self, message, schedule_time_utc): mgmt_handlers.schedule_op ) - async def _cancel_scheduled_messages(self, sequence_numbers): + async def cancel_scheduled_messages(self, sequence_numbers): # type: (Union[int, List[int]]) -> None """ Cancel one or more messages that have previously been scheduled and are still pending. From 94286446b111b43533c949eac5fc16c3176c62d2 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Mon, 27 Apr 2020 18:20:21 -0700 Subject: [PATCH 3/8] schedule and cancellation --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 3 + .../azure/servicebus/_servicebus_sender.py | 24 ++++---- .../aio/_servicebus_sender_async.py | 24 ++++---- .../azure-servicebus/samples/README.md | 3 + .../sample_code_servicebus_async.py | 19 ++++++- ...chedule_messages_and_cancellation_async.py | 56 +++++++++++++++++++ .../sync_samples/sample_code_servicebus.py | 18 +++++- .../schedule_messages_and_cancellation.py | 50 +++++++++++++++++ .../tests/async_tests/test_queues_async.py | 8 +-- .../azure-servicebus/tests/test_queues.py | 12 ++-- 10 files changed, 179 insertions(+), 38 deletions(-) create mode 100644 sdk/servicebus/azure-servicebus/samples/async_samples/schedule_messages_and_cancellation_async.py create mode 100644 sdk/servicebus/azure-servicebus/samples/sync_samples/schedule_messages_and_cancellation.py diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index d94efd5680a8..50d6740aedd5 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -6,6 +6,9 @@ * Added method `get_topic_sender` in `ServiceBusClient` to get a `ServiceBusSender` for a topic. * Added method `get_subscription_receiver` in `ServiceBusClient` to get a `ServiceBusReceiver` for a subscription under specific topic. +* Added support for scheduling messages and scheduled message cancellation. + - Use `ServiceBusSender.schedule` for scheduling messages. + - Use `ServiceBusSender.cancel_scheduled_messages` for scheduled messages cancellation. **BugFixes** diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 9231b6d1d9f1..08bac4ba08a5 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -5,7 +5,7 @@ import logging import time import uuid -from typing import Any, TYPE_CHECKING, Union, List +from typing import Any, TYPE_CHECKING, Union, List, Iterable import uamqp from uamqp import SendClient, types @@ -188,15 +188,15 @@ def _send(self, message, timeout=None, last_exception=None): except Exception as e: raise MessageSendFailed(e) - def schedule(self, message, schedule_time_utc): - # type: (Union[Message, BatchMessage], datetime.datetime) -> List[int] - """Send Message or BatchMessage to be enqueued at a specific time. + def schedule(self, messages, schedule_time_utc): + # type: (Union[Message, Iterable[Message]], datetime.datetime) -> List[int] + """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. - :param message: The messages to schedule. - :type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage + :param messages: The message or list of messages to schedule. + :type messages: ~azure.servicebus.Message or Iterator[~azure.servicebus.Message] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime - :rtype: List[int] + :rtype: list[int] .. admonition:: Example: @@ -209,10 +209,10 @@ def schedule(self, message, schedule_time_utc): """ # pylint: disable=protected-access self._open() - if isinstance(message, BatchMessage): - request_body = self._build_schedule_request(schedule_time_utc, *message._messages) + if isinstance(messages, Message): + request_body = self._build_schedule_request(schedule_time_utc, messages) else: - request_body = self._build_schedule_request(schedule_time_utc, message) + request_body = self._build_schedule_request(schedule_time_utc, *messages) return self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, @@ -220,12 +220,12 @@ def schedule(self, message, schedule_time_utc): ) def cancel_scheduled_messages(self, sequence_numbers): - # type: (Union[int, List[int]]) -> None + # type: (Union[int, Iterator[int]]) -> None """ Cancel one or more messages that have previously been scheduled and are still pending. :param sequence_numbers: The sequence numbers of the scheduled messages. - :type sequence_numbers: int or list[int] + :type sequence_numbers: int or Iterator[int] :rtype: None .. admonition:: Example: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 5febe1839c8e..0bb61ba13b41 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -4,7 +4,7 @@ # -------------------------------------------------------------------------------------------- import logging import asyncio -from typing import Any, TYPE_CHECKING, Union, List +from typing import Any, TYPE_CHECKING, Union, List, Iterable import uamqp from uamqp import SendClientAsync, types @@ -139,15 +139,15 @@ async def _send(self, message, timeout=None, last_exception=None): except Exception as e: raise MessageSendFailed(e) - async def schedule(self, message, schedule_time_utc): - # type: (Union[Message, BatchMessage], datetime.datetime) -> List[int] - """Send Message or BatchMessage to be enqueued at a specific time. + async def schedule(self, messages, schedule_time_utc): + # type: (Union[Message, Iterable[Message]], datetime.datetime) -> List[int] + """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. - :param message: The messages to schedule. - :type message: ~azure.servicebus.Message or ~azure.servicebus.BatchMessage + :param messages: The message or list of messages to schedule. + :type messages: ~azure.servicebus.Message or Iterator[~azure.servicebus.Message] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime - :rtype: List[int] + :rtype: list[int] .. admonition:: Example: @@ -160,10 +160,10 @@ async def schedule(self, message, schedule_time_utc): """ # pylint: disable=protected-access await self._open() - if isinstance(message, BatchMessage): - request_body = self._build_schedule_request(schedule_time_utc, *message._messages) + if isinstance(messages, Message): + request_body = self._build_schedule_request(schedule_time_utc, messages) else: - request_body = self._build_schedule_request(schedule_time_utc, message) + request_body = self._build_schedule_request(schedule_time_utc, *messages) return await self._mgmt_request_response_with_retry( REQUEST_RESPONSE_SCHEDULE_MESSAGE_OPERATION, request_body, @@ -171,12 +171,12 @@ async def schedule(self, message, schedule_time_utc): ) async def cancel_scheduled_messages(self, sequence_numbers): - # type: (Union[int, List[int]]) -> None + # type: (Union[int, Iterator[int]]) -> None """ Cancel one or more messages that have previously been scheduled and are still pending. :param sequence_numbers: he sequence numbers of the scheduled messages. - :type sequence_numbers: int or list[int] + :type sequence_numbers: int or Iterator[int] :rtype: None .. admonition:: Example: diff --git a/sdk/servicebus/azure-servicebus/samples/README.md b/sdk/servicebus/azure-servicebus/samples/README.md index 10546948e2a7..fc2453a9e592 100644 --- a/sdk/servicebus/azure-servicebus/samples/README.md +++ b/sdk/servicebus/azure-servicebus/samples/README.md @@ -33,6 +33,9 @@ Both [sync version](./sync_samples) and [async version](./async_samples) of samp - [session_send_receive.py](./sync_samples/session_send_receive.py) ([async_version](./async_samples/session_send_receive_async.py)) - Examples to send messages to and receive messages from a session-enabled service bus queue: - Send messages to a session-enabled queue - Receive messages from session-enabled queue +- [schedule_messages_and_cancellation](./sync_samples/schedule_messages_and_cancellation.py) ([async_version](./async_samples/schedule_messages_and_cancellation_async.py)) - Examples to schedule messages and cancel scheduled message: + - Schedule a single message or multiples messages to a queue + - Cancel scheduled messages from a queue - [client_identity_authentication.py](./sync_samples/client_identity_authentication.py) ([async_version](./async_samples/client_identity_authentication_async.py)) - Examples to authenticate the client by Azure Activate Directory - Authenticate and create the client utilizing the `azure.identity` library - [proxy.py](./sync_samples/proxy.py) ([async_version](./async_samples/proxy_async.py)) - Examples to send message behind a proxy: diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py index 90657b76e374..105c9920dac3 100644 --- a/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/sample_code_servicebus_async.py @@ -12,6 +12,7 @@ - Receive and settle deferred messages """ import os +import datetime import asyncio from azure.servicebus.aio import ServiceBusClient from azure.servicebus import Message @@ -96,7 +97,7 @@ async def example_create_servicebus_sender_async(): topic_name = os.environ['SERVICE_BUS_TOPIC_NAME'] servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) async with servicebus_client: - queue_sender = servicebus_client.get_topic_sender(topic_name=topic_name) + topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name) # [END create_topic_sender_from_sb_client_async] return queue_sender @@ -271,8 +272,24 @@ async def example_session_ops_async(): break +async def example_schedule_ops_async(): + servicebus_sender = await example_create_servicebus_sender_async() + # [START scheduling_messages_async] + async with servicebus_sender: + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + scheduled_messages = [Message("Scheduled message") for _ in range(10)] + sequence_nums = await servicebus_sender.schedule(scheduled_messages, scheduled_time_utc) + # [END scheduling_messages_async] + + # [START cancel_scheduled_messages_async] + async with servicebus_sender: + await servicebus_sender.cancel_scheduled_messages(sequence_nums) + # [END cancel_scheduled_messages_async] + + if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(example_send_and_receive_async()) loop.run_until_complete(example_receive_deferred_async()) + loop.run_until_complete(example_schedule_ops_async()) # loop.run_until_complete(example_session_ops_async()) diff --git a/sdk/servicebus/azure-servicebus/samples/async_samples/schedule_messages_and_cancellation_async.py b/sdk/servicebus/azure-servicebus/samples/async_samples/schedule_messages_and_cancellation_async.py new file mode 100644 index 000000000000..05458e1169dc --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/async_samples/schedule_messages_and_cancellation_async.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Example to show scheduling messages to and cancelling messages from a Service Bus Queue asynchronously. +""" + +# pylint: disable=C0111 + +import os +import asyncio +import datetime +from azure.servicebus.aio import ServiceBusClient +from azure.servicebus import Message + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + + +async def schedule_single_message(sender): + message = Message("Message to be scheduled") + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + sequence_number = await sender.schedule(message, scheduled_time_utc) + return sequence_number + + +async def schedule_multiple_messages(sender): + messages_to_schedule = [] + for _ in range(10): + messages_to_schedule.append(Message("Message to be scheduled")) + + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + sequence_numbers = await sender.schedule(messages_to_schedule, scheduled_time_utc) + return sequence_numbers + + +async def main(): + servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True) + async with servicebus_client: + sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) + async with sender: + sequence_number = await schedule_single_message(sender) + print("Single message is scheduled and sequence number is {}".format(sequence_number)) + sequence_numbers = await schedule_multiple_messages(sender) + print("Multiple messages are scheduled and sequence numbers are {}".format(sequence_numbers)) + + await sender.cancel_scheduled_messages(sequence_number) + await sender.cancel_scheduled_messages(sequence_numbers) + print("All scheduled messages are cancelled.") + +loop = asyncio.get_event_loop() +loop.run_until_complete(main()) diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py index 55d689e4bf5a..9155886e31b0 100644 --- a/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/sample_code_servicebus.py @@ -93,7 +93,7 @@ def example_create_servicebus_sender_sync(): topic_name = os.environ['SERVICE_BUS_TOPIC_NAME'] servicebus_client = ServiceBusClient.from_connection_string(conn_str=servicebus_connection_str) with servicebus_client: - queue_sender = servicebus_client.get_topic_sender(topic_name=topic_name) + topic_sender = servicebus_client.get_topic_sender(topic_name=topic_name) # [END create_topic_sender_from_sb_client_sync] return queue_sender @@ -295,6 +295,22 @@ def example_session_ops_sync(): break +def example_schedule_ops_sync(): + servicebus_sender = example_create_servicebus_sender_sync() + # [START scheduling_messages] + with servicebus_sender: + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + scheduled_messages = [Message("Scheduled message") for _ in range(10)] + sequence_nums = servicebus_sender.schedule(scheduled_messages, scheduled_time_utc) + # [END scheduling_messages] + + # [START cancel_scheduled_messages] + with servicebus_sender: + servicebus_sender.cancel_scheduled_messages(sequence_nums) + # [END cancel_scheduled_messages] + + example_send_and_receive_sync() example_receive_deferred_sync() +example_schedule_ops_sync() # example_session_ops_sync() diff --git a/sdk/servicebus/azure-servicebus/samples/sync_samples/schedule_messages_and_cancellation.py b/sdk/servicebus/azure-servicebus/samples/sync_samples/schedule_messages_and_cancellation.py new file mode 100644 index 000000000000..8d43f3c42deb --- /dev/null +++ b/sdk/servicebus/azure-servicebus/samples/sync_samples/schedule_messages_and_cancellation.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python + +# -------------------------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for license information. +# -------------------------------------------------------------------------------------------- + +""" +Example to show scheduling messages to and cancelling messages from a Service Bus Queue. +""" + +# pylint: disable=C0111 + +import os +import datetime +from azure.servicebus import ServiceBusClient, Message + +CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR'] +QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"] + + +def schedule_single_message(sender): + message = Message("Message to be scheduled") + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + sequence_number = sender.schedule(message, scheduled_time_utc) + return sequence_number + + +def schedule_multiple_messages(sender): + messages_to_schedule = [] + for _ in range(10): + messages_to_schedule.append(Message("Message to be scheduled")) + + scheduled_time_utc = datetime.datetime.utcnow() + datetime.timedelta(seconds=30) + sequence_numbers = sender.schedule(messages_to_schedule, scheduled_time_utc) + return sequence_numbers + + +servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True) +with servicebus_client: + sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME) + with sender: + sequence_number = schedule_single_message(sender) + print("Single message is scheduled and sequence number is {}".format(sequence_number)) + sequence_numbers = schedule_multiple_messages(sender) + print("Multiple messages are scheduled and sequence numbers are {}".format(sequence_numbers)) + + sender.cancel_scheduled_messages(sequence_number) + sender.cancel_scheduled_messages(sequence_numbers) + print("All scheduled messages are cancelled.") diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index df28865e22c4..6cd32b8fe36c 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -936,7 +936,6 @@ async def test_async_queue_message_batch(self, servicebus_namespace_connection_s print_message(_logger, m) await m.complete() - @pytest.mark.skip(reason="requires scheduler") @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -971,7 +970,6 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio else: raise Exception("Failed to receive scheduled message.") - @pytest.mark.skip(reason="requires scheduler") @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -991,7 +989,7 @@ async def test_async_queue_schedule_multiple_messages(self, servicebus_namespace message_id_b = uuid.uuid4() message_b = Message(content) message_b.properties.message_id = message_id_b - tokens = await sender.schedule(enqueue_time, message_a, message_b) + tokens = await sender.schedule([message_a, message_b], enqueue_time) assert len(tokens) == 2 recv = await receiver.receive(max_wait_time=120) @@ -1027,10 +1025,10 @@ async def test_async_queue_cancel_scheduled_messages(self, servicebus_namespace_ async with sb_client.get_queue_sender(servicebus_queue.name) as sender: message_a = Message("Test scheduled message") message_b = Message("Test scheduled message") - tokens = await sender.schedule(enqueue_time, message_a, message_b) + tokens = await sender.schedule([message_a, message_b], enqueue_time) assert len(tokens) == 2 - await sender.cancel_scheduled_messages(*tokens) + await sender.cancel_scheduled_messages(tokens) messages = await receiver.receive(max_wait_time=120) assert len(messages) == 0 diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index 125c2d662b88..f2a3664491e3 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1088,7 +1088,6 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se raise Exception("Failed to receive schdeduled message.") - @pytest.mark.skip("Pending message scheduling functionality") @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -1110,11 +1109,11 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ message_id_b = uuid.uuid4() message_b = Message(content) message_b.properties.message_id = message_id_b - tokens = sender.schedule(enqueue_time, message_a, message_b) + tokens = sender.schedule([message_a, message_b], enqueue_time) assert len(tokens) == 2 - messages = receiver.fetch_next(timeout=120) - messages.extend(receiver.fetch_next(timeout=5)) + messages = receiver.receive(max_wait_time=120) + messages.extend(receiver.receive(max_wait_time=5)) if messages: try: data = str(messages[0]) @@ -1130,7 +1129,6 @@ def test_queue_schedule_multiple_messages(self, servicebus_namespace_connection_ raise Exception("Failed to receive schdeduled message.") - @pytest.mark.skip(reason="Pending message scheduling functionality") @pytest.mark.liveTest @pytest.mark.live_test_only @CachedResourceGroupPreparer(name_prefix='servicebustest') @@ -1146,10 +1144,10 @@ def test_queue_cancel_scheduled_messages(self, servicebus_namespace_connection_s with sb_client.get_queue_sender(servicebus_queue.name) as sender: message_a = Message("Test scheduled message") message_b = Message("Test scheduled message") - tokens = sender.schedule(enqueue_time, message_a, message_b) + tokens = sender.schedule([message_a, message_b], enqueue_time) assert len(tokens) == 2 - sender.cancel_scheduled_messages(*tokens) + sender.cancel_scheduled_messages(tokens) messages = receiver.receive(max_wait_time=120) try: From 030b9cf4fb5cff9023adbe0750690ca9c3cb973d Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Tue, 28 Apr 2020 12:00:22 -0700 Subject: [PATCH 4/8] remove iterable type hint and docstring --- .../azure/servicebus/_servicebus_sender.py | 12 +++++++----- .../servicebus/aio/_servicebus_sender_async.py | 14 ++++++++------ 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 08bac4ba08a5..2d869164ba94 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -5,7 +5,7 @@ import logging import time import uuid -from typing import Any, TYPE_CHECKING, Union, List, Iterable +from typing import Any, TYPE_CHECKING, Union, List import uamqp from uamqp import SendClient, types @@ -189,11 +189,11 @@ def _send(self, message, timeout=None, last_exception=None): raise MessageSendFailed(e) def schedule(self, messages, schedule_time_utc): - # type: (Union[Message, Iterable[Message]], datetime.datetime) -> List[int] + # type: (Union[Message, List[Message]], datetime.datetime) -> List[int] """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. :param messages: The message or list of messages to schedule. - :type messages: ~azure.servicebus.Message or Iterator[~azure.servicebus.Message] + :type messages: ~azure.servicebus.Message or List[~azure.servicebus.Message] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime :rtype: list[int] @@ -220,13 +220,15 @@ def schedule(self, messages, schedule_time_utc): ) def cancel_scheduled_messages(self, sequence_numbers): - # type: (Union[int, Iterator[int]]) -> None + # type: (Union[int, List[int]]) -> None """ Cancel one or more messages that have previously been scheduled and are still pending. :param sequence_numbers: The sequence numbers of the scheduled messages. - :type sequence_numbers: int or Iterator[int] + :type sequence_numbers: int or List[int] :rtype: None + :raises: ~azure.servicebus.exceptions.ServiceBusError if messages cancellation failed due to message already + cancelled or enqueued. .. admonition:: Example: diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 0bb61ba13b41..9ff6793df1d0 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -4,7 +4,7 @@ # -------------------------------------------------------------------------------------------- import logging import asyncio -from typing import Any, TYPE_CHECKING, Union, List, Iterable +from typing import Any, TYPE_CHECKING, Union, List import uamqp from uamqp import SendClientAsync, types @@ -140,11 +140,11 @@ async def _send(self, message, timeout=None, last_exception=None): raise MessageSendFailed(e) async def schedule(self, messages, schedule_time_utc): - # type: (Union[Message, Iterable[Message]], datetime.datetime) -> List[int] + # type: (Union[Message, List[Message]], datetime.datetime) -> List[int] """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. :param messages: The message or list of messages to schedule. - :type messages: ~azure.servicebus.Message or Iterator[~azure.servicebus.Message] + :type messages: ~azure.servicebus.Message or List[~azure.servicebus.Message] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime :rtype: list[int] @@ -171,13 +171,15 @@ async def schedule(self, messages, schedule_time_utc): ) async def cancel_scheduled_messages(self, sequence_numbers): - # type: (Union[int, Iterator[int]]) -> None + # type: (Union[int, List[int]]) -> None """ Cancel one or more messages that have previously been scheduled and are still pending. - :param sequence_numbers: he sequence numbers of the scheduled messages. - :type sequence_numbers: int or Iterator[int] + :param sequence_numbers: The sequence numbers of the scheduled messages. + :type sequence_numbers: int or List[int] :rtype: None + :raises: ~azure.servicebus.exceptions.ServiceBusError if messages cancellation failed due to message already + cancelled or enqueued. .. admonition:: Example: From 9ccd1ef2660d5e16f08726cc15863416d4c96c12 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Tue, 28 Apr 2020 23:57:10 -0700 Subject: [PATCH 5/8] update implementation --- .../azure-servicebus/azure/servicebus/_common/message.py | 4 ++-- .../azure/servicebus/_servicebus_sender.py | 7 +++++-- .../azure/servicebus/aio/_servicebus_receiver_async.py | 2 +- .../azure/servicebus/aio/_servicebus_sender_async.py | 6 +++--- .../tests/async_tests/test_queues_async.py | 2 +- .../tests/async_tests/test_sessions_async.py | 2 +- sdk/servicebus/azure-servicebus/tests/test_queues.py | 2 +- 7 files changed, 14 insertions(+), 11 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 57eafe94f426..55779f3889a1 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -244,8 +244,8 @@ def scheduled_enqueue_time_utc(self): # type: () -> Optional[datetime.datetime] """Get or set the utc scheduled enqueue time to the message. This property can be used for scheduling when sending a message through `ServiceBusSender.send` method. - Or you could use `ServiceBusSender.schedule` method to schedule messages which would return scheduled messages' - sequence numbers that can be used for future cancellation. + If cancelling scheduled messages is required, you should use the `ServiceBusSender.schedule` method, + which returns sequence numbers that can be used for future cancellation. :rtype: ~datetime.datetime """ diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py index 2d869164ba94..0e682feda86d 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py @@ -65,6 +65,9 @@ def _set_msg_timeout(self, timeout=None, last_exception=None): def _build_schedule_request(cls, schedule_time_utc, *messages): request_body = {MGMT_REQUEST_MESSAGES: []} for message in messages: + if not isinstance(message, Message): + raise ValueError("Scheduling batch messages only supports iterables containing Message Objects." + " Received instead: {}".format(message.__class__.__name__)) message.scheduled_enqueue_time_utc = schedule_time_utc message_data = {} message_data[MGMT_REQUEST_MESSAGE_ID] = message.properties.message_id @@ -193,7 +196,7 @@ def schedule(self, messages, schedule_time_utc): """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. :param messages: The message or list of messages to schedule. - :type messages: ~azure.servicebus.Message or List[~azure.servicebus.Message] + :type messages: ~azure.servicebus.Message or list[~azure.servicebus.Message] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime :rtype: list[int] @@ -225,7 +228,7 @@ def cancel_scheduled_messages(self, sequence_numbers): Cancel one or more messages that have previously been scheduled and are still pending. :param sequence_numbers: The sequence numbers of the scheduled messages. - :type sequence_numbers: int or List[int] + :type sequence_numbers: int or list[int] :rtype: None :raises: ~azure.servicebus.exceptions.ServiceBusError if messages cancellation failed due to message already cancelled or enqueued. diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py index 7ac67fefc4c3..c88167f10176 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_receiver_async.py @@ -170,7 +170,7 @@ async def _open(self): await asyncio.sleep(0.05) self._running = True except: - self.close() + await self.close() raise async def _receive(self, max_batch_size=None, timeout=None): diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py index 9ff6793df1d0..2d268cba1dc1 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py @@ -128,7 +128,7 @@ async def _open(self): self._max_message_size_on_link = self._handler.message_handler._link.peer_max_message_size \ or uamqp.constants.MAX_MESSAGE_LENGTH_BYTES except: - self.close() + await self.close() raise async def _send(self, message, timeout=None, last_exception=None): @@ -144,7 +144,7 @@ async def schedule(self, messages, schedule_time_utc): """Send Message or multiple Messages to be enqueued at a specific time. Returns a list of the sequence numbers of the enqueued messages. :param messages: The message or list of messages to schedule. - :type messages: ~azure.servicebus.Message or List[~azure.servicebus.Message] + :type messages: ~azure.servicebus.Message or list[~azure.servicebus.Message] :param schedule_time_utc: The utc date and time to enqueue the messages. :type schedule_time_utc: ~datetime.datetime :rtype: list[int] @@ -176,7 +176,7 @@ async def cancel_scheduled_messages(self, sequence_numbers): Cancel one or more messages that have previously been scheduled and are still pending. :param sequence_numbers: The sequence numbers of the scheduled messages. - :type sequence_numbers: int or List[int] + :type sequence_numbers: int or list[int] :rtype: None :raises: ~azure.servicebus.exceptions.ServiceBusError if messages cancellation failed due to message already cancelled or enqueued. diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py index 6cd32b8fe36c..4f41d02b7c67 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py @@ -952,7 +952,7 @@ async def test_async_queue_schedule_message(self, servicebus_namespace_connectio message_id = uuid.uuid4() message = Message(content) message.properties.message_id = message_id - message.schedule(enqueue_time) + message.scheduled_enqueue_time_utc = enqueue_time await sender.send(message) messages = await receiver.receive(max_wait_time=120) diff --git a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py index 606c9840e082..24e0c4063583 100644 --- a/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py +++ b/sdk/servicebus/azure-servicebus/tests/async_tests/test_sessions_async.py @@ -597,7 +597,7 @@ async def test_async_session_schedule_message(self, servicebus_namespace_connect message_id = uuid.uuid4() message = Message(content, session_id=session_id) message.properties.message_id = message_id - message.schedule(enqueue_time) + message.scheduled_enqueue_time_utc = enqueue_time await sender.send(message) messages = [] diff --git a/sdk/servicebus/azure-servicebus/tests/test_queues.py b/sdk/servicebus/azure-servicebus/tests/test_queues.py index f2a3664491e3..875f6a847550 100644 --- a/sdk/servicebus/azure-servicebus/tests/test_queues.py +++ b/sdk/servicebus/azure-servicebus/tests/test_queues.py @@ -1069,7 +1069,7 @@ def test_queue_schedule_message(self, servicebus_namespace_connection_string, se message_id = uuid.uuid4() message = Message(content) message.properties.message_id = message_id - message.schedule(enqueue_time) + message.scheduled_enqueue_time_utc = enqueue_time sender.send(message) messages = receiver.receive(max_wait_time=120) From e15197a257706732d8f645110bfe4e6146cfa1fd Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Wed, 29 Apr 2020 12:19:07 -0700 Subject: [PATCH 6/8] update comment --- .../azure-servicebus/azure/servicebus/_common/message.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py index 55779f3889a1..b5ccae84ac08 100644 --- a/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py +++ b/sdk/servicebus/azure-servicebus/azure/servicebus/_common/message.py @@ -246,6 +246,7 @@ def scheduled_enqueue_time_utc(self): This property can be used for scheduling when sending a message through `ServiceBusSender.send` method. If cancelling scheduled messages is required, you should use the `ServiceBusSender.schedule` method, which returns sequence numbers that can be used for future cancellation. + `scheduled_enqueue_time_utc` is None if not set. :rtype: ~datetime.datetime """ From 87186accd6b3db6866cee35ed09a82f4d9c208a9 Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Thu, 30 Apr 2020 14:16:01 -0700 Subject: [PATCH 7/8] update docs --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 5 +++-- sdk/servicebus/azure-servicebus/migration_guide.md | 7 +++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 44a88abadaa3..65283dbdd054 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -7,8 +7,8 @@ * Added method `get_topic_sender` in `ServiceBusClient` to get a `ServiceBusSender` for a topic. * Added method `get_subscription_receiver` in `ServiceBusClient` to get a `ServiceBusReceiver` for a subscription under specific topic. * Added support for scheduling messages and scheduled message cancellation. - - Use `ServiceBusSender.schedule` for scheduling messages. - - Use `ServiceBusSender.cancel_scheduled_messages` for scheduled messages cancellation. + - Use `ServiceBusSender.schedule(messages, schedule_time_utc)` for scheduling messages. + - Use `ServiceBusSender.cancel_scheduled_messages(sequence_numbers)` for scheduled messages cancellation. * `ServiceBusSender.send()` can now send a list of messages in one call, if they fit into a single batch. If they do not fit a `ValueError` is thrown. * `BatchMessage.add()` and `ServiceBusSender.send()` raises `MessageContentTooLarge`, which is a subclass of `ValueError` if the content is over-sized. * `ServiceBusReceiver.receive()` raises `ValueError` if the max_batch_size is greater than the prefetch of `ServiceBusClient`. @@ -24,6 +24,7 @@ * Session receivers are now created via their own top level functions, e.g. `get_queue_sesison_receiver` and `get_subscription_session_receiver`. Non session receivers no longer take session_id as a paramter. * `ServiceBusSender.send()` no longer takes a timeout parameter, as it should be redundant with retry options provided when creating the client. * Exception imports have been removed from module `azure.servicebus`. Import from `azure.servicebus.exceptions` instead. +* `ServiceBusSender.schedule()` has swapped the ordering of parameters `schedule_time_utc` and `messages`. ## 7.0.0b1 (2020-04-06) diff --git a/sdk/servicebus/azure-servicebus/migration_guide.md b/sdk/servicebus/azure-servicebus/migration_guide.md index 7d74fbb615e2..df54ffe743cb 100644 --- a/sdk/servicebus/azure-servicebus/migration_guide.md +++ b/sdk/servicebus/azure-servicebus/migration_guide.md @@ -49,6 +49,13 @@ semantics with the sender or receiver lifetime. | `QueueClient.from_connection_string().send() and ServiceBusClient.from_connection_string().get_queue().get_sender().send()`| `ServiceBusClient.from_connection_string().get_queue_sender().send()`| [Get a sender and send a message](./samples/sync_samples/send_queue.py) | | `queue_client.send(BatchMessage(["data 1", "data 2", ...]))`| `batch = queue_sender.create_batch() batch.add(Message("data 1")) queue_sender.send(batch)`| [Create and send a batch of messages](./samples/sync_samples/send_queue.py) | +### Scheduling messages and cancelling scheduled messages + +| In v0.50 | Equivalent in v7 | Sample | +|---|---|---| +| `queue_client.get_sender().schedule(schedule_time_utc, message1, message2)` | `sb_client.get_queue_sender().schedule([message1, message2], schedule_time_utc)` | [Schedule messages](./samples/sync_samples/schedule_messages_and_cancellation.py) | +| `queue_client.get_sender().cancel_scheduled_messages(sequence_number1, sequence_number2)`| `sb_client.get_queue_sender().cancel_scheduled_messages([sequence_number1, sequence_number2])` | [Cancel scheduled messages](./samples/sync_samples/schedule_messages_and_cancellation.py)| + ### Working with sessions | In v0.50 | Equivalent in v7 | Sample | From a932338f9153d5bdd21ceac2175251cdd5de443f Mon Sep 17 00:00:00 2001 From: "Adam Ling (MSFT)" <47871814+yunhaoling@users.noreply.github.com> Date: Thu, 30 Apr 2020 14:33:25 -0700 Subject: [PATCH 8/8] Update sdk/servicebus/azure-servicebus/CHANGELOG.md Co-authored-by: KieranBrantnerMagee --- sdk/servicebus/azure-servicebus/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-servicebus/CHANGELOG.md b/sdk/servicebus/azure-servicebus/CHANGELOG.md index 65283dbdd054..473f94715dcc 100644 --- a/sdk/servicebus/azure-servicebus/CHANGELOG.md +++ b/sdk/servicebus/azure-servicebus/CHANGELOG.md @@ -24,7 +24,7 @@ * Session receivers are now created via their own top level functions, e.g. `get_queue_sesison_receiver` and `get_subscription_session_receiver`. Non session receivers no longer take session_id as a paramter. * `ServiceBusSender.send()` no longer takes a timeout parameter, as it should be redundant with retry options provided when creating the client. * Exception imports have been removed from module `azure.servicebus`. Import from `azure.servicebus.exceptions` instead. -* `ServiceBusSender.schedule()` has swapped the ordering of parameters `schedule_time_utc` and `messages`. +* `ServiceBusSender.schedule()` has swapped the ordering of parameters `schedule_time_utc` and `messages` for better consistency with `send()` syntax. ## 7.0.0b1 (2020-04-06)