diff --git a/clients/aws-sdk-lex-runtime-v2/tests/integration/__init__.py b/clients/aws-sdk-lex-runtime-v2/tests/integration/__init__.py new file mode 100644 index 0000000..0da32eb --- /dev/null +++ b/clients/aws-sdk-lex-runtime-v2/tests/integration/__init__.py @@ -0,0 +1,21 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +from smithy_aws_core.identity import EnvironmentCredentialsResolver + +from aws_sdk_lex_runtime_v2.client import LexRuntimeV2Client +from aws_sdk_lex_runtime_v2.config import Config + +BOT_ALIAS_ID = "TSTALIASID" +LOCALE_ID = "en_US" +REGION = "us-east-1" + + +def create_lex_client(region: str) -> LexRuntimeV2Client: + return LexRuntimeV2Client( + config=Config( + endpoint_uri=f"https://runtime-v2-lex.{region}.amazonaws.com", + region=region, + aws_credentials_identity_resolver=EnvironmentCredentialsResolver(), + ) + ) diff --git a/clients/aws-sdk-lex-runtime-v2/tests/integration/conftest.py b/clients/aws-sdk-lex-runtime-v2/tests/integration/conftest.py new file mode 100644 index 0000000..8a2407c --- /dev/null +++ b/clients/aws-sdk-lex-runtime-v2/tests/integration/conftest.py @@ -0,0 +1,140 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Pytest fixtures for Lex Runtime V2 integration tests. + +Creates and tears down a Lex V2 bot with a Greeting intent once per +test session. All integration tests receive the bot_id via the +``lex_bot`` fixture. +""" + +import json +import uuid + +import boto3 +import pytest + +from . import LOCALE_ID, REGION + +_UNIQUE_SUFFIX = uuid.uuid4().hex +ROLE_NAME = f"LexRuntimeV2IntegTestRole-{_UNIQUE_SUFFIX}" +BOT_NAME = f"smithy-python-integ-test-bot-{_UNIQUE_SUFFIX}" + + +def _create_lex_bot() -> str: + """Create a Lex V2 bot with a Greeting intent. + + Returns: + The bot ID. + """ + iam = boto3.client("iam") + lex = boto3.client("lexv2-models", region_name=REGION) + sts = boto3.client("sts") + + account_id = sts.get_caller_identity()["Account"] + role_arn = f"arn:aws:iam::{account_id}:role/{ROLE_NAME}" + + # Create IAM role for the bot + trust_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "lexv2.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + try: + iam.create_role( + RoleName=ROLE_NAME, AssumeRolePolicyDocument=json.dumps(trust_policy) + ) + except iam.exceptions.EntityAlreadyExistsException: + pass + + # Create bot + response = lex.create_bot( + botName=BOT_NAME, + roleArn=role_arn, + dataPrivacy={"childDirected": False}, + # 5-minute idle timeout is sufficient for integration tests. + idleSessionTTLInSeconds=300, + ) + bot_id = response["botId"] + lex.get_waiter("bot_available").wait(botId=bot_id) + + # Create locale + lex.create_bot_locale( + botId=bot_id, + botVersion="DRAFT", + localeId=LOCALE_ID, + # Required field. Confidence threshold (0-1) that determines when Lex + # inserts AMAZON.FallbackIntent into the interpretations list. + # 0.40 is a reasonable value for a simple test bot. + nluIntentConfidenceThreshold=0.40, + ) + lex.get_waiter("bot_locale_created").wait( + botId=bot_id, botVersion="DRAFT", localeId=LOCALE_ID + ) + + # Create intent + lex.create_intent( + intentName="Greeting", + botId=bot_id, + botVersion="DRAFT", + localeId=LOCALE_ID, + sampleUtterances=[ + {"utterance": "Hello"}, + {"utterance": "Hi"}, + {"utterance": "Hey"}, + ], + intentClosingSetting={ + "closingResponse": { + "messageGroups": [ + { + "message": { + "plainTextMessage": {"value": "Hello! How can I help you?"} + } + } + ] + }, + "active": True, + }, + ) + + # Build locale + lex.build_bot_locale(botId=bot_id, botVersion="DRAFT", localeId=LOCALE_ID) + lex.get_waiter("bot_locale_built").wait( + botId=bot_id, botVersion="DRAFT", localeId=LOCALE_ID + ) + + return bot_id + + +def _delete_lex_bot(bot_id: str | None) -> None: + """Delete a Lex V2 bot and its associated IAM role. + + Args: + bot_id: The bot ID to delete, or None if creation failed. + """ + lex = boto3.client("lexv2-models", region_name=REGION) + iam = boto3.client("iam") + + if bot_id: + lex.delete_bot(botId=bot_id, skipResourceInUseCheck=True) + + try: + iam.delete_role(RoleName=ROLE_NAME) + except iam.exceptions.NoSuchEntityException: + pass + + +@pytest.fixture(scope="session") +def lex_bot(): + """Create a Lex bot for the test session and delete it after.""" + bot_id = None + try: + bot_id = _create_lex_bot() + yield bot_id + finally: + _delete_lex_bot(bot_id) diff --git a/clients/aws-sdk-lex-runtime-v2/tests/integration/test_bidirectional_streaming.py b/clients/aws-sdk-lex-runtime-v2/tests/integration/test_bidirectional_streaming.py new file mode 100644 index 0000000..fc12bea --- /dev/null +++ b/clients/aws-sdk-lex-runtime-v2/tests/integration/test_bidirectional_streaming.py @@ -0,0 +1,146 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Test bidirectional streaming event stream handling.""" + +import asyncio +import uuid + +from smithy_core.aio.eventstream import DuplexEventStream + +from aws_sdk_lex_runtime_v2.models import ( + StartConversationInput, + StartConversationRequestEventStream, + StartConversationRequestEventStreamConfigurationEvent, + StartConversationRequestEventStreamTextInputEvent, + StartConversationRequestEventStreamDisconnectionEvent, + StartConversationResponseEventStream, + StartConversationResponseEventStreamHeartbeatEvent, + StartConversationResponseEventStreamIntentResultEvent, + StartConversationResponseEventStreamTextResponseEvent, + StartConversationResponseEventStreamTranscriptEvent, + StartConversationOutput, + ConfigurationEvent, + TextInputEvent, + DisconnectionEvent, +) +from . import BOT_ALIAS_ID, LOCALE_ID, REGION, create_lex_client + + +async def _send_events( + stream: DuplexEventStream[ + StartConversationRequestEventStream, + StartConversationResponseEventStream, + StartConversationOutput, + ], +) -> None: + """Send configuration, text input, and disconnection events.""" + input_stream = stream.input_stream + + await input_stream.send( + StartConversationRequestEventStreamConfigurationEvent( + value=ConfigurationEvent(response_content_type="text/plain; charset=utf-8") + ) + ) + + await input_stream.send( + StartConversationRequestEventStreamTextInputEvent( + value=TextInputEvent(text="Hello") + ) + ) + + await asyncio.sleep(3) + + await input_stream.send( + StartConversationRequestEventStreamDisconnectionEvent( + value=DisconnectionEvent() + ) + ) + + await input_stream.close() + + +async def _receive_events( + stream: DuplexEventStream[ + StartConversationRequestEventStream, + StartConversationResponseEventStream, + StartConversationOutput, + ], +) -> tuple[bool, bool, bool, bool]: + """Receive and collect output from the stream. + + Returns: + Tuple of (got_transcript, got_intent_result, got_text_response, got_heartbeat) + """ + got_transcript = False + got_intent_result = False + got_text_response = False + got_heartbeat = False + + _, output_stream = await stream.await_output() + if output_stream is None: + return got_transcript, got_intent_result, got_text_response, got_heartbeat + + async for event in output_stream: + if isinstance(event, StartConversationResponseEventStreamTranscriptEvent): + got_transcript = True + assert event.value.event_id is not None + assert event.value.transcript == "Hello" + elif isinstance(event, StartConversationResponseEventStreamIntentResultEvent): + got_intent_result = True + assert event.value.event_id is not None + assert event.value.input_mode == "Text" + assert event.value.session_id is not None + assert event.value.session_state is not None + assert event.value.session_state.intent is not None + assert event.value.session_state.intent.name == "Greeting" + assert event.value.session_state.intent.state == "Fulfilled" + assert event.value.interpretations is not None + assert len(event.value.interpretations) == 2 + interps_by_name = { + i.intent.name: i for i in event.value.interpretations if i.intent + } + assert "Greeting" in interps_by_name + assert "FallbackIntent" in interps_by_name + assert interps_by_name["Greeting"].nlu_confidence is not None + assert interps_by_name["Greeting"].nlu_confidence.score == 1.0 + elif isinstance(event, StartConversationResponseEventStreamTextResponseEvent): + got_text_response = True + assert event.value.event_id is not None + assert event.value.messages is not None + assert len(event.value.messages) == 1 + msg = event.value.messages[0] + assert msg.content_type == "PlainText" + assert msg.content == "Hello! How can I help you?" + elif isinstance(event, StartConversationResponseEventStreamHeartbeatEvent): + got_heartbeat = True + assert event.value.event_id is not None + else: + raise RuntimeError( + f"Received unexpected event type in stream: {type(event).__name__}" + ) + + return got_transcript, got_intent_result, got_text_response, got_heartbeat + + +async def test_start_conversation(lex_bot: str) -> None: + """Test bidirectional streaming StartConversation operation.""" + client = create_lex_client(REGION) + + stream = await client.start_conversation( + input=StartConversationInput( + bot_id=lex_bot, + bot_alias_id=BOT_ALIAS_ID, + locale_id=LOCALE_ID, + session_id=str(uuid.uuid4()), + conversation_mode="TEXT", + ) + ) + + results = await asyncio.gather(_send_events(stream), _receive_events(stream)) + + got_transcript, got_intent_result, got_text_response, got_heartbeat = results[1] + assert got_transcript, "Expected to receive a TranscriptEvent" + assert got_intent_result, "Expected to receive an IntentResultEvent" + assert got_text_response, "Expected to receive a TextResponseEvent" + assert got_heartbeat, "Expected to receive a HeartbeatEvent" diff --git a/clients/aws-sdk-lex-runtime-v2/tests/integration/test_non_streaming.py b/clients/aws-sdk-lex-runtime-v2/tests/integration/test_non_streaming.py new file mode 100644 index 0000000..d3abd05 --- /dev/null +++ b/clients/aws-sdk-lex-runtime-v2/tests/integration/test_non_streaming.py @@ -0,0 +1,48 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Test non-streaming output type handling.""" + +import uuid + +from aws_sdk_lex_runtime_v2.models import RecognizeTextInput, RecognizeTextOutput +from . import BOT_ALIAS_ID, LOCALE_ID, REGION, create_lex_client + + +async def test_recognize_text(lex_bot: str) -> None: + """Test non-streaming RecognizeText operation.""" + client = create_lex_client(REGION) + response = await client.recognize_text( + input=RecognizeTextInput( + bot_id=lex_bot, + bot_alias_id=BOT_ALIAS_ID, + locale_id=LOCALE_ID, + session_id=str(uuid.uuid4()), + text="Hello", + ) + ) + + assert isinstance(response, RecognizeTextOutput) + assert response.session_id is not None + + # Verify messages + assert response.messages is not None + assert len(response.messages) == 1 + msg = response.messages[0] + assert msg.content_type == "PlainText" + assert msg.content == "Hello! How can I help you?" + + # Verify session state + assert response.session_state is not None + assert response.session_state.intent is not None + assert response.session_state.intent.name == "Greeting" + assert response.session_state.intent.state == "Fulfilled" + + # Verify interpretations + assert response.interpretations is not None + assert len(response.interpretations) == 2 + interps_by_name = {i.intent.name: i for i in response.interpretations if i.intent} + assert "Greeting" in interps_by_name + assert "FallbackIntent" in interps_by_name + assert interps_by_name["Greeting"].nlu_confidence is not None + assert interps_by_name["Greeting"].nlu_confidence.score == 1.0