diff --git a/libraries/bot-azure/pom.xml b/libraries/bot-azure/pom.xml index 12c503b9d..54954bae1 100644 --- a/libraries/bot-azure/pom.xml +++ b/libraries/bot-azure/pom.xml @@ -73,6 +73,12 @@ bot-dialogs + + com.azure + azure-storage-queue + 12.8.0 + + com.microsoft.bot bot-builder diff --git a/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/queues/AzureQueueStorage.java b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/queues/AzureQueueStorage.java new file mode 100644 index 000000000..b351cc640 --- /dev/null +++ b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/queues/AzureQueueStorage.java @@ -0,0 +1,90 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.bot.azure.queues; + +import com.azure.storage.queue.QueueClient; +import com.azure.storage.queue.QueueClientBuilder; +import com.azure.storage.queue.models.SendMessageResult; +import com.microsoft.bot.builder.QueueStorage; +import com.microsoft.bot.restclient.serializer.JacksonAdapter; +import com.microsoft.bot.schema.Activity; +import org.apache.commons.lang3.StringUtils; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Base64; +import java.util.concurrent.CompletableFuture; + +/** + * Service used to add messages to an Azure.Storage.Queues. + */ +public class AzureQueueStorage extends QueueStorage { + private Boolean createQueueIfNotExists = true; + private final QueueClient queueClient; + + /** + * Initializes a new instance of the {@link AzureQueueStorage} class. + * @param queuesStorageConnectionString Azure Storage connection string. + * @param queueName Name of the storage queue where entities will be queued. + */ + public AzureQueueStorage(String queuesStorageConnectionString, String queueName) { + if (StringUtils.isBlank(queuesStorageConnectionString)) { + throw new IllegalArgumentException("queuesStorageConnectionString is required."); + } + + if (StringUtils.isBlank(queueName)) { + throw new IllegalArgumentException("queueName is required."); + } + + queueClient = new QueueClientBuilder() + .connectionString(queuesStorageConnectionString) + .queueName(queueName) + .buildClient(); + } + + /** + * Queue an Activity to an Azure.Storage.Queues.QueueClient. + * The visibility timeout specifies how long the message should be invisible + * to Dequeue and Peek operations. The message content must be a UTF-8 encoded string that is up to 64KB in size. + * @param activity This is expected to be an {@link Activity} retrieved from a call to + * activity.GetConversationReference().GetContinuationActivity(). + * This enables restarting the conversation using BotAdapter.ContinueConversationAsync. + * @param visibilityTimeout Default value of 0. Cannot be larger than 7 days. + * @param timeToLive Specifies the time-to-live interval for the message. + * @return {@link SendMessageResult} as a Json string, from the QueueClient SendMessageAsync operation. + */ + @Override + public CompletableFuture queueActivity(Activity activity, + @Nullable Duration visibilityTimeout, + @Nullable Duration timeToLive) { + return CompletableFuture.supplyAsync(() -> { + if (createQueueIfNotExists) { + try { + queueClient.create(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // This is an optimization flag to check if the container creation call has been made. + // It is okay if this is called more than once. + createQueueIfNotExists = false; + } + + try { + JacksonAdapter jacksonAdapter = new JacksonAdapter(); + String serializedActivity = jacksonAdapter.serialize(activity); + byte[] encodedBytes = serializedActivity.getBytes(StandardCharsets.UTF_8); + String encodedString = Base64.getEncoder().encodeToString(encodedBytes); + + SendMessageResult receipt = queueClient.sendMessage(encodedString); + return jacksonAdapter.serialize(receipt); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + }); + } +} diff --git a/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/queues/package-info.java b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/queues/package-info.java new file mode 100644 index 000000000..07134724a --- /dev/null +++ b/libraries/bot-azure/src/main/java/com/microsoft/bot/azure/queues/package-info.java @@ -0,0 +1,8 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for +// license information. + +/** + * This package contains the classes for bot-integration-core. + */ +package com.microsoft.bot.azure.queues; diff --git a/libraries/bot-azure/src/test/java/com/microsoft/bot/azure/AzureQueueTests.java b/libraries/bot-azure/src/test/java/com/microsoft/bot/azure/AzureQueueTests.java new file mode 100644 index 000000000..fa03fa5e3 --- /dev/null +++ b/libraries/bot-azure/src/test/java/com/microsoft/bot/azure/AzureQueueTests.java @@ -0,0 +1,246 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.bot.azure; + +import com.azure.storage.queue.QueueClient; +import com.azure.storage.queue.QueueClientBuilder; +import com.azure.storage.queue.models.QueueMessageItem; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.microsoft.bot.azure.queues.AzureQueueStorage; +import com.microsoft.bot.builder.ConversationState; +import com.microsoft.bot.builder.MemoryStorage; +import com.microsoft.bot.builder.QueueStorage; +import com.microsoft.bot.builder.UserState; +import com.microsoft.bot.builder.adapters.TestAdapter; +import com.microsoft.bot.builder.adapters.TestFlow; +import com.microsoft.bot.dialogs.Dialog; +import com.microsoft.bot.dialogs.DialogContext; +import com.microsoft.bot.dialogs.DialogManager; +import com.microsoft.bot.dialogs.DialogTurnResult; +import com.microsoft.bot.schema.Activity; +import com.microsoft.bot.schema.ActivityEventNames; +import com.microsoft.bot.schema.ActivityTypes; +import com.microsoft.bot.schema.ConversationReference; +import org.apache.commons.codec.binary.Base64; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeParseException; +import java.util.Calendar; +import java.util.concurrent.CompletableFuture; + +import com.microsoft.bot.restclient.serializer.JacksonAdapter; + +public class AzureQueueTests { + private static final Integer DEFAULT_DELAY = 2000; + private static boolean emulatorIsRunning = false; + private final String connectionString = "UseDevelopmentStorage=true"; + private static final String NO_EMULATOR_MESSAGE = "This test requires Azure STORAGE Emulator! Go to https://docs.microsoft.com/azure/storage/common/storage-use-emulator to download and install."; + + @BeforeClass + public static void allTestsInit() throws IOException, InterruptedException { + Process p = Runtime.getRuntime().exec + ("cmd /C \"" + System.getenv("ProgramFiles") + " (x86)\\Microsoft SDKs\\Azure\\Storage Emulator\\AzureStorageEmulator.exe\" start"); + int result = p.waitFor(); + // status = 0: the service was started. + // status = -5: the service is already started. Only one instance of the application + // can be run at the same time. + emulatorIsRunning = result == 0 || result == -5; + } + + // These tests require Azure Storage Emulator v5.7 + public QueueClient containerInit(String name) { + QueueClient queue = new QueueClientBuilder() + .connectionString(connectionString) + .queueName(name) + .buildClient(); + queue.create(); + queue.clearMessages(); + return queue; + } + + @Test + public void continueConversationLaterTests() { + if (runIfEmulator()) { + String queueName = "continueconversationlatertests"; + QueueClient queue = containerInit(queueName); + + ConversationReference cr = TestAdapter.createConversationReference("ContinueConversationLaterTests", "User1", "Bot"); + TestAdapter adapter = new TestAdapter(cr) + .useStorage(new MemoryStorage()) + .useBotState(new ConversationState(new MemoryStorage()), new UserState(new MemoryStorage())); + + AzureQueueStorage queueStorage = new AzureQueueStorage(connectionString, queueName); + + Calendar cal = Calendar.getInstance(); + cal.add(Calendar.SECOND, 2); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss"); + + ContinueConversationLater ccl = new ContinueConversationLater(); + ccl.setDate(sdf.format(cal.getTime())); + ccl.setValue("foo"); + DialogManager dm = new DialogManager(ccl, "DialogStateProperty"); + dm.getInitialTurnState().replace("QueueStorage", queueStorage); + + new TestFlow(adapter, turnContext -> CompletableFuture.runAsync(() -> dm.onTurn(turnContext))) + .send("hi") + .startTest().join(); + + try { + Thread.sleep(DEFAULT_DELAY); + } catch (InterruptedException e) { + e.printStackTrace(); + Assert.fail(); + } + + QueueMessageItem messages = queue.receiveMessage(); + JacksonAdapter jacksonAdapter = new JacksonAdapter(); + String messageJson = new String(Base64.decodeBase64(messages.getMessageText())); + Activity activity = null; + + try { + activity = jacksonAdapter.deserialize(messageJson, Activity.class); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + + Assert.assertTrue(activity.isType(ActivityTypes.EVENT)); + Assert.assertEquals(ActivityEventNames.CONTINUE_CONVERSATION, activity.getName()); + Assert.assertEquals("foo", activity.getValue()); + Assert.assertNotNull(activity.getRelatesTo()); + ConversationReference cr2 = activity.getConversationReference(); + cr.setActivityId(null); + cr2.setActivityId(null); + + try { + Assert.assertEquals(jacksonAdapter.serialize(cr), jacksonAdapter.serialize(cr2)); + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + } + + private boolean runIfEmulator() { + if (!emulatorIsRunning) { + System.out.println(NO_EMULATOR_MESSAGE); + return false; + } + + return true; + } + + private class ContinueConversationLater extends Dialog { + @JsonProperty("disabled") + private Boolean disabled = false; + + @JsonProperty("date") + private String date; + + @JsonProperty("value") + private String value; + + /** + * Initializes a new instance of the Dialog class. + */ + public ContinueConversationLater() { + super(ContinueConversationLater.class.getName()); + } + + @Override + public CompletableFuture beginDialog(DialogContext dc, Object options) { + if (this.disabled) { + return dc.endDialog(); + } + + String dateString = this.date; + LocalDateTime date = null; + try { + date = LocalDateTime.parse(dateString); + } catch (DateTimeParseException ex) { + throw new IllegalArgumentException("Date is invalid"); + } + + ZonedDateTime zonedDate = date.atZone(ZoneOffset.UTC); + ZonedDateTime now = LocalDateTime.now().atZone(ZoneOffset.UTC); + if (zonedDate.isBefore(now)) { + throw new IllegalArgumentException("Date must be in the future"); + } + + // create ContinuationActivity from the conversation reference. + Activity activity = dc.getContext().getActivity().getConversationReference().getContinuationActivity(); + activity.setValue(this.value); + + Duration visibility = Duration.between(zonedDate, now); + Duration ttl = visibility.plusMinutes(2); + + QueueStorage queueStorage = dc.getContext().getTurnState().get("QueueStorage"); + if (queueStorage == null) { + throw new NullPointerException("Unable to locate QueueStorage in HostContext"); + } + return queueStorage.queueActivity(activity, visibility, ttl).thenCompose(receipt -> { + // return the receipt as the result + return dc.endDialog(receipt); + }); + } + + /** + * Gets an optional expression which if is true will disable this action. + * "user.age > 18". + * @return A boolean expression. + */ + public Boolean getDisabled() { + return disabled; + } + + /** + * Sets an optional expression which if is true will disable this action. + * "user.age > 18". + * @param withDisabled A boolean expression. + */ + public void setDisabled(Boolean withDisabled) { + this.disabled = withDisabled; + } + + /** + * Gets the expression which resolves to the date/time to continue the conversation. + * @return Date/time string in ISO 8601 format to continue conversation. + */ + public String getDate() { + return date; + } + + /** + * Sets the expression which resolves to the date/time to continue the conversation. + * @param withDate Date/time string in ISO 8601 format to continue conversation. + */ + public void setDate(String withDate) { + this.date = withDate; + } + + /** + * Gets an optional value to use for EventActivity.Value. + * @return The value to use for the EventActivity.Value payload. + */ + public String getValue() { + return value; + } + + /** + * Sets an optional value to use for EventActivity.Value. + * @param withValue The value to use for the EventActivity.Value payload. + */ + public void setValue(String withValue) { + this.value = withValue; + } + } +} diff --git a/libraries/bot-builder/src/main/java/com/microsoft/bot/builder/QueueStorage.java b/libraries/bot-builder/src/main/java/com/microsoft/bot/builder/QueueStorage.java new file mode 100644 index 000000000..0665e8629 --- /dev/null +++ b/libraries/bot-builder/src/main/java/com/microsoft/bot/builder/QueueStorage.java @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.microsoft.bot.builder; + +import com.microsoft.bot.schema.Activity; + +import javax.annotation.Nullable; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +/** + * A base class for enqueueing an Activity for later processing. + */ +public abstract class QueueStorage { + + /** + * Enqueues an Activity for later processing. The visibility timeout specifies how long the message + * should be invisible to Dequeue and Peek operations. + * @param activity The {@link Activity} to be queued for later processing. + * @param visibilityTimeout Visibility timeout. Optional with a default value of 0. Cannot be larger than 7 days. + * @param timeToLive Specifies the time-to-live interval for the message. + * @return A result string. + */ + public abstract CompletableFuture queueActivity(Activity activity, + @Nullable Duration visibilityTimeout, + @Nullable Duration timeToLive); +} diff --git a/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/adapters/TestAdapter.java b/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/adapters/TestAdapter.java index f6f6bb4f8..94674825f 100644 --- a/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/adapters/TestAdapter.java +++ b/libraries/bot-builder/src/test/java/com/microsoft/bot/builder/adapters/TestAdapter.java @@ -347,8 +347,10 @@ public Activity makeActivity() { public Activity makeActivity(String withText) { Integer next = nextId++; + String locale = !getLocale().isEmpty() ? getLocale() : "en-us"; Activity activity = new Activity(ActivityTypes.MESSAGE) { { + setLocale(locale); setFrom(conversationReference().getUser()); setRecipient(conversationReference().getBot()); setConversation(conversationReference().getConversation()); @@ -416,8 +418,8 @@ public static ConversationReference createConversationReference(String name, Str reference.setChannelId("test"); reference.setServiceUrl("https://test.com"); reference.setConversation(new ConversationAccount(false, name, name, null, null, null, null)); - reference.setUser(new ChannelAccount(user.toLowerCase(), user.toLowerCase())); - reference.setBot(new ChannelAccount(bot.toLowerCase(), bot.toLowerCase())); + reference.setUser(new ChannelAccount(user.toLowerCase(), user)); + reference.setBot(new ChannelAccount(bot.toLowerCase(), bot)); reference.setLocale("en-us"); return reference; } diff --git a/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/ConversationReference.java b/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/ConversationReference.java index 4a9f1bd65..bd6d59e3f 100644 --- a/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/ConversationReference.java +++ b/libraries/bot-schema/src/main/java/com/microsoft/bot/schema/ConversationReference.java @@ -79,6 +79,7 @@ public Activity getContinuationActivity() { activity.setConversation(getConversation()); activity.setRecipient(getBot()); activity.setLocale(getLocale()); + activity.setServiceUrl(getServiceUrl()); activity.setFrom(getUser()); activity.setRelatesTo(this); return activity;